Posted to commits@flink.apache.org by dw...@apache.org on 2022/02/22 08:27:29 UTC

[flink] branch master updated (84530f4 -> c88189f)

This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 84530f4  [FLINK-26152] [docs] Translate the page of SQL/queries/WITH clause (#18828)
     new 0b9f895  [FLINK-26146] Add tests of flink version upgrades to cover native snapshots
     new 9dd6043  [FLINK-26176] Fix scala savepoint migration tests
     new c88189f  [FLINK-26146] Adapt scala tests to cover native snapshots migration

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../LegacyStatefulJobSavepointMigrationITCase.java |   8 +-
 ...ava => StatefulJobSnapshotMigrationITCase.java} | 145 ++++++------
 .../StatefulJobWBroadcastStateMigrationITCase.java | 139 +++++++-----
 .../checkpointing/utils/MigrationTestUtils.java    |   4 +-
 ...estBase.java => SnapshotMigrationTestBase.java} | 191 ++++++++++++++--
 .../TypeSerializerSnapshotMigrationITCase.java     | 155 +++++++------
 .../_metadata                                      | Bin 41481 -> 0 bytes
 .../_metadata                                      | Bin 41584 -> 0 bytes
 .../_metadata                                      | Bin 41436 -> 0 bytes
 .../_metadata                                      | Bin 41962 -> 0 bytes
 .../_metadata                                      | Bin 42002 -> 0 bytes
 .../_metadata                                      | Bin 213759 -> 0 bytes
 .../_metadata                                      | Bin 213759 -> 0 bytes
 .../_metadata                                      | Bin 204543 -> 0 bytes
 .../_metadata                                      | Bin 204543 -> 0 bytes
 .../_metadata                                      | Bin 205638 -> 0 bytes
 .../_metadata                                      | Bin 205638 -> 0 bytes
 .../_metadata                                      | Bin 229848 -> 0 bytes
 .../_metadata                                      | Bin 229848 -> 0 bytes
 .../_metadata                                      | Bin 39985 -> 0 bytes
 .../_metadata                                      | Bin 40081 -> 0 bytes
 .../_metadata                                      | Bin 53342 -> 0 bytes
 .../_metadata                                      | Bin 53419 -> 0 bytes
 .../_metadata                                      | Bin 53271 -> 0 bytes
 .../_metadata                                      | Bin 53733 -> 0 bytes
 .../_metadata                                      | Bin 53773 -> 0 bytes
 .../_metadata                                      | Bin 220470 -> 0 bytes
 .../_metadata                                      | Bin 220470 -> 0 bytes
 .../_metadata                                      | Bin 253956 -> 0 bytes
 .../_metadata                                      | Bin 253956 -> 0 bytes
 .../_metadata                                      | Bin 287008 -> 0 bytes
 .../_metadata                                      | Bin 287008 -> 0 bytes
 .../_metadata                                      | Bin 51846 -> 0 bytes
 .../_metadata                                      | Bin 51942 -> 0 bytes
 .../StatefulJobSavepointMigrationITCase.scala      | 206 +++++++++--------
 ...StatefulJobWBroadcastStateMigrationITCase.scala | 251 ++++++++++-----------
 pom.xml                                            |   4 +-
 37 files changed, 651 insertions(+), 452 deletions(-)
 rename flink-tests/src/test/java/org/apache/flink/test/checkpointing/{utils => }/LegacyStatefulJobSavepointMigrationITCase.java (99%)
 rename flink-tests/src/test/java/org/apache/flink/test/checkpointing/{utils/StatefulJobSavepointMigrationITCase.java => StatefulJobSnapshotMigrationITCase.java} (76%)
 rename flink-tests/src/test/java/org/apache/flink/test/checkpointing/{utils => }/StatefulJobWBroadcastStateMigrationITCase.java (80%)
 rename flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/{SavepointMigrationTestBase.java => SnapshotMigrationTestBase.java} (54%)
 delete mode 100644 flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.10-rocksdb-savepoint/_metadata
 delete mode 100644 flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.11-rocksdb-savepoint/_metadata
 delete mode 100644 flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.12-rocksdb-savepoint/_metadata
 delete mode 100644 flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.13-rocksdb-savepoint/_metadata
 delete mode 100644 flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.14-rocksdb-savepoint/_metadata
 delete mode 100644 flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.3-jobmanager-savepoint/_metadata
 delete mode 100644 flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.3-rocksdb-savepoint/_metadata
 delete mode 100644 flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.4-jobmanager-savepoint/_metadata
 delete mode 100644 flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.4-rocksdb-savepoint/_metadata
 delete mode 100644 flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.6-jobmanager-savepoint/_metadata
 delete mode 100644 flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.6-rocksdb-savepoint/_metadata
 delete mode 100644 flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.7-jobmanager-savepoint/_metadata
 delete mode 100644 flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.7-rocksdb-savepoint/_metadata
 delete mode 100644 flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.8-rocksdb-savepoint/_metadata
 delete mode 100644 flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.9-rocksdb-savepoint/_metadata
 delete mode 100644 flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.10-rocksdb-savepoint/_metadata
 delete mode 100644 flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.11-rocksdb-savepoint/_metadata
 delete mode 100644 flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.12-rocksdb-savepoint/_metadata
 delete mode 100644 flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.13-rocksdb-savepoint/_metadata
 delete mode 100644 flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.14-rocksdb-savepoint/_metadata
 delete mode 100644 flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.5-jobmanager-savepoint/_metadata
 delete mode 100644 flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.5-rocksdb-savepoint/_metadata
 delete mode 100644 flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.6-jobmanager-savepoint/_metadata
 delete mode 100644 flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.6-rocksdb-savepoint/_metadata
 delete mode 100644 flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.7-jobmanager-savepoint/_metadata
 delete mode 100644 flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.7-rocksdb-savepoint/_metadata
 delete mode 100644 flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.8-rocksdb-savepoint/_metadata
 delete mode 100644 flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.9-rocksdb-savepoint/_metadata

[flink] 03/03: [FLINK-26146] Adapt scala tests to cover native snapshots migration

Posted by dw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c88189f0245630b7b16e7e5dd63b8b7fbaecd92a
Author: Sebastian Mattheis <se...@ververica.com>
AuthorDate: Fri Feb 18 10:18:03 2022 +0100

    [FLINK-26146] Adapt scala tests to cover native snapshots migration
    
    Adapt scala savepoint migration tests to cover native savepoints and checkpoints.
    
    This closes #18850
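
In outline, the adapted tests run in one of two modes selected by the
executionMode flag. Below is a minimal Java sketch of that control flow,
using only names that appear in the diffs that follow (a sketch, not the
committed code; the Scala tests follow the same pattern, and
expectedAccumulators stands in for the Tuple2 accumulator checks):

    if (executionMode == ExecutionMode.CREATE_SNAPSHOT) {
        // Write a snapshot of the requested type into the test resources.
        executeAndSnapshot(
                env,
                "src/test/resources/" + getSnapshotPath(snapshotSpec),
                snapshotSpec.getSnapshotType(),
                expectedAccumulators);
    } else { // ExecutionMode.VERIFY_SNAPSHOT
        // Restore from the previously generated snapshot and verify results.
        restoreAndExecute(
                env,
                getResourceFilename(getSnapshotPath(snapshotSpec)),
                expectedAccumulators);
    }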
---
 .../utils/SnapshotMigrationTestBase.java           |  11 ---
 .../StatefulJobSavepointMigrationITCase.scala      | 103 ++++++++++++++-------
 ...StatefulJobWBroadcastStateMigrationITCase.scala | 101 +++++++++++++-------
 3 files changed, 136 insertions(+), 79 deletions(-)

diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SnapshotMigrationTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SnapshotMigrationTestBase.java
index 57a4894..2cb6b6a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SnapshotMigrationTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SnapshotMigrationTestBase.java
@@ -257,17 +257,6 @@ public abstract class SnapshotMigrationTestBase extends TestBaseUtils {
         return config;
     }
 
-    @Deprecated
-    @SafeVarargs
-    protected final void executeAndSavepoint(
-            StreamExecutionEnvironment env,
-            String snapshotPath,
-            Tuple2<String, Integer>... expectedAccumulators)
-            throws Exception {
-        executeAndSnapshot(
-                env, snapshotPath, SnapshotType.SAVEPOINT_CANONICAL, expectedAccumulators);
-    }
-
     @SafeVarargs
     protected final void executeAndSnapshot(
             StreamExecutionEnvironment env,
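
With the deprecated wrapper removed, callers now name the snapshot type
explicitly. A minimal before/after sketch, assuming the signatures shown in
the hunk above (snapshotPath and expectedAccumulators are placeholders):

    // Before (wrapper removed by this commit):
    //   executeAndSavepoint(env, snapshotPath, expectedAccumulators);
    // After: the snapshot type is an explicit argument.
    executeAndSnapshot(
            env, snapshotPath, SnapshotType.SAVEPOINT_CANONICAL, expectedAccumulators);
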
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobSavepointMigrationITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobSavepointMigrationITCase.scala
index 3ff91ee..cf88f39 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobSavepointMigrationITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobSavepointMigrationITCase.scala
@@ -41,7 +41,7 @@ import org.apache.flink.api.java.tuple.Tuple2
 import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext, StateBackendLoader}
 import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.migration.CustomEnum.CustomEnum
-import org.apache.flink.test.checkpointing.utils.SnapshotMigrationTestBase.ExecutionMode
+import org.apache.flink.test.checkpointing.utils.SnapshotMigrationTestBase.{ExecutionMode, SnapshotSpec, SnapshotType}
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
 import org.junit.Test
@@ -59,31 +59,54 @@ object StatefulJobSavepointMigrationITCase {
   // master.
   val executionMode = ExecutionMode.VERIFY_SNAPSHOT
 
-  @Parameterized.Parameters(name = "Migrate Savepoint / Backend: {0}")
-  def parameters: util.Collection[(FlinkVersion, String)] = {
-    var parameters = util.Arrays.asList(
-      //      (FlinkVersion.v1_3, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-      //      (FlinkVersion.v1_4, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-      //      (FlinkVersion.v1_6, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-      //      (FlinkVersion.v1_7, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-      // Note: It is not safe to restore savepoints created in Scala applications with Flink
-      // version 1.7 or below. The reason is that up to version 1.7 the underlying Scala serializer
-      // used names of anonymous classes that depend on the relative position/order in code, e.g.,
-      // if two anonymous classes, instantiated inside the same class and from the same base class,
-      // change order in the code their names are switched.
-      // As a consequence, changes in code may result in restore failures.
-      // This was fixed in version 1.8, see: https://issues.apache.org/jira/browse/FLINK-10493
-      (FlinkVersion.v1_8, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-      (FlinkVersion.v1_9, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-      (FlinkVersion.v1_10, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-      (FlinkVersion.v1_11, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-      (FlinkVersion.v1_12, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-      (FlinkVersion.v1_13, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-      (FlinkVersion.v1_14, StateBackendLoader.HASHMAP_STATE_BACKEND_NAME),
-      (FlinkVersion.v1_15, StateBackendLoader.HASHMAP_STATE_BACKEND_NAME),
-    )
+  @Parameterized.Parameters(name = "Test snapshot: {0}")
+  def parameters: util.Collection[SnapshotSpec] = {
+    // Note: It is not safe to restore savepoints created in Scala applications with Flink
+    // version 1.7 or below. The reason is that up to version 1.7 the underlying Scala serializer
+    // used names of anonymous classes that depend on the relative position/order in code, e.g.,
+    // if two anonymous classes, instantiated inside the same class and from the same base class,
+    // change order in the code their names are switched.
+    // As a consequence, changes in code may result in restore failures.
+    // This was fixed in version 1.8, see: https://issues.apache.org/jira/browse/FLINK-10493
+    var parameters: util.List[SnapshotSpec] = new util.LinkedList[SnapshotSpec]()
+    parameters.addAll(
+      SnapshotSpec.withVersions(
+        StateBackendLoader.MEMORY_STATE_BACKEND_NAME,
+        SnapshotType.SAVEPOINT_CANONICAL,
+        FlinkVersion.rangeOf(FlinkVersion.v1_8, FlinkVersion.v1_13)))
+    parameters.addAll(
+      SnapshotSpec.withVersions(
+        StateBackendLoader.HASHMAP_STATE_BACKEND_NAME,
+        SnapshotType.SAVEPOINT_CANONICAL,
+        FlinkVersion.rangeOf(FlinkVersion.v1_14, currentVersion)))
+    parameters.addAll(
+      SnapshotSpec.withVersions(
+        StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME,
+        SnapshotType.SAVEPOINT_CANONICAL,
+        FlinkVersion.rangeOf(FlinkVersion.v1_15, currentVersion)))
+    parameters.addAll(
+      SnapshotSpec.withVersions(
+        StateBackendLoader.HASHMAP_STATE_BACKEND_NAME,
+        SnapshotType.SAVEPOINT_NATIVE,
+        FlinkVersion.rangeOf(FlinkVersion.v1_15, currentVersion)))
+    parameters.addAll(
+      SnapshotSpec.withVersions(
+        StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME,
+        SnapshotType.SAVEPOINT_NATIVE,
+        FlinkVersion.rangeOf(FlinkVersion.v1_15, currentVersion)))
+    parameters.addAll(
+      SnapshotSpec.withVersions(
+        StateBackendLoader.HASHMAP_STATE_BACKEND_NAME,
+        SnapshotType.CHECKPOINT,
+        FlinkVersion.rangeOf(FlinkVersion.v1_15, currentVersion)))
+    parameters.addAll(
+      SnapshotSpec.withVersions(
+        StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME,
+        SnapshotType.CHECKPOINT,
+        FlinkVersion.rangeOf(FlinkVersion.v1_15, currentVersion)))
     if (executionMode == ExecutionMode.CREATE_SNAPSHOT) {
-      parameters = parameters.stream().filter(x => x._1 == currentVersion)
+      parameters = parameters.stream()
+        .filter(x => x.getFlinkVersion().equals(currentVersion))
         .collect(Collectors.toList())
     }
     parameters
@@ -91,10 +114,20 @@ object StatefulJobSavepointMigrationITCase {
 
   val NUM_ELEMENTS = 4
 
-  def getSnapshotPath(migrationVersionAndBackend: (FlinkVersion, String)): String = {
-    s"stateful-scala-udf-migration-itcase" +
-      s"-flink${migrationVersionAndBackend._1}" +
-      s"-${migrationVersionAndBackend._2}-savepoint"
+  def getSnapshotPath(snapshotSpec: SnapshotSpec): String = {
+    val path = new StringBuilder(s"stateful-scala-udf-migration-itcase")
+    path ++= s"-flink${snapshotSpec.getFlinkVersion()}"
+    path ++= s"-${snapshotSpec.getStateBackendType()}"
+    snapshotSpec.getSnapshotType() match {
+      case SnapshotType.SAVEPOINT_CANONICAL =>
+        path ++= "-savepoint"
+      case SnapshotType.SAVEPOINT_NATIVE =>
+        path ++= "-savepoint-native"
+      case SnapshotType.CHECKPOINT =>
+        path ++= "-checkpoint"
+      case _ => throw new UnsupportedOperationException
+    }
+    path.toString()
   }
 }
 
@@ -102,8 +135,7 @@ object StatefulJobSavepointMigrationITCase {
 * ITCase for migrating Scala state types across different Flink versions.
  */
 @RunWith(classOf[Parameterized])
-class StatefulJobSavepointMigrationITCase(
-                                           migrationVersionAndBackend: (FlinkVersion, String))
+class StatefulJobSavepointMigrationITCase(snapshotSpec: SnapshotSpec)
   extends SnapshotMigrationTestBase with Serializable {
 
   @Test
@@ -111,7 +143,7 @@ class StatefulJobSavepointMigrationITCase(
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
 
-    migrationVersionAndBackend._2 match {
+    snapshotSpec.getStateBackendType match {
       case StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME =>
         env.setStateBackend(new EmbeddedRocksDBStateBackend())
       case StateBackendLoader.MEMORY_STATE_BACKEND_NAME =>
@@ -138,10 +170,11 @@ class StatefulJobSavepointMigrationITCase(
       .addSink(new AccumulatorCountingSink)
 
     if (StatefulJobSavepointMigrationITCase.executionMode == ExecutionMode.CREATE_SNAPSHOT) {
-      executeAndSavepoint(
+      executeAndSnapshot(
         env,
         s"src/test/resources/"
-          + StatefulJobSavepointMigrationITCase.getSnapshotPath(migrationVersionAndBackend),
+          + StatefulJobSavepointMigrationITCase.getSnapshotPath(snapshotSpec),
+        snapshotSpec.getSnapshotType(),
         new Tuple2(
           AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR,
           StatefulJobSavepointMigrationITCase.NUM_ELEMENTS
@@ -152,7 +185,7 @@ class StatefulJobSavepointMigrationITCase(
       restoreAndExecute(
         env,
         SnapshotMigrationTestBase.getResourceFilename(
-          StatefulJobSavepointMigrationITCase.getSnapshotPath(migrationVersionAndBackend)),
+          StatefulJobSavepointMigrationITCase.getSnapshotPath(snapshotSpec)),
         new Tuple2(
           AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR,
           StatefulJobSavepointMigrationITCase.NUM_ELEMENTS)
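
For illustration, getSnapshotPath above maps a SnapshotSpec to the resource
directory holding the binary snapshot. Assuming the getters render as in the
existing resource names, example results would be:

    stateful-scala-udf-migration-itcase-flink1.15-rocksdb-savepoint          (SAVEPOINT_CANONICAL)
    stateful-scala-udf-migration-itcase-flink1.15-rocksdb-savepoint-native   (SAVEPOINT_NATIVE)
    stateful-scala-udf-migration-itcase-flink1.15-rocksdb-checkpoint         (CHECKPOINT)
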
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobWBroadcastStateMigrationITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobWBroadcastStateMigrationITCase.scala
index 47dc3ae..dc9b540 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobWBroadcastStateMigrationITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobWBroadcastStateMigrationITCase.scala
@@ -42,7 +42,7 @@ import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
 import org.apache.flink.streaming.api.functions.source.SourceFunction
 import org.apache.flink.streaming.api.watermark.Watermark
 import org.apache.flink.test.checkpointing.utils.SnapshotMigrationTestBase
-import org.apache.flink.test.checkpointing.utils.SnapshotMigrationTestBase.ExecutionMode
+import org.apache.flink.test.checkpointing.utils.SnapshotMigrationTestBase.{ExecutionMode, SnapshotSpec, SnapshotType}
 import org.apache.flink.util.Collector
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
@@ -61,39 +61,73 @@ object StatefulJobWBroadcastStateMigrationITCase {
   // master.
   val executionMode = ExecutionMode.VERIFY_SNAPSHOT
 
-  @Parameterized.Parameters(name = "Migrate Savepoint / Backend: {0}")
-  def parameters: util.Collection[(FlinkVersion, String)] = {
-    var parameters = util.Arrays.asList(
-      //      (FlinkVersion.v1_5, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-      //      (FlinkVersion.v1_6, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-      //      (FlinkVersion.v1_7, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-      // Note: It is not safe to restore savepoints created in Scala applications with Flink
-      // version 1.7 or below. The reason is that up to version 1.7 the underlying Scala serializer
-      // used names of anonymous classes that depend on the relative position/order in code, e.g.,
-      // if two anonymous classes, instantiated inside the same class and from the same base class,
-      // change order in the code their names are switched.
-      // As a consequence, changes in code may result in restore failures.
-      // This was fixed in version 1.8, see: https://issues.apache.org/jira/browse/FLINK-10493
-      (FlinkVersion.v1_8, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-      (FlinkVersion.v1_9, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-      (FlinkVersion.v1_10, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-      (FlinkVersion.v1_11, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-      (FlinkVersion.v1_12, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-      (FlinkVersion.v1_13, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-      (FlinkVersion.v1_14, StateBackendLoader.HASHMAP_STATE_BACKEND_NAME),
-      (FlinkVersion.v1_15, StateBackendLoader.HASHMAP_STATE_BACKEND_NAME),
-    )
+  @Parameterized.Parameters(name = "Test snapshot: {0}")
+  def parameters: util.Collection[SnapshotSpec] = {
+    // Note: It is not safe to restore savepoints created in Scala applications with Flink
+    // version 1.7 or below. The reason is that up to version 1.7 the underlying Scala serializer
+    // used names of anonymous classes that depend on the relative position/order in code, e.g.,
+    // if two anonymous classes, instantiated inside the same class and from the same base class,
+    // change order in the code their names are switched.
+    // As a consequence, changes in code may result in restore failures.
+    // This was fixed in version 1.8, see: https://issues.apache.org/jira/browse/FLINK-10493
+    var parameters: util.List[SnapshotSpec] = new util.LinkedList[SnapshotSpec]()
+    parameters.addAll(
+      SnapshotSpec.withVersions(
+        StateBackendLoader.MEMORY_STATE_BACKEND_NAME,
+        SnapshotType.SAVEPOINT_CANONICAL,
+        FlinkVersion.rangeOf(FlinkVersion.v1_8, FlinkVersion.v1_13)))
+    parameters.addAll(
+      SnapshotSpec.withVersions(
+        StateBackendLoader.HASHMAP_STATE_BACKEND_NAME,
+        SnapshotType.SAVEPOINT_CANONICAL,
+        FlinkVersion.rangeOf(FlinkVersion.v1_14, currentVersion)))
+    parameters.addAll(
+      SnapshotSpec.withVersions(
+        StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME,
+        SnapshotType.SAVEPOINT_CANONICAL,
+        FlinkVersion.rangeOf(FlinkVersion.v1_15, currentVersion)))
+    parameters.addAll(
+      SnapshotSpec.withVersions(
+        StateBackendLoader.HASHMAP_STATE_BACKEND_NAME,
+        SnapshotType.SAVEPOINT_NATIVE,
+        FlinkVersion.rangeOf(FlinkVersion.v1_15, currentVersion)))
+    parameters.addAll(
+      SnapshotSpec.withVersions(
+        StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME,
+        SnapshotType.SAVEPOINT_NATIVE,
+        FlinkVersion.rangeOf(FlinkVersion.v1_15, currentVersion)))
+    parameters.addAll(
+      SnapshotSpec.withVersions(
+        StateBackendLoader.HASHMAP_STATE_BACKEND_NAME,
+        SnapshotType.CHECKPOINT,
+        FlinkVersion.rangeOf(FlinkVersion.v1_15, currentVersion)))
+    parameters.addAll(
+      SnapshotSpec.withVersions(
+        StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME,
+        SnapshotType.CHECKPOINT,
+        FlinkVersion.rangeOf(FlinkVersion.v1_15, currentVersion)))
     if (executionMode == ExecutionMode.CREATE_SNAPSHOT) {
-      parameters = parameters.stream().filter(x => x._1 == currentVersion)
+      parameters = parameters.stream()
+        .filter(x => x.getFlinkVersion().equals(currentVersion))
         .collect(Collectors.toList())
     }
     parameters
   }
 
-  def getSnapshotPath(migrationVersionAndBackend: (FlinkVersion, String)): String = {
-    s"stateful-scala-with-broadcast-udf-migration-itcase" +
-      s"-flink${migrationVersionAndBackend._1}" +
-      s"-${migrationVersionAndBackend._2}-savepoint"
+  def getSnapshotPath(snapshotSpec: SnapshotSpec): String = {
+    val path = new StringBuilder(s"stateful-scala-with-broadcast-udf-migration-itcase")
+    path ++= s"-flink${snapshotSpec.getFlinkVersion()}"
+    path ++= s"-${snapshotSpec.getStateBackendType()}"
+    snapshotSpec.getSnapshotType() match {
+      case SnapshotType.SAVEPOINT_CANONICAL =>
+        path ++= "-savepoint"
+      case SnapshotType.SAVEPOINT_NATIVE =>
+        path ++= "-savepoint-native"
+      case SnapshotType.CHECKPOINT =>
+        path ++= "-checkpoint"
+      case _ => throw new UnsupportedOperationException
+    }
+    path.toString()
   }
 
   val NUM_ELEMENTS = 4
@@ -103,7 +137,7 @@ object StatefulJobWBroadcastStateMigrationITCase {
 * ITCase for migrating Scala state types across different Flink versions.
  */
 @RunWith(classOf[Parameterized])
-class StatefulJobWBroadcastStateMigrationITCase(migrationVersionAndBackend: (FlinkVersion, String))
+class StatefulJobWBroadcastStateMigrationITCase(snapshotSpec: SnapshotSpec)
   extends SnapshotMigrationTestBase with Serializable {
 
   @Test
@@ -112,7 +146,7 @@ class StatefulJobWBroadcastStateMigrationITCase(migrationVersionAndBackend: (Fli
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
 
-    migrationVersionAndBackend._2 match {
+    snapshotSpec.getStateBackendType match {
       case StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME =>
         env.setStateBackend(new EmbeddedRocksDBStateBackend())
       case StateBackendLoader.MEMORY_STATE_BACKEND_NAME =>
@@ -163,10 +197,11 @@ class StatefulJobWBroadcastStateMigrationITCase(migrationVersionAndBackend: (Fli
         .process(new TestBroadcastProcessFunction)
         .addSink(new AccumulatorCountingSink)
 
-      executeAndSavepoint(
+      executeAndSnapshot(
         env,
         s"src/test/resources/"
-          + StatefulJobWBroadcastStateMigrationITCase.getSnapshotPath(migrationVersionAndBackend),
+          + StatefulJobWBroadcastStateMigrationITCase.getSnapshotPath(snapshotSpec),
+        snapshotSpec.getSnapshotType(),
         new Tuple2(
           AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR,
           StatefulJobWBroadcastStateMigrationITCase.NUM_ELEMENTS
@@ -187,7 +222,7 @@ class StatefulJobWBroadcastStateMigrationITCase(migrationVersionAndBackend: (Fli
       restoreAndExecute(
         env,
         SnapshotMigrationTestBase.getResourceFilename(
-          StatefulJobWBroadcastStateMigrationITCase.getSnapshotPath(migrationVersionAndBackend)),
+          StatefulJobWBroadcastStateMigrationITCase.getSnapshotPath(snapshotSpec)),
         new Tuple2(
           AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR,
           StatefulJobWBroadcastStateMigrationITCase.NUM_ELEMENTS)

[flink] 01/03: [FLINK-26146] Add tests of flink version upgrades to cover native snapshots

Posted by dw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 0b9f8956a1452a8b1259257af926eac7ad9e3bec
Author: Sebastian Mattheis <se...@ververica.com>
AuthorDate: Fri Feb 18 10:17:49 2022 +0100

    [FLINK-26146] Add tests of flink version upgrades to cover native snapshots
    
    Turned SavepointMigrationTestBase into SnapshotMigrationTestBase that supports testing canonical
    savepoints, native savepoints, and checkpoints; and adapted subclasses accordingly.
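
A minimal sketch of the resulting base-class API, using only names from the
diffs below (not the committed code; env, path, and acc are placeholders):

    // One executeAndSnapshot entry point covers all three snapshot kinds:
    executeAndSnapshot(env, path, SnapshotType.SAVEPOINT_CANONICAL, acc);
    executeAndSnapshot(env, path, SnapshotType.SAVEPOINT_NATIVE, acc);
    executeAndSnapshot(env, path, SnapshotType.CHECKPOINT, acc);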
---
 .../LegacyStatefulJobSavepointMigrationITCase.java |   8 +-
 ...ava => StatefulJobSnapshotMigrationITCase.java} | 145 ++++++++-------
 .../StatefulJobWBroadcastStateMigrationITCase.java | 139 +++++++-------
 .../checkpointing/utils/MigrationTestUtils.java    |   4 +-
 ...estBase.java => SnapshotMigrationTestBase.java} | 200 +++++++++++++++++++--
 .../TypeSerializerSnapshotMigrationITCase.java     | 155 ++++++++--------
 pom.xml                                            |   4 +-
 7 files changed, 430 insertions(+), 225 deletions(-)

diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/LegacyStatefulJobSavepointMigrationITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LegacyStatefulJobSavepointMigrationITCase.java
similarity index 99%
rename from flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/LegacyStatefulJobSavepointMigrationITCase.java
rename to flink-tests/src/test/java/org/apache/flink/test/checkpointing/LegacyStatefulJobSavepointMigrationITCase.java
index e59be25..fe16a25 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/LegacyStatefulJobSavepointMigrationITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LegacyStatefulJobSavepointMigrationITCase.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.test.checkpointing.utils;
+package org.apache.flink.test.checkpointing;
 
 import org.apache.flink.FlinkVersion;
 import org.apache.flink.api.common.accumulators.IntCounter;
@@ -45,6 +45,7 @@ import org.apache.flink.streaming.api.operators.TimestampedCollector;
 import org.apache.flink.streaming.api.operators.Triggerable;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.test.checkpointing.utils.SnapshotMigrationTestBase;
 import org.apache.flink.util.Collector;
 
 import org.junit.Ignore;
@@ -62,7 +63,7 @@ import static org.junit.Assert.assertEquals;
  * previous Flink versions, as well as for different state backends.
  */
 @RunWith(Parameterized.class)
-public class LegacyStatefulJobSavepointMigrationITCase extends SavepointMigrationTestBase {
+public class LegacyStatefulJobSavepointMigrationITCase extends SnapshotMigrationTestBase {
 
     private static final int NUM_SOURCE_ELEMENTS = 4;
 
@@ -143,11 +144,12 @@ public class LegacyStatefulJobSavepointMigrationITCase extends SavepointMigratio
                 .uid("TimelyStatefulOperator")
                 .addSink(new AccumulatorCountingSink<Tuple2<Long, Long>>());
 
-        executeAndSavepoint(
+        executeAndSnapshot(
                 env,
                 "src/test/resources/"
                         + getSavepointPath(
                                 flinkGenerateSavepointVersion, flinkGenerateSavepointBackendType),
+                SnapshotType.SAVEPOINT_CANONICAL,
                 new Tuple2<>(
                         AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR, NUM_SOURCE_ELEMENTS));
     }
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointMigrationITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StatefulJobSnapshotMigrationITCase.java
similarity index 76%
rename from flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointMigrationITCase.java
rename to flink-tests/src/test/java/org/apache/flink/test/checkpointing/StatefulJobSnapshotMigrationITCase.java
index 260c1f2..df2a20b 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointMigrationITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StatefulJobSnapshotMigrationITCase.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.test.checkpointing.utils;
+package org.apache.flink.test.checkpointing;
 
 import org.apache.flink.FlinkVersion;
 import org.apache.flink.api.common.accumulators.IntCounter;
@@ -28,8 +28,9 @@ import org.apache.flink.api.common.typeinfo.TypeHint;
 import org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
 import org.apache.flink.runtime.state.StateBackendLoader;
+import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
@@ -40,14 +41,17 @@ import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.Triggerable;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.test.checkpointing.utils.MigrationTestUtils;
+import org.apache.flink.test.checkpointing.utils.SnapshotMigrationTestBase;
 import org.apache.flink.util.Collector;
 
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
-import java.util.Arrays;
 import java.util.Collection;
+import java.util.LinkedList;
+import java.util.stream.Collectors;
 
 import static org.junit.Assert.assertEquals;
 
@@ -56,59 +60,69 @@ import static org.junit.Assert.assertEquals;
  * previous Flink versions, as well as for different state backends.
  */
 @RunWith(Parameterized.class)
-public class StatefulJobSavepointMigrationITCase extends SavepointMigrationTestBase {
+public class StatefulJobSnapshotMigrationITCase extends SnapshotMigrationTestBase {
 
     private static final int NUM_SOURCE_ELEMENTS = 4;
 
-    /**
-     * This test runs in either of two modes: 1) we want to generate the binary savepoint, i.e. we
-     * have to run the checkpointing functions 2) we want to verify restoring, so we have to run the
-     * checking functions.
-     */
-    public enum ExecutionMode {
-        PERFORM_SAVEPOINT,
-        VERIFY_SAVEPOINT
-    }
+    // TODO increase this to newer version to create and test snapshot migration for newer versions
+    private static final FlinkVersion currentVersion = FlinkVersion.v1_14;
 
-    // TODO change this to PERFORM_SAVEPOINT to regenerate binary savepoints
-    // TODO Note: You should generate the savepoint based on the release branch instead of the
+    // TODO change this to CREATE_SNAPSHOT to (re)create binary snapshots
+    // TODO Note: You should generate the snapshot based on the release branch instead of the
     // master.
-    private final ExecutionMode executionMode = ExecutionMode.VERIFY_SAVEPOINT;
-
-    @Parameterized.Parameters(name = "Migrate Savepoint / Backend: {0}")
-    public static Collection<Tuple2<FlinkVersion, String>> parameters() {
-        return Arrays.asList(
-                Tuple2.of(FlinkVersion.v1_4, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-                Tuple2.of(FlinkVersion.v1_4, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
-                Tuple2.of(FlinkVersion.v1_5, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-                Tuple2.of(FlinkVersion.v1_5, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
-                Tuple2.of(FlinkVersion.v1_6, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-                Tuple2.of(FlinkVersion.v1_6, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
-                Tuple2.of(FlinkVersion.v1_7, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-                Tuple2.of(FlinkVersion.v1_7, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
-                Tuple2.of(FlinkVersion.v1_8, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-                Tuple2.of(FlinkVersion.v1_8, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
-                Tuple2.of(FlinkVersion.v1_9, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-                Tuple2.of(FlinkVersion.v1_9, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
-                Tuple2.of(FlinkVersion.v1_10, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-                Tuple2.of(FlinkVersion.v1_10, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
-                Tuple2.of(FlinkVersion.v1_11, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-                Tuple2.of(FlinkVersion.v1_11, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
-                Tuple2.of(FlinkVersion.v1_12, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-                Tuple2.of(FlinkVersion.v1_12, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
-                Tuple2.of(FlinkVersion.v1_13, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-                Tuple2.of(FlinkVersion.v1_13, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
-                Tuple2.of(FlinkVersion.v1_14, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-                Tuple2.of(FlinkVersion.v1_14, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME));
+    private static final ExecutionMode executionMode = ExecutionMode.VERIFY_SNAPSHOT;
+
+    @Parameterized.Parameters(name = "Test snapshot: {0}")
+    public static Collection<SnapshotSpec> parameters() {
+        Collection<SnapshotSpec> parameters = new LinkedList<>();
+        parameters.addAll(
+                SnapshotSpec.withVersions(
+                        StateBackendLoader.MEMORY_STATE_BACKEND_NAME,
+                        SnapshotType.SAVEPOINT_CANONICAL,
+                        FlinkVersion.rangeOf(FlinkVersion.v1_4, FlinkVersion.v1_14)));
+        parameters.addAll(
+                SnapshotSpec.withVersions(
+                        StateBackendLoader.HASHMAP_STATE_BACKEND_NAME,
+                        SnapshotType.SAVEPOINT_CANONICAL,
+                        FlinkVersion.rangeOf(FlinkVersion.v1_15, currentVersion)));
+        parameters.addAll(
+                SnapshotSpec.withVersions(
+                        StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME,
+                        SnapshotType.SAVEPOINT_CANONICAL,
+                        FlinkVersion.rangeOf(FlinkVersion.v1_4, currentVersion)));
+        parameters.addAll(
+                SnapshotSpec.withVersions(
+                        StateBackendLoader.HASHMAP_STATE_BACKEND_NAME,
+                        SnapshotType.SAVEPOINT_NATIVE,
+                        FlinkVersion.rangeOf(FlinkVersion.v1_15, currentVersion)));
+        parameters.addAll(
+                SnapshotSpec.withVersions(
+                        StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME,
+                        SnapshotType.SAVEPOINT_NATIVE,
+                        FlinkVersion.rangeOf(FlinkVersion.v1_15, currentVersion)));
+        parameters.addAll(
+                SnapshotSpec.withVersions(
+                        StateBackendLoader.HASHMAP_STATE_BACKEND_NAME,
+                        SnapshotType.CHECKPOINT,
+                        FlinkVersion.rangeOf(FlinkVersion.v1_15, currentVersion)));
+        parameters.addAll(
+                SnapshotSpec.withVersions(
+                        StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME,
+                        SnapshotType.CHECKPOINT,
+                        FlinkVersion.rangeOf(FlinkVersion.v1_15, currentVersion)));
+        if (executionMode == ExecutionMode.CREATE_SNAPSHOT) {
+            parameters =
+                    parameters.stream()
+                            .filter(x -> x.getFlinkVersion().equals(currentVersion))
+                            .collect(Collectors.toList());
+        }
+        return parameters;
     }
 
-    private final FlinkVersion testMigrateVersion;
-    private final String testStateBackend;
+    private final SnapshotSpec snapshotSpec;
 
-    public StatefulJobSavepointMigrationITCase(
-            Tuple2<FlinkVersion, String> testMigrateVersionAndBackend) throws Exception {
-        this.testMigrateVersion = testMigrateVersionAndBackend.f0;
-        this.testStateBackend = testMigrateVersionAndBackend.f1;
+    public StatefulJobSnapshotMigrationITCase(SnapshotSpec snapshotSpec) throws Exception {
+        this.snapshotSpec = snapshotSpec;
     }
 
     @Test
@@ -119,13 +133,16 @@ public class StatefulJobSavepointMigrationITCase extends SavepointMigrationTestB
         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.setRestartStrategy(RestartStrategies.noRestart());
 
-        switch (testStateBackend) {
+        switch (snapshotSpec.getStateBackendType()) {
             case StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME:
-                env.setStateBackend(new RocksDBStateBackend(new MemoryStateBackend()));
+                env.setStateBackend(new EmbeddedRocksDBStateBackend());
                 break;
             case StateBackendLoader.MEMORY_STATE_BACKEND_NAME:
                 env.setStateBackend(new MemoryStateBackend());
                 break;
+            case StateBackendLoader.HASHMAP_STATE_BACKEND_NAME:
+                env.setStateBackend(new HashMapStateBackend());
+                break;
             default:
                 throw new UnsupportedOperationException();
         }
@@ -140,7 +157,7 @@ public class StatefulJobSavepointMigrationITCase extends SavepointMigrationTestB
         RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> flatMap;
         OneInputStreamOperator<Tuple2<Long, Long>, Tuple2<Long, Long>> timelyOperator;
 
-        if (executionMode == ExecutionMode.PERFORM_SAVEPOINT) {
+        if (executionMode == ExecutionMode.CREATE_SNAPSHOT) {
             nonParallelSource =
                     new MigrationTestUtils.CheckpointingNonParallelSourceWithListState(
                             NUM_SOURCE_ELEMENTS);
@@ -149,7 +166,7 @@ public class StatefulJobSavepointMigrationITCase extends SavepointMigrationTestB
                             NUM_SOURCE_ELEMENTS);
             flatMap = new CheckpointingKeyedStateFlatMap();
             timelyOperator = new CheckpointingTimelyStatefulOperator();
-        } else if (executionMode == ExecutionMode.VERIFY_SAVEPOINT) {
+        } else if (executionMode == ExecutionMode.VERIFY_SNAPSHOT) {
             nonParallelSource =
                     new MigrationTestUtils.CheckingNonParallelSourceWithListState(
                             NUM_SOURCE_ELEMENTS);
@@ -190,17 +207,20 @@ public class StatefulJobSavepointMigrationITCase extends SavepointMigrationTestB
                 .uid("CheckpointingTimelyStatefulOperator2")
                 .addSink(new MigrationTestUtils.AccumulatorCountingSink<>());
 
-        if (executionMode == ExecutionMode.PERFORM_SAVEPOINT) {
-            executeAndSavepoint(
+        final String snapshotPath = getSnapshotPath(snapshotSpec);
+
+        if (executionMode == ExecutionMode.CREATE_SNAPSHOT) {
+            executeAndSnapshot(
                     env,
-                    "src/test/resources/" + getSavepointPath(testMigrateVersion, testStateBackend),
+                    "src/test/resources/" + snapshotPath,
+                    snapshotSpec.getSnapshotType(),
                     new Tuple2<>(
                             MigrationTestUtils.AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR,
                             NUM_SOURCE_ELEMENTS * 2));
         } else {
             restoreAndExecute(
                     env,
-                    getResourceFilename(getSavepointPath(testMigrateVersion, testStateBackend)),
+                    getResourceFilename(snapshotPath),
                     new Tuple2<>(
                             MigrationTestUtils.CheckingNonParallelSourceWithListState
                                     .SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR,
@@ -228,17 +248,8 @@ public class StatefulJobSavepointMigrationITCase extends SavepointMigrationTestB
         }
     }
 
-    private String getSavepointPath(FlinkVersion savepointVersion, String backendType) {
-        switch (backendType) {
-            case StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME:
-                return "new-stateful-udf-migration-itcase-flink"
-                        + savepointVersion
-                        + "-rocksdb-savepoint";
-            case StateBackendLoader.MEMORY_STATE_BACKEND_NAME:
-                return "new-stateful-udf-migration-itcase-flink" + savepointVersion + "-savepoint";
-            default:
-                throw new UnsupportedOperationException();
-        }
+    private static String getSnapshotPath(SnapshotSpec snapshotSpec) {
+        return "new-stateful-udf-migration-itcase-" + snapshotSpec;
     }
 
     private static class CheckpointingKeyedStateFlatMap
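
Note that the Java tests concatenate the SnapshotSpec directly into the
resource name, which assumes SnapshotSpec.toString() encodes version, backend,
and snapshot type. Under that assumption, an example resource name would be:

    new-stateful-udf-migration-itcase-flink1.15-rocksdb-savepoint-native
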
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobWBroadcastStateMigrationITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StatefulJobWBroadcastStateMigrationITCase.java
similarity index 80%
rename from flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobWBroadcastStateMigrationITCase.java
rename to flink-tests/src/test/java/org/apache/flink/test/checkpointing/StatefulJobWBroadcastStateMigrationITCase.java
index 4470b85..459ce1b 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobWBroadcastStateMigrationITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StatefulJobWBroadcastStateMigrationITCase.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.test.checkpointing.utils;
+package org.apache.flink.test.checkpointing;
 
 import org.apache.flink.FlinkVersion;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
@@ -25,14 +25,17 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
 import org.apache.flink.runtime.state.StateBackendLoader;
+import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.streaming.api.datastream.BroadcastStream;
 import org.apache.flink.streaming.api.datastream.KeyedStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.test.checkpointing.utils.MigrationTestUtils;
+import org.apache.flink.test.checkpointing.utils.SnapshotMigrationTestBase;
 import org.apache.flink.util.Collector;
 
 import org.junit.Assert;
@@ -40,10 +43,11 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 /**
  * Migration ITCases for a stateful job with broadcast state. The tests are parameterized to
@@ -51,48 +55,69 @@ import java.util.Map;
  * state backends.
  */
 @RunWith(Parameterized.class)
-public class StatefulJobWBroadcastStateMigrationITCase extends SavepointMigrationTestBase {
+public class StatefulJobWBroadcastStateMigrationITCase extends SnapshotMigrationTestBase {
 
     private static final int NUM_SOURCE_ELEMENTS = 4;
 
-    // TODO change this to PERFORM_SAVEPOINT to regenerate binary savepoints
-    // TODO Note: You should generate the savepoint based on the release branch instead of the
+    // TODO increase this to newer version to create and test snapshot migration for newer versions
+    private static final FlinkVersion currentVersion = FlinkVersion.v1_14;
+
+    // TODO change this to CREATE_SNAPSHOT to (re)create binary snapshots
+    // TODO Note: You should generate the snapshot based on the release branch instead of the
     // master.
-    private final StatefulJobSavepointMigrationITCase.ExecutionMode executionMode =
-            StatefulJobSavepointMigrationITCase.ExecutionMode.VERIFY_SAVEPOINT;
-
-    @Parameterized.Parameters(name = "Migrate Savepoint / Backend: {0}")
-    public static Collection<Tuple2<FlinkVersion, String>> parameters() {
-        return Arrays.asList(
-                Tuple2.of(FlinkVersion.v1_5, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-                Tuple2.of(FlinkVersion.v1_5, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
-                Tuple2.of(FlinkVersion.v1_6, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-                Tuple2.of(FlinkVersion.v1_6, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
-                Tuple2.of(FlinkVersion.v1_7, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-                Tuple2.of(FlinkVersion.v1_7, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
-                Tuple2.of(FlinkVersion.v1_8, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-                Tuple2.of(FlinkVersion.v1_8, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
-                Tuple2.of(FlinkVersion.v1_9, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-                Tuple2.of(FlinkVersion.v1_9, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
-                Tuple2.of(FlinkVersion.v1_10, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-                Tuple2.of(FlinkVersion.v1_10, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
-                Tuple2.of(FlinkVersion.v1_11, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-                Tuple2.of(FlinkVersion.v1_11, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
-                Tuple2.of(FlinkVersion.v1_12, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-                Tuple2.of(FlinkVersion.v1_12, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
-                Tuple2.of(FlinkVersion.v1_13, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-                Tuple2.of(FlinkVersion.v1_13, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
-                Tuple2.of(FlinkVersion.v1_14, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-                Tuple2.of(FlinkVersion.v1_14, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME));
+    private static final ExecutionMode executionMode = ExecutionMode.VERIFY_SNAPSHOT;
+
+    @Parameterized.Parameters(name = "Test snapshot: {0}")
+    public static Collection<SnapshotSpec> parameters() {
+        Collection<SnapshotSpec> parameters = new LinkedList<>();
+        parameters.addAll(
+                SnapshotSpec.withVersions(
+                        StateBackendLoader.MEMORY_STATE_BACKEND_NAME,
+                        SnapshotType.SAVEPOINT_CANONICAL,
+                        FlinkVersion.rangeOf(FlinkVersion.v1_5, FlinkVersion.v1_14)));
+        parameters.addAll(
+                SnapshotSpec.withVersions(
+                        StateBackendLoader.HASHMAP_STATE_BACKEND_NAME,
+                        SnapshotType.SAVEPOINT_CANONICAL,
+                        FlinkVersion.rangeOf(FlinkVersion.v1_15, currentVersion)));
+        parameters.addAll(
+                SnapshotSpec.withVersions(
+                        StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME,
+                        SnapshotType.SAVEPOINT_CANONICAL,
+                        FlinkVersion.rangeOf(FlinkVersion.v1_5, currentVersion)));
+        parameters.addAll(
+                SnapshotSpec.withVersions(
+                        StateBackendLoader.HASHMAP_STATE_BACKEND_NAME,
+                        SnapshotType.SAVEPOINT_NATIVE,
+                        FlinkVersion.rangeOf(FlinkVersion.v1_15, currentVersion)));
+        parameters.addAll(
+                SnapshotSpec.withVersions(
+                        StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME,
+                        SnapshotType.SAVEPOINT_NATIVE,
+                        FlinkVersion.rangeOf(FlinkVersion.v1_15, currentVersion)));
+        parameters.addAll(
+                SnapshotSpec.withVersions(
+                        StateBackendLoader.HASHMAP_STATE_BACKEND_NAME,
+                        SnapshotType.CHECKPOINT,
+                        FlinkVersion.rangeOf(FlinkVersion.v1_15, currentVersion)));
+        parameters.addAll(
+                SnapshotSpec.withVersions(
+                        StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME,
+                        SnapshotType.CHECKPOINT,
+                        FlinkVersion.rangeOf(FlinkVersion.v1_15, currentVersion)));
+        if (executionMode == ExecutionMode.CREATE_SNAPSHOT) {
+            parameters =
+                    parameters.stream()
+                            .filter(x -> x.getFlinkVersion().equals(currentVersion))
+                            .collect(Collectors.toList());
+        }
+        return parameters;
     }
 
-    private final FlinkVersion testMigrateVersion;
-    private final String testStateBackend;
+    private final SnapshotSpec snapshotSpec;
 
-    public StatefulJobWBroadcastStateMigrationITCase(
-            Tuple2<FlinkVersion, String> testMigrateVersionAndBackend) throws Exception {
-        this.testMigrateVersion = testMigrateVersionAndBackend.f0;
-        this.testStateBackend = testMigrateVersionAndBackend.f1;
+    public StatefulJobWBroadcastStateMigrationITCase(SnapshotSpec snapshotSpec) throws Exception {
+        this.snapshotSpec = snapshotSpec;
     }
 
     @Test
@@ -103,13 +128,16 @@ public class StatefulJobWBroadcastStateMigrationITCase extends SavepointMigratio
         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.setRestartStrategy(RestartStrategies.noRestart());
 
-        switch (testStateBackend) {
+        switch (snapshotSpec.getStateBackendType()) {
             case StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME:
-                env.setStateBackend(new RocksDBStateBackend(new MemoryStateBackend()));
+                env.setStateBackend(new EmbeddedRocksDBStateBackend());
                 break;
             case StateBackendLoader.MEMORY_STATE_BACKEND_NAME:
                 env.setStateBackend(new MemoryStateBackend());
                 break;
+            case StateBackendLoader.HASHMAP_STATE_BACKEND_NAME:
+                env.setStateBackend(new HashMapStateBackend());
+                break;
             default:
                 throw new UnsupportedOperationException();
         }
@@ -148,7 +176,7 @@ public class StatefulJobWBroadcastStateMigrationITCase extends SavepointMigratio
         expectedThirdState.put(2L, "2");
         expectedThirdState.put(3L, "3");
 
-        if (executionMode == StatefulJobSavepointMigrationITCase.ExecutionMode.PERFORM_SAVEPOINT) {
+        if (executionMode == ExecutionMode.CREATE_SNAPSHOT) {
             nonParallelSource =
                     new MigrationTestUtils.CheckpointingNonParallelSourceWithListState(
                             NUM_SOURCE_ELEMENTS);
@@ -163,8 +191,7 @@ public class StatefulJobWBroadcastStateMigrationITCase extends SavepointMigratio
                             NUM_SOURCE_ELEMENTS);
             firstBroadcastFunction = new CheckpointingKeyedBroadcastFunction();
             secondBroadcastFunction = new CheckpointingKeyedSingleBroadcastFunction();
-        } else if (executionMode
-                == StatefulJobSavepointMigrationITCase.ExecutionMode.VERIFY_SAVEPOINT) {
+        } else if (executionMode == ExecutionMode.VERIFY_SNAPSHOT) {
             nonParallelSource =
                     new MigrationTestUtils.CheckingNonParallelSourceWithListState(
                             NUM_SOURCE_ELEMENTS);
@@ -252,19 +279,18 @@ public class StatefulJobWBroadcastStateMigrationITCase extends SavepointMigratio
                 .uid("BrProcess2")
                 .addSink(new MigrationTestUtils.AccumulatorCountingSink<>());
 
-        if (executionMode == StatefulJobSavepointMigrationITCase.ExecutionMode.PERFORM_SAVEPOINT) {
-            executeAndSavepoint(
+        if (executionMode == ExecutionMode.CREATE_SNAPSHOT) {
+            executeAndSnapshot(
                     env,
-                    "src/test/resources/"
-                            + getBroadcastSavepointPath(testMigrateVersion, testStateBackend),
+                    "src/test/resources/" + getSnapshotPath(snapshotSpec),
+                    snapshotSpec.getSnapshotType(),
                     new Tuple2<>(
                             MigrationTestUtils.AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR,
                             2 * NUM_SOURCE_ELEMENTS));
         } else {
             restoreAndExecute(
                     env,
-                    getResourceFilename(
-                            getBroadcastSavepointPath(testMigrateVersion, testStateBackend)),
+                    getResourceFilename(getSnapshotPath(snapshotSpec)),
                     new Tuple2<>(
                             MigrationTestUtils.CheckingNonParallelSourceWithListState
                                     .SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR,
@@ -279,19 +305,8 @@ public class StatefulJobWBroadcastStateMigrationITCase extends SavepointMigratio
         }
     }
 
-    private String getBroadcastSavepointPath(FlinkVersion savepointVersion, String backendType) {
-        switch (backendType) {
-            case StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME:
-                return "new-stateful-broadcast-udf-migration-itcase-flink"
-                        + savepointVersion
-                        + "-rocksdb-savepoint";
-            case StateBackendLoader.MEMORY_STATE_BACKEND_NAME:
-                return "new-stateful-broadcast-udf-migration-itcase-flink"
-                        + savepointVersion
-                        + "-savepoint";
-            default:
-                throw new UnsupportedOperationException();
-        }
+    private String getSnapshotPath(SnapshotSpec snapshotSpec) {
+        return "new-stateful-broadcast-udf-migration-itcase-" + snapshotSpec;
     }
 
     /**
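
For orientation, a minimal sketch of how the new SnapshotSpec-based naming composes
artifact directory names (illustrative only, not part of the patch; the class name is
hypothetical and the printed value assumes the SnapshotSpec.toString() introduced in
SnapshotMigrationTestBase below):

    import org.apache.flink.FlinkVersion;
    import org.apache.flink.runtime.state.StateBackendLoader;
    import org.apache.flink.test.checkpointing.utils.SnapshotMigrationTestBase.SnapshotSpec;
    import org.apache.flink.test.checkpointing.utils.SnapshotMigrationTestBase.SnapshotType;

    public class SnapshotNamingSketch {
        public static void main(String[] args) {
            // One spec per (Flink version, state backend, snapshot type) combination.
            SnapshotSpec spec =
                    new SnapshotSpec(
                            FlinkVersion.v1_15,
                            StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME,
                            SnapshotType.SAVEPOINT_NATIVE);
            // SnapshotSpec.toString() encodes all three dimensions, so a single prefix
            // replaces the old per-backend switch. Prints:
            // new-stateful-broadcast-udf-migration-itcase-flink1.15-rocksdb-savepoint-native
            System.out.println("new-stateful-broadcast-udf-migration-itcase-" + spec);
        }
    }
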
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/MigrationTestUtils.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/MigrationTestUtils.java
index 85ff84c..9fd021e 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/MigrationTestUtils.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/MigrationTestUtils.java
@@ -110,7 +110,7 @@ public class MigrationTestUtils {
 
         private static final long serialVersionUID = 1L;
 
-        static final String SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR =
+        public static final String SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR =
                 CheckingNonParallelSourceWithListState.class + "_RESTORE_CHECK";
 
         private volatile boolean isRunning = true;
@@ -251,7 +251,7 @@ public class MigrationTestUtils {
 
         private static final long serialVersionUID = 1L;
 
-        static final String SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR =
+        public static final String SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR =
                 CheckingParallelSourceWithUnionListState.class + "_RESTORE_CHECK";
 
         private volatile boolean isRunning = true;
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SnapshotMigrationTestBase.java
similarity index 53%
rename from flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
rename to flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SnapshotMigrationTestBase.java
index f17f59c..57a4894 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SnapshotMigrationTestBase.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.test.checkpointing.utils;
 
+import org.apache.flink.FlinkVersion;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.common.time.Deadline;
@@ -33,6 +34,7 @@ import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.execution.SavepointFormatType;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.runtime.state.StateBackendLoader;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
@@ -46,9 +48,13 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
+import java.io.Serializable;
 import java.net.URI;
 import java.net.URL;
 import java.time.Duration;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
@@ -58,18 +64,154 @@ import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.fail;
 
-/** Test savepoint migration. */
-public abstract class SavepointMigrationTestBase extends TestBaseUtils {
+/**
+ * Base for testing snapshot migration. The base test supports snapshot types as defined in {@link
+ * SnapshotType}.
+ */
+public abstract class SnapshotMigrationTestBase extends TestBaseUtils {
 
     @ClassRule public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder();
 
     @Rule public final MiniClusterWithClientResource miniClusterResource;
 
-    private static final Logger LOG = LoggerFactory.getLogger(SavepointMigrationTestBase.class);
+    private static final Logger LOG = LoggerFactory.getLogger(SnapshotMigrationTestBase.class);
 
     protected static final int DEFAULT_PARALLELISM = 4;
 
+    /**
+     * Modes for migration test execution. This enum serves as a switch between the two modes of
+     * test execution: 1) creating snapshots and 2) verifying snapshots.
+     */
+    public enum ExecutionMode {
+        /** Create binary snapshot(s), i.e., run the checkpointing functions. */
+        CREATE_SNAPSHOT,
+        /** Verify snapshot(s), i.e., restore the snapshot and check the execution result. */
+        VERIFY_SNAPSHOT
+    }
+
+    /** Types of snapshot supported by this base test. */
+    public enum SnapshotType {
+        /** Savepoints with Flink canonical format. */
+        SAVEPOINT_CANONICAL,
+        /** Savepoints with the native format of the respective state backend. */
+        SAVEPOINT_NATIVE,
+        /** Checkpoint. */
+        CHECKPOINT
+    }
+
+    /**
+     * An immutable snapshot specification for migration tests that consists of the {@link
+     * FlinkVersion} that the snapshot has been created with, the {@link
+     * SnapshotMigrationTestBase.SnapshotType}, and the state backend type that the snapshot has
+     * been created from.
+     */
+    public static class SnapshotSpec implements Serializable {
+        private final FlinkVersion flinkVersion;
+        private final String stateBackendType;
+        private final SnapshotMigrationTestBase.SnapshotType snapshotType;
+
+        /**
+         * Creates a {@link SnapshotSpec} with specified parameters.
+         *
+         * @param flinkVersion Specifies the {@link FlinkVersion}.
+         * @param stateBackendType Specifies the state backend type.
+         * @param snapshotType Specifies the {@link SnapshotMigrationTestBase.SnapshotType}.
+         */
+        public SnapshotSpec(
+                FlinkVersion flinkVersion,
+                String stateBackendType,
+                SnapshotMigrationTestBase.SnapshotType snapshotType) {
+            this.flinkVersion = flinkVersion;
+            this.stateBackendType = stateBackendType;
+            this.snapshotType = snapshotType;
+        }
+
+        /**
+         * Gets the {@link FlinkVersion} that the snapshot has been created with.
+         *
+         * @return {@link FlinkVersion}
+         */
+        public FlinkVersion getFlinkVersion() {
+            return flinkVersion;
+        }
+
+        /**
+         * Gets the state backend type that the snapshot has been created from.
+         *
+         * @return State backend type.
+         */
+        public String getStateBackendType() {
+            return stateBackendType;
+        }
+
+        /**
+         * Gets the {@link SnapshotMigrationTestBase.SnapshotType}.
+         *
+         * @return {@link SnapshotMigrationTestBase.SnapshotType}
+         */
+        public SnapshotMigrationTestBase.SnapshotType getSnapshotType() {
+            return snapshotType;
+        }
+
+        /**
+         * Creates a collection of {@link SnapshotSpec}, one per given {@link FlinkVersion}, that
+         * all share the same state backend type and snapshot type.
+         *
+         * @param stateBackendType Specifies the state backend type.
+         * @param snapshotType Specifies the snapshot type.
+         * @param flinkVersions A collection of {@link FlinkVersion}.
+         * @return A collection of {@link SnapshotSpec} that differ only in their {@link
+         *     FlinkVersion}.
+         */
+        public static Collection<SnapshotSpec> withVersions(
+                String stateBackendType,
+                SnapshotMigrationTestBase.SnapshotType snapshotType,
+                Collection<FlinkVersion> flinkVersions) {
+            List<SnapshotSpec> snapshotSpecCollection = new LinkedList<>();
+            for (FlinkVersion version : flinkVersions) {
+                snapshotSpecCollection.add(
+                        new SnapshotSpec(version, stateBackendType, snapshotType));
+            }
+            return snapshotSpecCollection;
+        }
+
+        @Override
+        public String toString() {
+            StringBuilder str = new StringBuilder("flink" + flinkVersion);
+            switch (stateBackendType) {
+                case StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME:
+                    str.append("-rocksdb");
+                    break;
+                case StateBackendLoader.MEMORY_STATE_BACKEND_NAME:
+                    // This is implicit due to backwards compatibility with legacy artifact names.
+                    break;
+                case StateBackendLoader.HASHMAP_STATE_BACKEND_NAME:
+                    str.append("-hashmap");
+                    break;
+                default:
+                    throw new UnsupportedOperationException("State backend type not supported.");
+            }
+            switch (snapshotType) {
+                case SAVEPOINT_CANONICAL:
+                    str.append("-savepoint");
+                    // Canonical is implicit due to backwards compatibility with legacy artifact names.
+                    break;
+                case SAVEPOINT_NATIVE:
+                    str.append("-savepoint-native");
+                    break;
+                case CHECKPOINT:
+                    str.append("-checkpoint");
+                    break;
+                default:
+                    throw new UnsupportedOperationException("Snapshot type not supported.");
+            }
+            return str.toString();
+        }
+    }
+
     protected static String getResourceFilename(String filename) {
-        ClassLoader cl = SavepointMigrationTestBase.class.getClassLoader();
+        ClassLoader cl = SnapshotMigrationTestBase.class.getClassLoader();
         URL resource = cl.getResource(filename);
         if (resource == null) {
             throw new NullPointerException("Missing snapshot resource.");
@@ -77,7 +219,7 @@ public abstract class SavepointMigrationTestBase extends TestBaseUtils {
         return resource.getFile();
     }
 
-    protected SavepointMigrationTestBase() throws Exception {
+    protected SnapshotMigrationTestBase() throws Exception {
         miniClusterResource =
                 new MiniClusterWithClientResource(
                         new MiniClusterResourceConfiguration.Builder()
@@ -115,10 +257,22 @@ public abstract class SavepointMigrationTestBase extends TestBaseUtils {
         return config;
     }
 
+    @Deprecated
     @SafeVarargs
     protected final void executeAndSavepoint(
             StreamExecutionEnvironment env,
-            String savepointPath,
+            String snapshotPath,
+            Tuple2<String, Integer>... expectedAccumulators)
+            throws Exception {
+        executeAndSnapshot(
+                env, snapshotPath, SnapshotType.SAVEPOINT_CANONICAL, expectedAccumulators);
+    }
+
+    @SafeVarargs
+    protected final void executeAndSnapshot(
+            StreamExecutionEnvironment env,
+            String snapshotPath,
+            SnapshotType snapshotType,
             Tuple2<String, Integer>... expectedAccumulators)
             throws Exception {
 
@@ -162,27 +316,41 @@ public abstract class SavepointMigrationTestBase extends TestBaseUtils {
             fail("Did not see the expected accumulator results within time limit.");
         }
 
-        LOG.info("Triggering savepoint.");
+        LOG.info("Triggering snapshot.");
 
-        CompletableFuture<String> savepointPathFuture =
-                client.triggerSavepoint(jobID, null, SavepointFormatType.CANONICAL);
+        CompletableFuture<String> snapshotPathFuture;
+        switch (snapshotType) {
+            case SAVEPOINT_CANONICAL:
+                snapshotPathFuture =
+                        client.triggerSavepoint(jobID, null, SavepointFormatType.CANONICAL);
+                break;
+            case SAVEPOINT_NATIVE:
+                snapshotPathFuture =
+                        client.triggerSavepoint(jobID, null, SavepointFormatType.NATIVE);
+                break;
+            case CHECKPOINT:
+                snapshotPathFuture = miniClusterResource.getMiniCluster().triggerCheckpoint(jobID);
+                break;
+            default:
+                throw new UnsupportedOperationException("Snapshot type not supported/implemented.");
+        }
 
-        String jobmanagerSavepointPath =
-                savepointPathFuture.get(deadLine.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+        String jobmanagerSnapshotPath =
+                snapshotPathFuture.get(deadLine.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
 
-        File jobManagerSavepoint = new File(new URI(jobmanagerSavepointPath).getPath());
+        File jobManagerSnapshot = new File(new URI(jobmanagerSnapshotPath).getPath());
         // savepoints were changed to be directories in Flink 1.3
-        if (jobManagerSavepoint.isDirectory()) {
-            FileUtils.moveDirectory(jobManagerSavepoint, new File(savepointPath));
+        if (jobManagerSnapshot.isDirectory()) {
+            FileUtils.moveDirectory(jobManagerSnapshot, new File(snapshotPath));
         } else {
-            FileUtils.moveFile(jobManagerSavepoint, new File(savepointPath));
+            FileUtils.moveFile(jobManagerSnapshot, new File(snapshotPath));
         }
     }
 
     @SafeVarargs
     protected final void restoreAndExecute(
             StreamExecutionEnvironment env,
-            String savepointPath,
+            String snapshotPath,
             Tuple2<String, Integer>... expectedAccumulators)
             throws Exception {
 
@@ -193,7 +361,7 @@ public abstract class SavepointMigrationTestBase extends TestBaseUtils {
         // Submit the job
         JobGraph jobGraph = env.getStreamGraph().getJobGraph();
 
-        jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath));
+        jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(snapshotPath));
 
         JobID jobID = client.submitJob(jobGraph).get();
 
diff --git a/flink-tests/src/test/java/org/apache/flink/test/migration/TypeSerializerSnapshotMigrationITCase.java b/flink-tests/src/test/java/org/apache/flink/test/migration/TypeSerializerSnapshotMigrationITCase.java
index aa22407..96d607b 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/migration/TypeSerializerSnapshotMigrationITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/migration/TypeSerializerSnapshotMigrationITCase.java
@@ -29,104 +29,117 @@ import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
 import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.runtime.state.StateBackendLoader;
+import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.test.checkpointing.utils.MigrationTestUtils;
-import org.apache.flink.test.checkpointing.utils.SavepointMigrationTestBase;
+import org.apache.flink.test.checkpointing.utils.SnapshotMigrationTestBase;
 
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
 import java.io.IOException;
-import java.util.Arrays;
 import java.util.Collection;
+import java.util.LinkedList;
+import java.util.stream.Collectors;
 
 /**
  * Migration IT cases for upgrading a legacy {@link TypeSerializerConfigSnapshot} that is written in
  * checkpoints to {@link TypeSerializerSnapshot} interface.
  *
- * <p>The savepoints used by this test were written with a serializer snapshot class that extends
+ * <p>The snapshots used by this test were written with a serializer snapshot class that extends
  * {@link TypeSerializerConfigSnapshot}, as can be seen in the commented out code at the end of this
  * class. On restore, we change the snapshot to implement directly a {@link TypeSerializerSnapshot}.
  */
 @RunWith(Parameterized.class)
-public class TypeSerializerSnapshotMigrationITCase extends SavepointMigrationTestBase {
+public class TypeSerializerSnapshotMigrationITCase extends SnapshotMigrationTestBase {
 
     private static final int NUM_SOURCE_ELEMENTS = 4;
 
-    /**
-     * This test runs in either of two modes: 1) we want to generate the binary savepoint, i.e. we
-     * have to run the checkpointing functions 2) we want to verify restoring, so we have to run the
-     * checking functions.
-     */
-    public enum ExecutionMode {
-        PERFORM_SAVEPOINT,
-        VERIFY_SAVEPOINT
-    }
+    // TODO increase this to a newer version to create and test snapshot migration for newer versions
+    private static final FlinkVersion currentVersion = FlinkVersion.v1_14;
 
-    // TODO change this to PERFORM_SAVEPOINT to regenerate binary savepoints
-    // TODO Note: You should generate the savepoint based on the release branch instead of the
+    // TODO change this to CREATE_SNAPSHOT to (re)create binary snapshots
+    // TODO Note: You should generate the snapshot based on the release branch instead of the
     // master.
-    private final ExecutionMode executionMode = ExecutionMode.VERIFY_SAVEPOINT;
-
-    @Parameterized.Parameters(name = "Migrate Savepoint / Backend: {0}")
-    public static Collection<Tuple2<FlinkVersion, String>> parameters() {
-        return Arrays.asList(
-                Tuple2.of(FlinkVersion.v1_3, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-                Tuple2.of(FlinkVersion.v1_3, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
-                Tuple2.of(FlinkVersion.v1_4, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-                Tuple2.of(FlinkVersion.v1_4, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
-                Tuple2.of(FlinkVersion.v1_5, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-                Tuple2.of(FlinkVersion.v1_5, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
-                Tuple2.of(FlinkVersion.v1_6, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-                Tuple2.of(FlinkVersion.v1_6, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
-                Tuple2.of(FlinkVersion.v1_7, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-                Tuple2.of(FlinkVersion.v1_7, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
-                Tuple2.of(FlinkVersion.v1_8, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-                Tuple2.of(FlinkVersion.v1_8, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
-                Tuple2.of(FlinkVersion.v1_9, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-                Tuple2.of(FlinkVersion.v1_9, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
-                Tuple2.of(FlinkVersion.v1_10, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-                Tuple2.of(FlinkVersion.v1_10, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
-                Tuple2.of(FlinkVersion.v1_11, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-                Tuple2.of(FlinkVersion.v1_11, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
-                Tuple2.of(FlinkVersion.v1_12, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-                Tuple2.of(FlinkVersion.v1_12, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
-                Tuple2.of(FlinkVersion.v1_13, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-                Tuple2.of(FlinkVersion.v1_13, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
-                Tuple2.of(FlinkVersion.v1_14, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-                Tuple2.of(FlinkVersion.v1_14, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME));
+    private static final ExecutionMode executionMode = ExecutionMode.VERIFY_SNAPSHOT;
+
+    @Parameterized.Parameters(name = "Test snapshot: {0}")
+    public static Collection<SnapshotSpec> parameters() {
+        Collection<SnapshotSpec> parameters = new LinkedList<>();
+        parameters.addAll(
+                SnapshotSpec.withVersions(
+                        StateBackendLoader.MEMORY_STATE_BACKEND_NAME,
+                        SnapshotType.SAVEPOINT_CANONICAL,
+                        FlinkVersion.rangeOf(FlinkVersion.v1_3, FlinkVersion.v1_14)));
+        parameters.addAll(
+                SnapshotSpec.withVersions(
+                        StateBackendLoader.HASHMAP_STATE_BACKEND_NAME,
+                        SnapshotType.SAVEPOINT_CANONICAL,
+                        FlinkVersion.rangeOf(FlinkVersion.v1_15, currentVersion)));
+        parameters.addAll(
+                SnapshotSpec.withVersions(
+                        StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME,
+                        SnapshotType.SAVEPOINT_CANONICAL,
+                        FlinkVersion.rangeOf(FlinkVersion.v1_3, currentVersion)));
+        parameters.addAll(
+                SnapshotSpec.withVersions(
+                        StateBackendLoader.HASHMAP_STATE_BACKEND_NAME,
+                        SnapshotType.SAVEPOINT_NATIVE,
+                        FlinkVersion.rangeOf(FlinkVersion.v1_15, currentVersion)));
+        parameters.addAll(
+                SnapshotSpec.withVersions(
+                        StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME,
+                        SnapshotType.SAVEPOINT_NATIVE,
+                        FlinkVersion.rangeOf(FlinkVersion.v1_15, currentVersion)));
+        parameters.addAll(
+                SnapshotSpec.withVersions(
+                        StateBackendLoader.HASHMAP_STATE_BACKEND_NAME,
+                        SnapshotType.CHECKPOINT,
+                        FlinkVersion.rangeOf(FlinkVersion.v1_15, currentVersion)));
+        parameters.addAll(
+                SnapshotSpec.withVersions(
+                        StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME,
+                        SnapshotType.CHECKPOINT,
+                        FlinkVersion.rangeOf(FlinkVersion.v1_15, currentVersion)));
+        if (executionMode == ExecutionMode.CREATE_SNAPSHOT) {
+            parameters =
+                    parameters.stream()
+                            .filter(x -> x.getFlinkVersion().equals(currentVersion))
+                            .collect(Collectors.toList());
+        }
+        return parameters;
     }
 
-    private final FlinkVersion testMigrateVersion;
-    private final String testStateBackend;
+    private final SnapshotSpec snapshotSpec;
 
-    public TypeSerializerSnapshotMigrationITCase(
-            Tuple2<FlinkVersion, String> testMigrateVersionAndBackend) throws Exception {
-        this.testMigrateVersion = testMigrateVersionAndBackend.f0;
-        this.testStateBackend = testMigrateVersionAndBackend.f1;
+    public TypeSerializerSnapshotMigrationITCase(SnapshotSpec snapshotSpec) throws Exception {
+        this.snapshotSpec = snapshotSpec;
     }
 
     @Test
-    public void testSavepoint() throws Exception {
+    public void testSnapshot() throws Exception {
         final int parallelism = 1;
 
         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.setRestartStrategy(RestartStrategies.noRestart());
 
-        switch (testStateBackend) {
+        switch (snapshotSpec.getStateBackendType()) {
             case StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME:
-                env.setStateBackend(new RocksDBStateBackend(new MemoryStateBackend()));
+                env.setStateBackend(new EmbeddedRocksDBStateBackend());
                 break;
             case StateBackendLoader.MEMORY_STATE_BACKEND_NAME:
                 env.setStateBackend(new MemoryStateBackend());
                 break;
+            case StateBackendLoader.HASHMAP_STATE_BACKEND_NAME:
+                env.setStateBackend(new HashMapStateBackend());
+                break;
             default:
                 throw new UnsupportedOperationException();
         }
@@ -145,36 +158,30 @@ public class TypeSerializerSnapshotMigrationITCase extends SavepointMigrationTes
                 .map(new TestMapFunction())
                 .addSink(new MigrationTestUtils.AccumulatorCountingSink<>());
 
-        if (executionMode == ExecutionMode.PERFORM_SAVEPOINT) {
-            executeAndSavepoint(
+        final String snapshotPath = getSnapshotPath(snapshotSpec);
+
+        if (executionMode == ExecutionMode.CREATE_SNAPSHOT) {
+            executeAndSnapshot(
                     env,
-                    "src/test/resources/" + getSavepointPath(testMigrateVersion, testStateBackend),
-                    new Tuple2<>(
+                    "src/test/resources/" + snapshotPath,
+                    snapshotSpec.getSnapshotType(),
+                    Tuple2.of(
                             MigrationTestUtils.AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR,
                             NUM_SOURCE_ELEMENTS));
-        } else {
+        } else if (executionMode == ExecutionMode.VERIFY_SNAPSHOT) {
             restoreAndExecute(
                     env,
-                    getResourceFilename(getSavepointPath(testMigrateVersion, testStateBackend)),
-                    new Tuple2<>(
+                    getResourceFilename(snapshotPath),
+                    Tuple2.of(
                             MigrationTestUtils.AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR,
                             NUM_SOURCE_ELEMENTS));
+        } else {
+            throw new IllegalStateException("Unknown ExecutionMode " + executionMode);
         }
     }
 
-    private String getSavepointPath(FlinkVersion savepointVersion, String backendType) {
-        switch (backendType) {
-            case "rocksdb":
-                return "type-serializer-snapshot-migration-itcase-flink"
-                        + savepointVersion
-                        + "-rocksdb-savepoint";
-            case "jobmanager":
-                return "type-serializer-snapshot-migration-itcase-flink"
-                        + savepointVersion
-                        + "-savepoint";
-            default:
-                throw new UnsupportedOperationException();
-        }
+    private String getSnapshotPath(SnapshotSpec snapshotSpec) {
+        return "type-serializer-snapshot-migration-itcase-" + snapshotSpec;
     }
 
     private static class TestMapFunction
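
For reference, a small sketch of how SnapshotSpec.withVersions expands a version range into
the parameter matrix above (illustrative only; assumes FlinkVersion.rangeOf yields an
inclusive range, as the parameters() method above relies on):

    import java.util.Collection;

    import org.apache.flink.FlinkVersion;
    import org.apache.flink.runtime.state.StateBackendLoader;
    import org.apache.flink.test.checkpointing.utils.SnapshotMigrationTestBase.SnapshotSpec;
    import org.apache.flink.test.checkpointing.utils.SnapshotMigrationTestBase.SnapshotType;

    public class ParameterMatrixSketch {
        public static void main(String[] args) {
            Collection<SnapshotSpec> specs =
                    SnapshotSpec.withVersions(
                            StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME,
                            SnapshotType.CHECKPOINT,
                            FlinkVersion.rangeOf(FlinkVersion.v1_8, FlinkVersion.v1_10));
            // Prints one spec per version, named by SnapshotSpec.toString():
            // flink1.8-rocksdb-checkpoint
            // flink1.9-rocksdb-checkpoint
            // flink1.10-rocksdb-checkpoint
            specs.forEach(System.out::println);
        }
    }
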
diff --git a/pom.xml b/pom.xml
index 14de924..d471a8e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1494,7 +1494,9 @@ under the License.
 						<exclude>**/src/test/resources/**/test-data</exclude>
 						<exclude>**/src/test/resources/*-snapshot</exclude>
 						<exclude>**/src/test/resources/*.snapshot</exclude>
-						<exclude>**/src/test/resources/*-savepoint</exclude>
+						<exclude>**/src/test/resources/*-savepoint/**</exclude>
+						<exclude>**/src/test/resources/*-savepoint-native/**</exclude>
+						<exclude>**/src/test/resources/*-checkpoint/**</exclude>
 						<exclude>flink-core/src/test/resources/serialized-kryo-serializer-1.3</exclude>
 						<exclude>flink-core/src/test/resources/type-without-avro-serialized-using-kryo</exclude>
 						<exclude>flink-formats/flink-avro/src/test/resources/flink-1.4-serializer-java-serialized</exclude>
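
Since snapshots are directories rather than single files (see the isDirectory branch in
SnapshotMigrationTestBase above), the Rat excludes now need the trailing /**. A quick
sanity check of the pattern (illustrative only; java.nio PathMatcher globs approximate,
but are not identical to, the plugin's matching):

    import java.nio.file.FileSystems;
    import java.nio.file.Path;
    import java.nio.file.PathMatcher;
    import java.nio.file.Paths;

    public class ExcludeGlobSketch {
        public static void main(String[] args) {
            PathMatcher matcher =
                    FileSystems.getDefault()
                            .getPathMatcher("glob:**/src/test/resources/*-savepoint-native/**");
            Path metadata =
                    Paths.get(
                            "flink-tests/src/test/resources/"
                                    + "type-serializer-snapshot-migration-itcase-flink1.15"
                                    + "-rocksdb-savepoint-native/_metadata");
            System.out.println(matcher.matches(metadata)); // true
        }
    }
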

[flink] 02/03: [FLINK-26176] Fix scala savepoint migration tests

Posted by dw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 9dd6043f4e474658cd6830a011d74f0147fbcc07
Author: Sebastian Mattheis <se...@ververica.com>
AuthorDate: Fri Feb 18 10:17:59 2022 +0100

    [FLINK-26176] Fix scala savepoint migration tests
    
    - Remove test parameters and artifacts for tests of the RocksDB state backend, since the
      artifacts are not RocksDB savepoints but actually memory state backend savepoints
    - Remove test parameters and artifacts for tests of savepoints created with Flink version 1.7
      or below, as those are not safe to restore due to bug FLINK-10493 (see the sketch below)
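
    The unsafe-restore problem stems from positionally derived anonymous-class names in the
    Scala serializer. A Java analog illustrates the fragility (illustrative only; Java
    anonymous classes are numbered by order of appearance in the same way):

        public class AnonymousNamingSketch {
            // Compiled as AnonymousNamingSketch$1; swapping the two declarations below
            // renames it to AnonymousNamingSketch$2 and vice versa.
            static final Runnable FIRST =
                    new Runnable() {
                        @Override
                        public void run() {}
                    };

            // Compiled as AnonymousNamingSketch$2.
            static final Runnable SECOND =
                    new Runnable() {
                        @Override
                        public void run() {}
                    };

            public static void main(String[] args) {
                // Prints AnonymousNamingSketch$1 and AnonymousNamingSketch$2. State that
                // records such names fails to restore once the declaration order changes.
                System.out.println(FIRST.getClass().getName());
                System.out.println(SECOND.getClass().getName());
            }
        }
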
---
 .../_metadata                                      | Bin 41481 -> 0 bytes
 .../_metadata                                      | Bin 41584 -> 0 bytes
 .../_metadata                                      | Bin 41436 -> 0 bytes
 .../_metadata                                      | Bin 41962 -> 0 bytes
 .../_metadata                                      | Bin 42002 -> 0 bytes
 .../_metadata                                      | Bin 213759 -> 0 bytes
 .../_metadata                                      | Bin 213759 -> 0 bytes
 .../_metadata                                      | Bin 204543 -> 0 bytes
 .../_metadata                                      | Bin 204543 -> 0 bytes
 .../_metadata                                      | Bin 205638 -> 0 bytes
 .../_metadata                                      | Bin 205638 -> 0 bytes
 .../_metadata                                      | Bin 229848 -> 0 bytes
 .../_metadata                                      | Bin 229848 -> 0 bytes
 .../_metadata                                      | Bin 39985 -> 0 bytes
 .../_metadata                                      | Bin 40081 -> 0 bytes
 .../_metadata                                      | Bin 53342 -> 0 bytes
 .../_metadata                                      | Bin 53419 -> 0 bytes
 .../_metadata                                      | Bin 53271 -> 0 bytes
 .../_metadata                                      | Bin 53733 -> 0 bytes
 .../_metadata                                      | Bin 53773 -> 0 bytes
 .../_metadata                                      | Bin 220470 -> 0 bytes
 .../_metadata                                      | Bin 220470 -> 0 bytes
 .../_metadata                                      | Bin 253956 -> 0 bytes
 .../_metadata                                      | Bin 253956 -> 0 bytes
 .../_metadata                                      | Bin 287008 -> 0 bytes
 .../_metadata                                      | Bin 287008 -> 0 bytes
 .../_metadata                                      | Bin 51846 -> 0 bytes
 .../_metadata                                      | Bin 51942 -> 0 bytes
 .../StatefulJobSavepointMigrationITCase.scala      | 153 +++++++---------
 ...StatefulJobWBroadcastStateMigrationITCase.scala | 198 ++++++++-------------
 30 files changed, 144 insertions(+), 207 deletions(-)

diff --git a/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.10-rocksdb-savepoint/_metadata b/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.10-rocksdb-savepoint/_metadata
deleted file mode 100644
index d6ade08..0000000
Binary files a/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.10-rocksdb-savepoint/_metadata and /dev/null differ
diff --git a/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.11-rocksdb-savepoint/_metadata b/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.11-rocksdb-savepoint/_metadata
deleted file mode 100644
index 56d6d77..0000000
Binary files a/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.11-rocksdb-savepoint/_metadata and /dev/null differ
diff --git a/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.12-rocksdb-savepoint/_metadata b/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.12-rocksdb-savepoint/_metadata
deleted file mode 100644
index dd2ef62..0000000
Binary files a/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.12-rocksdb-savepoint/_metadata and /dev/null differ
diff --git a/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.13-rocksdb-savepoint/_metadata b/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.13-rocksdb-savepoint/_metadata
deleted file mode 100644
index 24636f5..0000000
Binary files a/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.13-rocksdb-savepoint/_metadata and /dev/null differ
diff --git a/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.14-rocksdb-savepoint/_metadata b/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.14-rocksdb-savepoint/_metadata
deleted file mode 100644
index fbbf888..0000000
Binary files a/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.14-rocksdb-savepoint/_metadata and /dev/null differ
diff --git a/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.3-jobmanager-savepoint/_metadata b/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.3-jobmanager-savepoint/_metadata
deleted file mode 100644
index e0c6da6..0000000
Binary files a/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.3-jobmanager-savepoint/_metadata and /dev/null differ
diff --git a/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.3-rocksdb-savepoint/_metadata b/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.3-rocksdb-savepoint/_metadata
deleted file mode 100644
index f719eac..0000000
Binary files a/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.3-rocksdb-savepoint/_metadata and /dev/null differ
diff --git a/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.4-jobmanager-savepoint/_metadata b/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.4-jobmanager-savepoint/_metadata
deleted file mode 100644
index 69aa8d4..0000000
Binary files a/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.4-jobmanager-savepoint/_metadata and /dev/null differ
diff --git a/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.4-rocksdb-savepoint/_metadata b/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.4-rocksdb-savepoint/_metadata
deleted file mode 100644
index 7dcef9c..0000000
Binary files a/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.4-rocksdb-savepoint/_metadata and /dev/null differ
diff --git a/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.6-jobmanager-savepoint/_metadata b/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.6-jobmanager-savepoint/_metadata
deleted file mode 100644
index 55243f3..0000000
Binary files a/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.6-jobmanager-savepoint/_metadata and /dev/null differ
diff --git a/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.6-rocksdb-savepoint/_metadata b/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.6-rocksdb-savepoint/_metadata
deleted file mode 100644
index defe3b1..0000000
Binary files a/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.6-rocksdb-savepoint/_metadata and /dev/null differ
diff --git a/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.7-jobmanager-savepoint/_metadata b/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.7-jobmanager-savepoint/_metadata
deleted file mode 100644
index b421e64..0000000
Binary files a/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.7-jobmanager-savepoint/_metadata and /dev/null differ
diff --git a/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.7-rocksdb-savepoint/_metadata b/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.7-rocksdb-savepoint/_metadata
deleted file mode 100644
index 53e493c..0000000
Binary files a/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.7-rocksdb-savepoint/_metadata and /dev/null differ
diff --git a/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.8-rocksdb-savepoint/_metadata b/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.8-rocksdb-savepoint/_metadata
deleted file mode 100644
index 3f41ed6..0000000
Binary files a/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.8-rocksdb-savepoint/_metadata and /dev/null differ
diff --git a/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.9-rocksdb-savepoint/_metadata b/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.9-rocksdb-savepoint/_metadata
deleted file mode 100644
index 2c191ef..0000000
Binary files a/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.9-rocksdb-savepoint/_metadata and /dev/null differ
diff --git a/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.10-rocksdb-savepoint/_metadata b/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.10-rocksdb-savepoint/_metadata
deleted file mode 100644
index e558ab9..0000000
Binary files a/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.10-rocksdb-savepoint/_metadata and /dev/null differ
diff --git a/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.11-rocksdb-savepoint/_metadata b/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.11-rocksdb-savepoint/_metadata
deleted file mode 100644
index b4ffaa1..0000000
Binary files a/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.11-rocksdb-savepoint/_metadata and /dev/null differ
diff --git a/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.12-rocksdb-savepoint/_metadata b/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.12-rocksdb-savepoint/_metadata
deleted file mode 100644
index e36c884..0000000
Binary files a/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.12-rocksdb-savepoint/_metadata and /dev/null differ
diff --git a/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.13-rocksdb-savepoint/_metadata b/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.13-rocksdb-savepoint/_metadata
deleted file mode 100644
index 0615aa8..0000000
Binary files a/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.13-rocksdb-savepoint/_metadata and /dev/null differ
diff --git a/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.14-rocksdb-savepoint/_metadata b/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.14-rocksdb-savepoint/_metadata
deleted file mode 100644
index 7436fca..0000000
Binary files a/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.14-rocksdb-savepoint/_metadata and /dev/null differ
diff --git a/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.5-jobmanager-savepoint/_metadata b/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.5-jobmanager-savepoint/_metadata
deleted file mode 100644
index f845ee3..0000000
Binary files a/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.5-jobmanager-savepoint/_metadata and /dev/null differ
diff --git a/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.5-rocksdb-savepoint/_metadata b/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.5-rocksdb-savepoint/_metadata
deleted file mode 100644
index 6b05ef9..0000000
Binary files a/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.5-rocksdb-savepoint/_metadata and /dev/null differ
diff --git a/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.6-jobmanager-savepoint/_metadata b/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.6-jobmanager-savepoint/_metadata
deleted file mode 100644
index b64e810..0000000
Binary files a/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.6-jobmanager-savepoint/_metadata and /dev/null differ
diff --git a/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.6-rocksdb-savepoint/_metadata b/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.6-rocksdb-savepoint/_metadata
deleted file mode 100644
index 59bebc9..0000000
Binary files a/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.6-rocksdb-savepoint/_metadata and /dev/null differ
diff --git a/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.7-jobmanager-savepoint/_metadata b/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.7-jobmanager-savepoint/_metadata
deleted file mode 100644
index 88cee1b..0000000
Binary files a/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.7-jobmanager-savepoint/_metadata and /dev/null differ
diff --git a/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.7-rocksdb-savepoint/_metadata b/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.7-rocksdb-savepoint/_metadata
deleted file mode 100644
index e4b058a..0000000
Binary files a/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.7-rocksdb-savepoint/_metadata and /dev/null differ
diff --git a/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.8-rocksdb-savepoint/_metadata b/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.8-rocksdb-savepoint/_metadata
deleted file mode 100644
index f2870a4..0000000
Binary files a/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.8-rocksdb-savepoint/_metadata and /dev/null differ
diff --git a/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.9-rocksdb-savepoint/_metadata b/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.9-rocksdb-savepoint/_metadata
deleted file mode 100644
index 5f20171..0000000
Binary files a/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.9-rocksdb-savepoint/_metadata and /dev/null differ
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobSavepointMigrationITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobSavepointMigrationITCase.scala
index c68abf3..3ff91ee 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobSavepointMigrationITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobSavepointMigrationITCase.scala
@@ -35,56 +35,67 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
 import org.apache.flink.streaming.api.functions.source.SourceFunction
 import org.apache.flink.streaming.api.watermark.Watermark
-import org.apache.flink.test.checkpointing.utils.SavepointMigrationTestBase
+import org.apache.flink.test.checkpointing.utils.SnapshotMigrationTestBase
 import org.apache.flink.util.Collector
 import org.apache.flink.api.java.tuple.Tuple2
 import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext, StateBackendLoader}
 import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.migration.CustomEnum.CustomEnum
-
+import org.apache.flink.test.checkpointing.utils.SnapshotMigrationTestBase.ExecutionMode
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
-import org.junit.{Ignore, Test}
+import org.junit.Test
 
+import java.util.stream.Collectors
 import scala.util.{Failure, Try}
 
 object StatefulJobSavepointMigrationITCase {
 
+  // TODO increase this to a newer version to create and test snapshot migration for newer versions
+  val currentVersion = FlinkVersion.v1_14
+
+  // TODO change this to CREATE_SNAPSHOT to (re)create binary snapshots
+  // TODO Note: You should generate the snapshot based on the release branch instead of the
+  // master.
+  val executionMode = ExecutionMode.VERIFY_SNAPSHOT
+
   @Parameterized.Parameters(name = "Migrate Savepoint / Backend: {0}")
   def parameters: util.Collection[(FlinkVersion, String)] = {
-    util.Arrays.asList(
-      (FlinkVersion.v1_3, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-      (FlinkVersion.v1_3, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
-      (FlinkVersion.v1_4, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-      (FlinkVersion.v1_4, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
-      (FlinkVersion.v1_6, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-      (FlinkVersion.v1_6, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
-      (FlinkVersion.v1_7, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-      (FlinkVersion.v1_7, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
+    var parameters = util.Arrays.asList(
+      //      (FlinkVersion.v1_3, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
+      //      (FlinkVersion.v1_4, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
+      //      (FlinkVersion.v1_6, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
+      //      (FlinkVersion.v1_7, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
+      // Note: It is not safe to restore savepoints created by Scala applications with Flink
+      // version 1.7 or below. The reason is that up to version 1.7 the underlying Scala serializer
+      // used names of anonymous classes that depend on their relative position/order in the code,
+      // e.g., if two anonymous classes, instantiated inside the same class and derived from the
+      // same base class, change order in the code, their names are switched.
+      // As a consequence, changes in code may result in restore failures.
+      // This was fixed in version 1.8, see: https://issues.apache.org/jira/browse/FLINK-10493
       (FlinkVersion.v1_8, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-      (FlinkVersion.v1_8, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
       (FlinkVersion.v1_9, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-      (FlinkVersion.v1_9, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
       (FlinkVersion.v1_10, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-      (FlinkVersion.v1_10, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
       (FlinkVersion.v1_11, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-      (FlinkVersion.v1_11, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
       (FlinkVersion.v1_12, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-      (FlinkVersion.v1_12, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
       (FlinkVersion.v1_13, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-      (FlinkVersion.v1_13, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
       (FlinkVersion.v1_14, StateBackendLoader.HASHMAP_STATE_BACKEND_NAME),
-      (FlinkVersion.v1_14, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME))
+      (FlinkVersion.v1_15, StateBackendLoader.HASHMAP_STATE_BACKEND_NAME),
+    )
+    if (executionMode == ExecutionMode.CREATE_SNAPSHOT) {
+      parameters = parameters.stream().filter(x => x._1 == currentVersion)
+        .collect(Collectors.toList())
+    }
+    parameters
   }
 
-  // TODO to generate savepoints for a specific Flink version / backend type,
-  // TODO change these values accordingly, e.g. to generate for 1.3 with RocksDB,
-  // TODO set as (FlinkVersion.v1_3, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME)
-  // TODO Note: You should generate the savepoint based on the release branch instead of the master.
-  val GENERATE_SAVEPOINT_VER: FlinkVersion = FlinkVersion.v1_14
-  val GENERATE_SAVEPOINT_BACKEND_TYPE: String = StateBackendLoader.HASHMAP_STATE_BACKEND_NAME
-
   val NUM_ELEMENTS = 4
+
+  def getSnapshotPath(migrationVersionAndBackend: (FlinkVersion, String)): String = {
+    s"stateful-scala-udf-migration-itcase" +
+      s"-flink${migrationVersionAndBackend._1}" +
+      s"-${migrationVersionAndBackend._2}-savepoint"
+  }
 }
 
 /**
@@ -92,16 +103,15 @@ object StatefulJobSavepointMigrationITCase {
  */
 @RunWith(classOf[Parameterized])
 class StatefulJobSavepointMigrationITCase(
-    migrationVersionAndBackend: (FlinkVersion, String))
-  extends SavepointMigrationTestBase with Serializable {
+                                           migrationVersionAndBackend: (FlinkVersion, String))
+  extends SnapshotMigrationTestBase with Serializable {
 
-  @Ignore
   @Test
-  def testCreateSavepoint(): Unit = {
+  def testSavepoint(): Unit = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
 
-    StatefulJobSavepointMigrationITCase.GENERATE_SAVEPOINT_BACKEND_TYPE match {
+    migrationVersionAndBackend._2 match {
       case StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME =>
         env.setStateBackend(new EmbeddedRocksDBStateBackend())
       case StateBackendLoader.MEMORY_STATE_BACKEND_NAME =>
@@ -111,7 +121,7 @@ class StatefulJobSavepointMigrationITCase(
       case _ => throw new UnsupportedOperationException
     }
 
-    env.setStateBackend(new MemoryStateBackend)
+    env.enableChangelogStateBackend(false)
     env.enableCheckpointing(500)
     env.setParallelism(4)
     env.setMaxParallelism(4)
@@ -127,60 +137,29 @@ class StatefulJobSavepointMigrationITCase(
       .flatMap(new StatefulFlatMapper)
       .addSink(new AccumulatorCountingSink)
 
-    executeAndSavepoint(
-      env,
-      s"src/test/resources/stateful-scala-udf-migration-itcase-flink" +
-        s"${StatefulJobSavepointMigrationITCase.GENERATE_SAVEPOINT_VER}" +
-        s"-${StatefulJobSavepointMigrationITCase.GENERATE_SAVEPOINT_BACKEND_TYPE}-savepoint",
-      new Tuple2(
-        AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR,
-        StatefulJobSavepointMigrationITCase.NUM_ELEMENTS
+    if (StatefulJobSavepointMigrationITCase.executionMode == ExecutionMode.CREATE_SNAPSHOT) {
+      executeAndSavepoint(
+        env,
+        s"src/test/resources/"
+          + StatefulJobSavepointMigrationITCase.getSnapshotPath(migrationVersionAndBackend),
+        new Tuple2(
+          AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR,
+          StatefulJobSavepointMigrationITCase.NUM_ELEMENTS
+        )
       )
-    )
-  }
-
-  @Test
-  def testRestoreSavepoint(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-
-    migrationVersionAndBackend._2 match {
-      case StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME =>
-        env.setStateBackend(new EmbeddedRocksDBStateBackend())
-      case StateBackendLoader.MEMORY_STATE_BACKEND_NAME =>
-        env.setStateBackend(new MemoryStateBackend())
-      case StateBackendLoader.HASHMAP_STATE_BACKEND_NAME =>
-        env.setStateBackend(new HashMapStateBackend())
-      case _ => throw new UnsupportedOperationException
-    }
-    env.enableChangelogStateBackend(false);
-
-    env.setStateBackend(new MemoryStateBackend)
-    env.enableCheckpointing(500)
-    env.setParallelism(4)
-    env.setMaxParallelism(4)
-
-    env
-      .addSource(
-        new CheckpointedSource(4)).setMaxParallelism(1).uid("checkpointedSource")
-      .keyBy(
-        new KeySelector[(Long, Long), Long] {
-          override def getKey(value: (Long, Long)): Long = value._1
-        }
+    } else if (
+      StatefulJobSavepointMigrationITCase.executionMode == ExecutionMode.VERIFY_SNAPSHOT) {
+      restoreAndExecute(
+        env,
+        SnapshotMigrationTestBase.getResourceFilename(
+          StatefulJobSavepointMigrationITCase.getSnapshotPath(migrationVersionAndBackend)),
+        new Tuple2(
+          AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR,
+          StatefulJobSavepointMigrationITCase.NUM_ELEMENTS)
       )
-      .flatMap(new StatefulFlatMapper)
-      .addSink(new AccumulatorCountingSink)
-
-    restoreAndExecute(
-      env,
-      SavepointMigrationTestBase.getResourceFilename(
-        s"stateful-scala" +
-          s"-udf-migration-itcase-flink${migrationVersionAndBackend._1}" +
-          s"-${migrationVersionAndBackend._2}-savepoint"),
-      new Tuple2(
-        AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR,
-        StatefulJobSavepointMigrationITCase.NUM_ELEMENTS)
-    )
+    } else {
+      throw new UnsupportedOperationException("Unsupported execution mode.")
+    }
   }
 
   @SerialVersionUID(1L)
@@ -190,7 +169,7 @@ class StatefulJobSavepointMigrationITCase(
 
   @SerialVersionUID(1L)
   private class CheckpointedSource(val numElements: Int)
-      extends SourceFunction[(Long, Long)] with CheckpointedFunction {
+    extends SourceFunction[(Long, Long)] with CheckpointedFunction {
 
     private var isRunning = true
     private var state: ListState[CustomCaseClass] = _
@@ -201,8 +180,8 @@ class StatefulJobSavepointMigrationITCase(
       ctx.getCheckpointLock synchronized {
         var i = 0
         while (i < numElements) {
-            ctx.collect(i, i)
-            i += 1
+          ctx.collect(i, i)
+          i += 1
         }
       }
       // don't emit a final watermark so that we don't trigger the registered event-time
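
The CREATE_SNAPSHOT gating above keeps only the current version's parameters. A compact
Java sketch of the same filtering idea (illustrative only; names are hypothetical):

    import java.util.List;
    import java.util.stream.Collectors;
    import java.util.stream.Stream;

    import org.apache.flink.FlinkVersion;

    public class CreateModeFilterSketch {
        public static void main(String[] args) {
            FlinkVersion currentVersion = FlinkVersion.v1_14;
            List<FlinkVersion> versions =
                    Stream.of(FlinkVersion.v1_13, FlinkVersion.v1_14)
                            // In CREATE_SNAPSHOT mode only the current version is generated;
                            // older versions are only verified from committed artifacts.
                            .filter(v -> v.equals(currentVersion))
                            .collect(Collectors.toList());
            System.out.println(versions); // [1.14]
        }
    }
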
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobWBroadcastStateMigrationITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobWBroadcastStateMigrationITCase.scala
index 78e646e..47dc3ae 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobWBroadcastStateMigrationITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobWBroadcastStateMigrationITCase.scala
@@ -41,132 +41,73 @@ import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
 import org.apache.flink.streaming.api.functions.source.SourceFunction
 import org.apache.flink.streaming.api.watermark.Watermark
-import org.apache.flink.test.checkpointing.utils.SavepointMigrationTestBase
+import org.apache.flink.test.checkpointing.utils.SnapshotMigrationTestBase
+import org.apache.flink.test.checkpointing.utils.SnapshotMigrationTestBase.ExecutionMode
 import org.apache.flink.util.Collector
-
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
-import org.junit.{Assert, Ignore, Test}
+import org.junit.{Assert, Test}
 
+import java.util.stream.Collectors
 import scala.util.{Failure, Try}
 
 object StatefulJobWBroadcastStateMigrationITCase {
 
+  // TODO increase this to a newer version to create and test snapshot migration for newer versions
+  val currentVersion = FlinkVersion.v1_14
+
+  // TODO change this to CREATE_SNAPSHOT to (re)create binary snapshots
+  // TODO Note: You should generate the snapshot based on the release branch instead of the
+  // master.
+  val executionMode = ExecutionMode.VERIFY_SNAPSHOT
+
   @Parameterized.Parameters(name = "Migrate Savepoint / Backend: {0}")
   def parameters: util.Collection[(FlinkVersion, String)] = {
-    util.Arrays.asList(
-      (FlinkVersion.v1_5, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-      (FlinkVersion.v1_5, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
-      (FlinkVersion.v1_6, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-      (FlinkVersion.v1_6, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
-      (FlinkVersion.v1_7, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-      (FlinkVersion.v1_7, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
+    var parameters = util.Arrays.asList(
+      //      (FlinkVersion.v1_5, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
+      //      (FlinkVersion.v1_6, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
+      //      (FlinkVersion.v1_7, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
+      // Note: It is not safe to restore savepoints created by Scala applications with Flink
+      // version 1.7 or below. Up to version 1.7 the underlying Scala serializer used the names
+      // of anonymous classes, which depend on their relative position/order in the code: if two
+      // anonymous classes, instantiated inside the same class and from the same base class,
+      // change order in the code, their names are switched.
+      // As a consequence, changes in code may result in restore failures.
+      // This was fixed in version 1.8, see: https://issues.apache.org/jira/browse/FLINK-10493
       (FlinkVersion.v1_8, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-      (FlinkVersion.v1_8, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
       (FlinkVersion.v1_9, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-      (FlinkVersion.v1_9, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
       (FlinkVersion.v1_10, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-      (FlinkVersion.v1_10, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
       (FlinkVersion.v1_11, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-      (FlinkVersion.v1_11, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
       (FlinkVersion.v1_12, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-      (FlinkVersion.v1_12, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
       (FlinkVersion.v1_13, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-      (FlinkVersion.v1_13, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
       (FlinkVersion.v1_14, StateBackendLoader.HASHMAP_STATE_BACKEND_NAME),
-      (FlinkVersion.v1_14, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME))
+      (FlinkVersion.v1_15, StateBackendLoader.HASHMAP_STATE_BACKEND_NAME),
+    )
+    if (executionMode == ExecutionMode.CREATE_SNAPSHOT) {
+      parameters = parameters.stream().filter(x => x._1 == currentVersion)
+        .collect(Collectors.toList())
+    }
+    parameters
   }
 
-  // TODO to generate savepoints for a specific Flink version / backend type,
-  // TODO change these values accordingly, e.g. to generate for 1.3 with RocksDB,
-  // TODO set as (FlinkVersion.v1_3, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME)
-  // TODO Note: You should generate the savepoint based on the release branch instead of the master.
-  val GENERATE_SAVEPOINT_VER: FlinkVersion = FlinkVersion.v1_14
-  val GENERATE_SAVEPOINT_BACKEND_TYPE: String = StateBackendLoader.HASHMAP_STATE_BACKEND_NAME
+  def getSnapshotPath(migrationVersionAndBackend: (FlinkVersion, String)): String = {
+    s"stateful-scala-with-broadcast-udf-migration-itcase" +
+      s"-flink${migrationVersionAndBackend._1}" +
+      s"-${migrationVersionAndBackend._2}-savepoint"
+  }
 
   val NUM_ELEMENTS = 4
 }
 
 /**
-  * ITCase for migration Scala state types across different Flink versions.
-  */
+ * ITCase for migrating Scala state types across different Flink versions.
+ */
 @RunWith(classOf[Parameterized])
-class StatefulJobWBroadcastStateMigrationITCase(
-                                        migrationVersionAndBackend: (FlinkVersion, String))
-  extends SavepointMigrationTestBase with Serializable {
+class StatefulJobWBroadcastStateMigrationITCase(migrationVersionAndBackend: (FlinkVersion, String))
+  extends SnapshotMigrationTestBase with Serializable {
 
   @Test
-  @Ignore
-  def testCreateSavepointWithBroadcastState(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-
-    StatefulJobWBroadcastStateMigrationITCase.GENERATE_SAVEPOINT_BACKEND_TYPE match {
-      case StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME =>
-        env.setStateBackend(new EmbeddedRocksDBStateBackend())
-      case StateBackendLoader.MEMORY_STATE_BACKEND_NAME =>
-        env.setStateBackend(new MemoryStateBackend())
-      case StateBackendLoader.HASHMAP_STATE_BACKEND_NAME =>
-        env.setStateBackend(new HashMapStateBackend())
-      case _ => throw new UnsupportedOperationException
-    }
-    env.enableChangelogStateBackend(false)
-
-    lazy val firstBroadcastStateDesc = new MapStateDescriptor[Long, Long](
-      "broadcast-state-1",
-      BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]],
-      BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]])
-
-    lazy val secondBroadcastStateDesc = new MapStateDescriptor[String, String](
-      "broadcast-state-2",
-      BasicTypeInfo.STRING_TYPE_INFO,
-      BasicTypeInfo.STRING_TYPE_INFO)
-
-    env.setStateBackend(new MemoryStateBackend)
-    env.enableCheckpointing(500)
-    env.setParallelism(4)
-    env.setMaxParallelism(4)
-
-    val stream = env
-      .addSource(
-        new CheckpointedSource(4)).setMaxParallelism(1).uid("checkpointedSource")
-      .keyBy(
-        new KeySelector[(Long, Long), Long] {
-          override def getKey(value: (Long, Long)): Long = value._1
-        }
-      )
-      .flatMap(new StatefulFlatMapper)
-      .keyBy(
-        new KeySelector[(Long, Long), Long] {
-          override def getKey(value: (Long, Long)): Long = value._1
-        }
-      )
-
-    val broadcastStream = env
-      .addSource(
-        new CheckpointedSource(4)).setMaxParallelism(1).uid("checkpointedBroadcastSource")
-      .broadcast(firstBroadcastStateDesc, secondBroadcastStateDesc)
-
-    stream
-      .connect(broadcastStream)
-      .process(new TestBroadcastProcessFunction)
-      .addSink(new AccumulatorCountingSink)
-
-    executeAndSavepoint(
-      env,
-      s"src/test/resources/stateful-scala-with-broadcast" +
-        s"-udf-migration-itcase-flink" +
-        s"${StatefulJobWBroadcastStateMigrationITCase.GENERATE_SAVEPOINT_VER}" +
-        s"-${StatefulJobWBroadcastStateMigrationITCase.GENERATE_SAVEPOINT_BACKEND_TYPE}-savepoint",
-      new Tuple2(
-        AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR,
-        StatefulJobWBroadcastStateMigrationITCase.NUM_ELEMENTS
-      )
-    )
-  }
-
-  @Test
-  def testRestoreSavepointWithBroadcast(): Unit = {
+  def testSavepointWithBroadcast(): Unit = {
 
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
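(Sketch, not part of the patch: the FLINK-10493 note above is easiest to see in a toy example. The Scala compiler numbers anonymous classes by their order of appearance, so swapping two of them in the source swaps their compiled class names, and with them the type names a pre-1.8 savepoint may have recorded:)

    import org.apache.flink.api.java.functions.KeySelector

    object AnonNamingDemo {
      // The compiler names these AnonNamingDemo$$anon$1 and AnonNamingDemo$$anon$2
      // purely based on source order. Swapping the two definitions swaps the
      // generated names, which is why pre-1.8 savepoints that recorded such
      // names could fail to restore after an innocent-looking reordering.
      val byFirst = new KeySelector[(Long, Long), Long] {
        override def getKey(value: (Long, Long)): Long = value._1
      }
      val bySecond = new KeySelector[(Long, Long), Long] {
        override def getKey(value: (Long, Long)): Long = value._2
      }

      def main(args: Array[String]): Unit = {
        println(byFirst.getClass.getName)  // ...$$anon$1
        println(bySecond.getClass.getName) // ...$$anon$2
      }
    }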
@@ -192,7 +133,6 @@ class StatefulJobWBroadcastStateMigrationITCase(
       BasicTypeInfo.STRING_TYPE_INFO,
       BasicTypeInfo.STRING_TYPE_INFO)
 
-    env.setStateBackend(new MemoryStateBackend)
     env.enableCheckpointing(500)
     env.setParallelism(4)
     env.setMaxParallelism(4)
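(Sketch, not part of the patch: with the hard-coded MemoryStateBackend removed here, the backend is driven by the test parameter instead. A minimal dispatch on the backend name, assuming the same backend classes the removed create-savepoint method used:)

    import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend
    import org.apache.flink.runtime.state.StateBackendLoader
    import org.apache.flink.runtime.state.hashmap.HashMapStateBackend
    import org.apache.flink.runtime.state.memory.MemoryStateBackend
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

    object BackendSelection {
      // Choose the state backend from the parameterized backend name,
      // mirroring the match used by the removed create-savepoint method.
      def configureBackend(env: StreamExecutionEnvironment, backendName: String): Unit =
        backendName match {
          case StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME =>
            env.setStateBackend(new EmbeddedRocksDBStateBackend())
          case StateBackendLoader.MEMORY_STATE_BACKEND_NAME =>
            env.setStateBackend(new MemoryStateBackend())
          case StateBackendLoader.HASHMAP_STATE_BACKEND_NAME =>
            env.setStateBackend(new HashMapStateBackend())
          case other =>
            throw new UnsupportedOperationException(s"Unknown state backend: $other")
        }
    }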
@@ -217,26 +157,44 @@ class StatefulJobWBroadcastStateMigrationITCase(
         new CheckpointedSource(4)).setMaxParallelism(1).uid("checkpointedBroadcastSource")
       .broadcast(firstBroadcastStateDesc, secondBroadcastStateDesc)
 
-    val expectedFirstState: Map[Long, Long] =
-      Map(0L -> 0L, 1L -> 1L, 2L -> 2L, 3L -> 3L)
-    val expectedSecondState: Map[String, String] =
-      Map("0" -> "0", "1" -> "1", "2" -> "2", "3" -> "3")
-
-    stream
-      .connect(broadcastStream)
-      .process(new VerifyingBroadcastProcessFunction(expectedFirstState, expectedSecondState))
-      .addSink(new AccumulatorCountingSink)
-
-    restoreAndExecute(
-      env,
-      SavepointMigrationTestBase.getResourceFilename(
-        s"stateful-scala-with-broadcast" +
-          s"-udf-migration-itcase-flink${migrationVersionAndBackend._1}" +
-          s"-${migrationVersionAndBackend._2}-savepoint"),
-      new Tuple2(
-        AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR,
-        StatefulJobWBroadcastStateMigrationITCase.NUM_ELEMENTS)
-    )
+    if (StatefulJobWBroadcastStateMigrationITCase.executionMode == ExecutionMode.CREATE_SNAPSHOT) {
+      stream
+        .connect(broadcastStream)
+        .process(new TestBroadcastProcessFunction)
+        .addSink(new AccumulatorCountingSink)
+
+      executeAndSavepoint(
+        env,
+        s"src/test/resources/"
+          + StatefulJobWBroadcastStateMigrationITCase.getSnapshotPath(migrationVersionAndBackend),
+        new Tuple2(
+          AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR,
+          StatefulJobWBroadcastStateMigrationITCase.NUM_ELEMENTS
+        )
+      )
+    } else if (
+      StatefulJobWBroadcastStateMigrationITCase.executionMode == ExecutionMode.VERIFY_SNAPSHOT) {
+      val expectedFirstState: Map[Long, Long] =
+        Map(0L -> 0L, 1L -> 1L, 2L -> 2L, 3L -> 3L)
+      val expectedSecondState: Map[String, String] =
+        Map("0" -> "0", "1" -> "1", "2" -> "2", "3" -> "3")
+
+      stream
+        .connect(broadcastStream)
+        .process(new VerifyingBroadcastProcessFunction(expectedFirstState, expectedSecondState))
+        .addSink(new AccumulatorCountingSink)
+
+      restoreAndExecute(
+        env,
+        SnapshotMigrationTestBase.getResourceFilename(
+          StatefulJobWBroadcastStateMigrationITCase.getSnapshotPath(migrationVersionAndBackend)),
+        new Tuple2(
+          AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR,
+          StatefulJobWBroadcastStateMigrationITCase.NUM_ELEMENTS)
+      )
+    } else {
+      throw new UnsupportedOperationException("Unsupported execution mode.")
+    }
   }
 }
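(Condensed sketch, not part of the patch: after this change every migration ITCase is a single parameterized @Test that branches on the execution mode instead of keeping a separate, @Ignore'd snapshot-generation test. buildTopology and the accumulator constants below are placeholders for the real test bodies:)

    // Placeholders: buildTopology, NUM_ELEMENTS_ACCUMULATOR, NUM_ELEMENTS.
    @Test
    def testSnapshotMigration(): Unit = {
      val env = StreamExecutionEnvironment.getExecutionEnvironment
      buildTopology(env) // the job graph is identical in both modes

      executionMode match {
        case ExecutionMode.CREATE_SNAPSHOT =>
          // Run on currentVersion (from the release branch!) and write the
          // snapshot under src/test/resources so it can be checked in.
          executeAndSavepoint(
            env,
            "src/test/resources/" + getSnapshotPath(migrationVersionAndBackend),
            new Tuple2(NUM_ELEMENTS_ACCUMULATOR, NUM_ELEMENTS))
        case ExecutionMode.VERIFY_SNAPSHOT =>
          // Restore the checked-in snapshot and verify the element count
          // through the sink's accumulator.
          restoreAndExecute(
            env,
            SnapshotMigrationTestBase.getResourceFilename(
              getSnapshotPath(migrationVersionAndBackend)),
            new Tuple2(NUM_ELEMENTS_ACCUMULATOR, NUM_ELEMENTS))
        case _ =>
          throw new UnsupportedOperationException("Unsupported execution mode.")
      }
    }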