Posted to commits@flink.apache.org by dw...@apache.org on 2022/02/22 08:27:31 UTC

[flink] 02/03: [FLINK-26176] Fix scala savepoint migration tests

This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 9dd6043f4e474658cd6830a011d74f0147fbcc07
Author: Sebastian Mattheis <se...@ververica.com>
AuthorDate: Fri Feb 18 10:17:59 2022 +0100

    [FLINK-26176] Fix scala savepoint migration tests
    
    - Remove test parameters and artifacts for tests of the rocksdb state backend, since the
      artifacts are not rocksdb savepoints but actually memory state savepoints
    - Remove test parameters and artifacts for tests of savepoints created with Flink version 1.7
      or below, as those are not safe to restore due to bug FLINK-10493 (see the sketch below)
---
 .../_metadata                                      | Bin 41481 -> 0 bytes
 .../_metadata                                      | Bin 41584 -> 0 bytes
 .../_metadata                                      | Bin 41436 -> 0 bytes
 .../_metadata                                      | Bin 41962 -> 0 bytes
 .../_metadata                                      | Bin 42002 -> 0 bytes
 .../_metadata                                      | Bin 213759 -> 0 bytes
 .../_metadata                                      | Bin 213759 -> 0 bytes
 .../_metadata                                      | Bin 204543 -> 0 bytes
 .../_metadata                                      | Bin 204543 -> 0 bytes
 .../_metadata                                      | Bin 205638 -> 0 bytes
 .../_metadata                                      | Bin 205638 -> 0 bytes
 .../_metadata                                      | Bin 229848 -> 0 bytes
 .../_metadata                                      | Bin 229848 -> 0 bytes
 .../_metadata                                      | Bin 39985 -> 0 bytes
 .../_metadata                                      | Bin 40081 -> 0 bytes
 .../_metadata                                      | Bin 53342 -> 0 bytes
 .../_metadata                                      | Bin 53419 -> 0 bytes
 .../_metadata                                      | Bin 53271 -> 0 bytes
 .../_metadata                                      | Bin 53733 -> 0 bytes
 .../_metadata                                      | Bin 53773 -> 0 bytes
 .../_metadata                                      | Bin 220470 -> 0 bytes
 .../_metadata                                      | Bin 220470 -> 0 bytes
 .../_metadata                                      | Bin 253956 -> 0 bytes
 .../_metadata                                      | Bin 253956 -> 0 bytes
 .../_metadata                                      | Bin 287008 -> 0 bytes
 .../_metadata                                      | Bin 287008 -> 0 bytes
 .../_metadata                                      | Bin 51846 -> 0 bytes
 .../_metadata                                      | Bin 51942 -> 0 bytes
 .../StatefulJobSavepointMigrationITCase.scala      | 153 +++++++---------
 ...StatefulJobWBroadcastStateMigrationITCase.scala | 198 ++++++++-------------
 30 files changed, 144 insertions(+), 207 deletions(-)

diff --git a/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.10-rocksdb-savepoint/_metadata b/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.10-rocksdb-savepoint/_metadata
deleted file mode 100644
index d6ade08..0000000
Binary files a/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.10-rocksdb-savepoint/_metadata and /dev/null differ
diff --git a/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.11-rocksdb-savepoint/_metadata b/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.11-rocksdb-savepoint/_metadata
deleted file mode 100644
index 56d6d77..0000000
Binary files a/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.11-rocksdb-savepoint/_metadata and /dev/null differ
diff --git a/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.12-rocksdb-savepoint/_metadata b/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.12-rocksdb-savepoint/_metadata
deleted file mode 100644
index dd2ef62..0000000
Binary files a/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.12-rocksdb-savepoint/_metadata and /dev/null differ
diff --git a/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.13-rocksdb-savepoint/_metadata b/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.13-rocksdb-savepoint/_metadata
deleted file mode 100644
index 24636f5..0000000
Binary files a/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.13-rocksdb-savepoint/_metadata and /dev/null differ
diff --git a/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.14-rocksdb-savepoint/_metadata b/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.14-rocksdb-savepoint/_metadata
deleted file mode 100644
index fbbf888..0000000
Binary files a/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.14-rocksdb-savepoint/_metadata and /dev/null differ
diff --git a/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.3-jobmanager-savepoint/_metadata b/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.3-jobmanager-savepoint/_metadata
deleted file mode 100644
index e0c6da6..0000000
Binary files a/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.3-jobmanager-savepoint/_metadata and /dev/null differ
diff --git a/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.3-rocksdb-savepoint/_metadata b/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.3-rocksdb-savepoint/_metadata
deleted file mode 100644
index f719eac..0000000
Binary files a/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.3-rocksdb-savepoint/_metadata and /dev/null differ
diff --git a/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.4-jobmanager-savepoint/_metadata b/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.4-jobmanager-savepoint/_metadata
deleted file mode 100644
index 69aa8d4..0000000
Binary files a/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.4-jobmanager-savepoint/_metadata and /dev/null differ
diff --git a/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.4-rocksdb-savepoint/_metadata b/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.4-rocksdb-savepoint/_metadata
deleted file mode 100644
index 7dcef9c..0000000
Binary files a/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.4-rocksdb-savepoint/_metadata and /dev/null differ
diff --git a/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.6-jobmanager-savepoint/_metadata b/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.6-jobmanager-savepoint/_metadata
deleted file mode 100644
index 55243f3..0000000
Binary files a/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.6-jobmanager-savepoint/_metadata and /dev/null differ
diff --git a/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.6-rocksdb-savepoint/_metadata b/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.6-rocksdb-savepoint/_metadata
deleted file mode 100644
index defe3b1..0000000
Binary files a/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.6-rocksdb-savepoint/_metadata and /dev/null differ
diff --git a/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.7-jobmanager-savepoint/_metadata b/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.7-jobmanager-savepoint/_metadata
deleted file mode 100644
index b421e64..0000000
Binary files a/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.7-jobmanager-savepoint/_metadata and /dev/null differ
diff --git a/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.7-rocksdb-savepoint/_metadata b/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.7-rocksdb-savepoint/_metadata
deleted file mode 100644
index 53e493c..0000000
Binary files a/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.7-rocksdb-savepoint/_metadata and /dev/null differ
diff --git a/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.8-rocksdb-savepoint/_metadata b/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.8-rocksdb-savepoint/_metadata
deleted file mode 100644
index 3f41ed6..0000000
Binary files a/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.8-rocksdb-savepoint/_metadata and /dev/null differ
diff --git a/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.9-rocksdb-savepoint/_metadata b/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.9-rocksdb-savepoint/_metadata
deleted file mode 100644
index 2c191ef..0000000
Binary files a/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.9-rocksdb-savepoint/_metadata and /dev/null differ
diff --git a/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.10-rocksdb-savepoint/_metadata b/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.10-rocksdb-savepoint/_metadata
deleted file mode 100644
index e558ab9..0000000
Binary files a/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.10-rocksdb-savepoint/_metadata and /dev/null differ
diff --git a/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.11-rocksdb-savepoint/_metadata b/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.11-rocksdb-savepoint/_metadata
deleted file mode 100644
index b4ffaa1..0000000
Binary files a/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.11-rocksdb-savepoint/_metadata and /dev/null differ
diff --git a/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.12-rocksdb-savepoint/_metadata b/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.12-rocksdb-savepoint/_metadata
deleted file mode 100644
index e36c884..0000000
Binary files a/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.12-rocksdb-savepoint/_metadata and /dev/null differ
diff --git a/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.13-rocksdb-savepoint/_metadata b/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.13-rocksdb-savepoint/_metadata
deleted file mode 100644
index 0615aa8..0000000
Binary files a/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.13-rocksdb-savepoint/_metadata and /dev/null differ
diff --git a/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.14-rocksdb-savepoint/_metadata b/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.14-rocksdb-savepoint/_metadata
deleted file mode 100644
index 7436fca..0000000
Binary files a/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.14-rocksdb-savepoint/_metadata and /dev/null differ
diff --git a/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.5-jobmanager-savepoint/_metadata b/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.5-jobmanager-savepoint/_metadata
deleted file mode 100644
index f845ee3..0000000
Binary files a/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.5-jobmanager-savepoint/_metadata and /dev/null differ
diff --git a/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.5-rocksdb-savepoint/_metadata b/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.5-rocksdb-savepoint/_metadata
deleted file mode 100644
index 6b05ef9..0000000
Binary files a/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.5-rocksdb-savepoint/_metadata and /dev/null differ
diff --git a/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.6-jobmanager-savepoint/_metadata b/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.6-jobmanager-savepoint/_metadata
deleted file mode 100644
index b64e810..0000000
Binary files a/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.6-jobmanager-savepoint/_metadata and /dev/null differ
diff --git a/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.6-rocksdb-savepoint/_metadata b/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.6-rocksdb-savepoint/_metadata
deleted file mode 100644
index 59bebc9..0000000
Binary files a/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.6-rocksdb-savepoint/_metadata and /dev/null differ
diff --git a/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.7-jobmanager-savepoint/_metadata b/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.7-jobmanager-savepoint/_metadata
deleted file mode 100644
index 88cee1b..0000000
Binary files a/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.7-jobmanager-savepoint/_metadata and /dev/null differ
diff --git a/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.7-rocksdb-savepoint/_metadata b/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.7-rocksdb-savepoint/_metadata
deleted file mode 100644
index e4b058a..0000000
Binary files a/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.7-rocksdb-savepoint/_metadata and /dev/null differ
diff --git a/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.8-rocksdb-savepoint/_metadata b/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.8-rocksdb-savepoint/_metadata
deleted file mode 100644
index f2870a4..0000000
Binary files a/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.8-rocksdb-savepoint/_metadata and /dev/null differ
diff --git a/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.9-rocksdb-savepoint/_metadata b/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.9-rocksdb-savepoint/_metadata
deleted file mode 100644
index 5f20171..0000000
Binary files a/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.9-rocksdb-savepoint/_metadata and /dev/null differ
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobSavepointMigrationITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobSavepointMigrationITCase.scala
index c68abf3..3ff91ee 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobSavepointMigrationITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobSavepointMigrationITCase.scala
@@ -35,56 +35,67 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
 import org.apache.flink.streaming.api.functions.source.SourceFunction
 import org.apache.flink.streaming.api.watermark.Watermark
-import org.apache.flink.test.checkpointing.utils.SavepointMigrationTestBase
+import org.apache.flink.test.checkpointing.utils.SnapshotMigrationTestBase
 import org.apache.flink.util.Collector
 import org.apache.flink.api.java.tuple.Tuple2
 import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext, StateBackendLoader}
 import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.migration.CustomEnum.CustomEnum
-
+import org.apache.flink.test.checkpointing.utils.SnapshotMigrationTestBase.ExecutionMode
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
-import org.junit.{Ignore, Test}
+import org.junit.Test
 
+import java.util.stream.Collectors
 import scala.util.{Failure, Try}
 
 object StatefulJobSavepointMigrationITCase {
 
+  // TODO increase this to newer version to create and test snapshot migration for newer versions
+  val currentVersion = FlinkVersion.v1_14
+
+  // TODO change this to CREATE_SNAPSHOT to (re)create binary snapshots
+  // TODO Note: You should generate the snapshot based on the release branch instead of the
+  // master.
+  val executionMode = ExecutionMode.VERIFY_SNAPSHOT
+
   @Parameterized.Parameters(name = "Migrate Savepoint / Backend: {0}")
   def parameters: util.Collection[(FlinkVersion, String)] = {
-    util.Arrays.asList(
-      (FlinkVersion.v1_3, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-      (FlinkVersion.v1_3, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
-      (FlinkVersion.v1_4, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-      (FlinkVersion.v1_4, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
-      (FlinkVersion.v1_6, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-      (FlinkVersion.v1_6, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
-      (FlinkVersion.v1_7, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-      (FlinkVersion.v1_7, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
+    var parameters = util.Arrays.asList(
+      //      (FlinkVersion.v1_3, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
+      //      (FlinkVersion.v1_4, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
+      //      (FlinkVersion.v1_6, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
+      //      (FlinkVersion.v1_7, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
+      // Note: It is not safe to restore savepoints created in Scala applications with Flink
+      // version 1.7 or below. The reason is that up to version 1.7 the underlying Scala serializer
+      // used names of anonymous classes that depend on the relative position/order in code, e.g.,
+      // if two anonymous classes, instantiated inside the same class and from the same base class,
+      // change order in the code their names are switched.
+      // As a consequence, changes in code may result in restore failures.
+      // This was fixed in version 1.8, see: https://issues.apache.org/jira/browse/FLINK-10493
       (FlinkVersion.v1_8, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-      (FlinkVersion.v1_8, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
       (FlinkVersion.v1_9, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-      (FlinkVersion.v1_9, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
       (FlinkVersion.v1_10, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-      (FlinkVersion.v1_10, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
       (FlinkVersion.v1_11, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-      (FlinkVersion.v1_11, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
       (FlinkVersion.v1_12, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-      (FlinkVersion.v1_12, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
       (FlinkVersion.v1_13, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-      (FlinkVersion.v1_13, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
       (FlinkVersion.v1_14, StateBackendLoader.HASHMAP_STATE_BACKEND_NAME),
-      (FlinkVersion.v1_14, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME))
+      (FlinkVersion.v1_15, StateBackendLoader.HASHMAP_STATE_BACKEND_NAME),
+    )
+    if (executionMode == ExecutionMode.CREATE_SNAPSHOT) {
+      parameters = parameters.stream().filter(x => x._1 == currentVersion)
+        .collect(Collectors.toList())
+    }
+    parameters
   }
 
-  // TODO to generate savepoints for a specific Flink version / backend type,
-  // TODO change these values accordingly, e.g. to generate for 1.3 with RocksDB,
-  // TODO set as (FlinkVersion.v1_3, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME)
-  // TODO Note: You should generate the savepoint based on the release branch instead of the master.
-  val GENERATE_SAVEPOINT_VER: FlinkVersion = FlinkVersion.v1_14
-  val GENERATE_SAVEPOINT_BACKEND_TYPE: String = StateBackendLoader.HASHMAP_STATE_BACKEND_NAME
-
   val NUM_ELEMENTS = 4
+
+  def getSnapshotPath(migrationVersionAndBackend: (FlinkVersion, String)): String = {
+    s"stateful-scala-udf-migration-itcase" +
+      s"-flink${migrationVersionAndBackend._1}" +
+      s"-${migrationVersionAndBackend._2}-savepoint"
+  }
 }
 
 /**
@@ -92,16 +103,15 @@ object StatefulJobSavepointMigrationITCase {
  */
 @RunWith(classOf[Parameterized])
 class StatefulJobSavepointMigrationITCase(
-    migrationVersionAndBackend: (FlinkVersion, String))
-  extends SavepointMigrationTestBase with Serializable {
+                                           migrationVersionAndBackend: (FlinkVersion, String))
+  extends SnapshotMigrationTestBase with Serializable {
 
-  @Ignore
   @Test
-  def testCreateSavepoint(): Unit = {
+  def testSavepoint(): Unit = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
 
-    StatefulJobSavepointMigrationITCase.GENERATE_SAVEPOINT_BACKEND_TYPE match {
+    migrationVersionAndBackend._2 match {
       case StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME =>
         env.setStateBackend(new EmbeddedRocksDBStateBackend())
       case StateBackendLoader.MEMORY_STATE_BACKEND_NAME =>
@@ -111,7 +121,7 @@ class StatefulJobSavepointMigrationITCase(
       case _ => throw new UnsupportedOperationException
     }
 
-    env.setStateBackend(new MemoryStateBackend)
+    env.enableChangelogStateBackend(false)
     env.enableCheckpointing(500)
     env.setParallelism(4)
     env.setMaxParallelism(4)
@@ -127,60 +137,29 @@ class StatefulJobSavepointMigrationITCase(
       .flatMap(new StatefulFlatMapper)
       .addSink(new AccumulatorCountingSink)
 
-    executeAndSavepoint(
-      env,
-      s"src/test/resources/stateful-scala-udf-migration-itcase-flink" +
-        s"${StatefulJobSavepointMigrationITCase.GENERATE_SAVEPOINT_VER}" +
-        s"-${StatefulJobSavepointMigrationITCase.GENERATE_SAVEPOINT_BACKEND_TYPE}-savepoint",
-      new Tuple2(
-        AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR,
-        StatefulJobSavepointMigrationITCase.NUM_ELEMENTS
+    if (StatefulJobSavepointMigrationITCase.executionMode == ExecutionMode.CREATE_SNAPSHOT) {
+      executeAndSavepoint(
+        env,
+        s"src/test/resources/"
+          + StatefulJobSavepointMigrationITCase.getSnapshotPath(migrationVersionAndBackend),
+        new Tuple2(
+          AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR,
+          StatefulJobSavepointMigrationITCase.NUM_ELEMENTS
+        )
       )
-    )
-  }
-
-  @Test
-  def testRestoreSavepoint(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-
-    migrationVersionAndBackend._2 match {
-      case StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME =>
-        env.setStateBackend(new EmbeddedRocksDBStateBackend())
-      case StateBackendLoader.MEMORY_STATE_BACKEND_NAME =>
-        env.setStateBackend(new MemoryStateBackend())
-      case StateBackendLoader.HASHMAP_STATE_BACKEND_NAME =>
-        env.setStateBackend(new HashMapStateBackend())
-      case _ => throw new UnsupportedOperationException
-    }
-    env.enableChangelogStateBackend(false);
-
-    env.setStateBackend(new MemoryStateBackend)
-    env.enableCheckpointing(500)
-    env.setParallelism(4)
-    env.setMaxParallelism(4)
-
-    env
-      .addSource(
-        new CheckpointedSource(4)).setMaxParallelism(1).uid("checkpointedSource")
-      .keyBy(
-        new KeySelector[(Long, Long), Long] {
-          override def getKey(value: (Long, Long)): Long = value._1
-        }
+    } else if (
+      StatefulJobSavepointMigrationITCase.executionMode == ExecutionMode.VERIFY_SNAPSHOT) {
+      restoreAndExecute(
+        env,
+        SnapshotMigrationTestBase.getResourceFilename(
+          StatefulJobSavepointMigrationITCase.getSnapshotPath(migrationVersionAndBackend)),
+        new Tuple2(
+          AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR,
+          StatefulJobSavepointMigrationITCase.NUM_ELEMENTS)
       )
-      .flatMap(new StatefulFlatMapper)
-      .addSink(new AccumulatorCountingSink)
-
-    restoreAndExecute(
-      env,
-      SavepointMigrationTestBase.getResourceFilename(
-        s"stateful-scala" +
-          s"-udf-migration-itcase-flink${migrationVersionAndBackend._1}" +
-          s"-${migrationVersionAndBackend._2}-savepoint"),
-      new Tuple2(
-        AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR,
-        StatefulJobSavepointMigrationITCase.NUM_ELEMENTS)
-    )
+    } else {
+      throw new UnsupportedOperationException("Unsupported execution mode.")
+    }
   }
 
   @SerialVersionUID(1L)
@@ -190,7 +169,7 @@ class StatefulJobSavepointMigrationITCase(
 
   @SerialVersionUID(1L)
   private class CheckpointedSource(val numElements: Int)
-      extends SourceFunction[(Long, Long)] with CheckpointedFunction {
+    extends SourceFunction[(Long, Long)] with CheckpointedFunction {
 
     private var isRunning = true
     private var state: ListState[CustomCaseClass] = _
@@ -201,8 +180,8 @@ class StatefulJobSavepointMigrationITCase(
       ctx.getCheckpointLock synchronized {
         var i = 0
         while (i < numElements) {
-            ctx.collect(i, i)
-            i += 1
+          ctx.collect(i, i)
+          i += 1
         }
       }
       // don't emit a final watermark so that we don't trigger the registered event-time
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobWBroadcastStateMigrationITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobWBroadcastStateMigrationITCase.scala
index 78e646e..47dc3ae 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobWBroadcastStateMigrationITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobWBroadcastStateMigrationITCase.scala
@@ -41,132 +41,73 @@ import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
 import org.apache.flink.streaming.api.functions.source.SourceFunction
 import org.apache.flink.streaming.api.watermark.Watermark
-import org.apache.flink.test.checkpointing.utils.SavepointMigrationTestBase
+import org.apache.flink.test.checkpointing.utils.SnapshotMigrationTestBase
+import org.apache.flink.test.checkpointing.utils.SnapshotMigrationTestBase.ExecutionMode
 import org.apache.flink.util.Collector
-
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
-import org.junit.{Assert, Ignore, Test}
+import org.junit.{Assert, Test}
 
+import java.util.stream.Collectors
 import scala.util.{Failure, Try}
 
 object StatefulJobWBroadcastStateMigrationITCase {
 
+  // TODO increase this to newer version to create and test snapshot migration for newer versions
+  val currentVersion = FlinkVersion.v1_14
+
+  // TODO change this to CREATE_SNAPSHOT to (re)create binary snapshots
+  // TODO Note: You should generate the snapshot based on the release branch instead of the
+  // master.
+  val executionMode = ExecutionMode.VERIFY_SNAPSHOT
+
   @Parameterized.Parameters(name = "Migrate Savepoint / Backend: {0}")
   def parameters: util.Collection[(FlinkVersion, String)] = {
-    util.Arrays.asList(
-      (FlinkVersion.v1_5, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-      (FlinkVersion.v1_5, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
-      (FlinkVersion.v1_6, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-      (FlinkVersion.v1_6, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
-      (FlinkVersion.v1_7, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-      (FlinkVersion.v1_7, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
+    var parameters = util.Arrays.asList(
+      //      (FlinkVersion.v1_5, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
+      //      (FlinkVersion.v1_6, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
+      //      (FlinkVersion.v1_7, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
+      // Note: It is not safe to restore savepoints created in Scala applications with Flink
+      // version 1.7 or below. The reason is that up to version 1.7 the underlying Scala serializer
+      // used names of anonymous classes that depend on the relative position/order in code, e.g.,
+      // if two anonymous classes, instantiated inside the same class and from the same base class,
+      // change order in the code their names are switched.
+      // As a consequence, changes in code may result in restore failures.
+      // This was fixed in version 1.8, see: https://issues.apache.org/jira/browse/FLINK-10493
       (FlinkVersion.v1_8, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-      (FlinkVersion.v1_8, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
       (FlinkVersion.v1_9, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-      (FlinkVersion.v1_9, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
       (FlinkVersion.v1_10, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-      (FlinkVersion.v1_10, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
       (FlinkVersion.v1_11, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-      (FlinkVersion.v1_11, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
       (FlinkVersion.v1_12, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-      (FlinkVersion.v1_12, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
       (FlinkVersion.v1_13, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-      (FlinkVersion.v1_13, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
       (FlinkVersion.v1_14, StateBackendLoader.HASHMAP_STATE_BACKEND_NAME),
-      (FlinkVersion.v1_14, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME))
+      (FlinkVersion.v1_15, StateBackendLoader.HASHMAP_STATE_BACKEND_NAME),
+    )
+    if (executionMode == ExecutionMode.CREATE_SNAPSHOT) {
+      parameters = parameters.stream().filter(x => x._1 == currentVersion)
+        .collect(Collectors.toList())
+    }
+    parameters
   }
 
-  // TODO to generate savepoints for a specific Flink version / backend type,
-  // TODO change these values accordingly, e.g. to generate for 1.3 with RocksDB,
-  // TODO set as (FlinkVersion.v1_3, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME)
-  // TODO Note: You should generate the savepoint based on the release branch instead of the master.
-  val GENERATE_SAVEPOINT_VER: FlinkVersion = FlinkVersion.v1_14
-  val GENERATE_SAVEPOINT_BACKEND_TYPE: String = StateBackendLoader.HASHMAP_STATE_BACKEND_NAME
+  def getSnapshotPath(migrationVersionAndBackend: (FlinkVersion, String)): String = {
+    s"stateful-scala-with-broadcast-udf-migration-itcase" +
+      s"-flink${migrationVersionAndBackend._1}" +
+      s"-${migrationVersionAndBackend._2}-savepoint"
+  }
 
   val NUM_ELEMENTS = 4
 }
 
 /**
-  * ITCase for migration Scala state types across different Flink versions.
-  */
+ * ITCase for migration Scala state types across different Flink versions.
+ */
 @RunWith(classOf[Parameterized])
-class StatefulJobWBroadcastStateMigrationITCase(
-                                        migrationVersionAndBackend: (FlinkVersion, String))
-  extends SavepointMigrationTestBase with Serializable {
+class StatefulJobWBroadcastStateMigrationITCase(migrationVersionAndBackend: (FlinkVersion, String))
+  extends SnapshotMigrationTestBase with Serializable {
 
   @Test
-  @Ignore
-  def testCreateSavepointWithBroadcastState(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-
-    StatefulJobWBroadcastStateMigrationITCase.GENERATE_SAVEPOINT_BACKEND_TYPE match {
-      case StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME =>
-        env.setStateBackend(new EmbeddedRocksDBStateBackend())
-      case StateBackendLoader.MEMORY_STATE_BACKEND_NAME =>
-        env.setStateBackend(new MemoryStateBackend())
-      case StateBackendLoader.HASHMAP_STATE_BACKEND_NAME =>
-        env.setStateBackend(new HashMapStateBackend())
-      case _ => throw new UnsupportedOperationException
-    }
-    env.enableChangelogStateBackend(false)
-
-    lazy val firstBroadcastStateDesc = new MapStateDescriptor[Long, Long](
-      "broadcast-state-1",
-      BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]],
-      BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]])
-
-    lazy val secondBroadcastStateDesc = new MapStateDescriptor[String, String](
-      "broadcast-state-2",
-      BasicTypeInfo.STRING_TYPE_INFO,
-      BasicTypeInfo.STRING_TYPE_INFO)
-
-    env.setStateBackend(new MemoryStateBackend)
-    env.enableCheckpointing(500)
-    env.setParallelism(4)
-    env.setMaxParallelism(4)
-
-    val stream = env
-      .addSource(
-        new CheckpointedSource(4)).setMaxParallelism(1).uid("checkpointedSource")
-      .keyBy(
-        new KeySelector[(Long, Long), Long] {
-          override def getKey(value: (Long, Long)): Long = value._1
-        }
-      )
-      .flatMap(new StatefulFlatMapper)
-      .keyBy(
-        new KeySelector[(Long, Long), Long] {
-          override def getKey(value: (Long, Long)): Long = value._1
-        }
-      )
-
-    val broadcastStream = env
-      .addSource(
-        new CheckpointedSource(4)).setMaxParallelism(1).uid("checkpointedBroadcastSource")
-      .broadcast(firstBroadcastStateDesc, secondBroadcastStateDesc)
-
-    stream
-      .connect(broadcastStream)
-      .process(new TestBroadcastProcessFunction)
-      .addSink(new AccumulatorCountingSink)
-
-    executeAndSavepoint(
-      env,
-      s"src/test/resources/stateful-scala-with-broadcast" +
-        s"-udf-migration-itcase-flink" +
-        s"${StatefulJobWBroadcastStateMigrationITCase.GENERATE_SAVEPOINT_VER}" +
-        s"-${StatefulJobWBroadcastStateMigrationITCase.GENERATE_SAVEPOINT_BACKEND_TYPE}-savepoint",
-      new Tuple2(
-        AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR,
-        StatefulJobWBroadcastStateMigrationITCase.NUM_ELEMENTS
-      )
-    )
-  }
-
-  @Test
-  def testRestoreSavepointWithBroadcast(): Unit = {
+  def testSavepointWithBroadcast(): Unit = {
 
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
@@ -192,7 +133,6 @@ class StatefulJobWBroadcastStateMigrationITCase(
       BasicTypeInfo.STRING_TYPE_INFO,
       BasicTypeInfo.STRING_TYPE_INFO)
 
-    env.setStateBackend(new MemoryStateBackend)
     env.enableCheckpointing(500)
     env.setParallelism(4)
     env.setMaxParallelism(4)
@@ -217,26 +157,44 @@ class StatefulJobWBroadcastStateMigrationITCase(
         new CheckpointedSource(4)).setMaxParallelism(1).uid("checkpointedBroadcastSource")
       .broadcast(firstBroadcastStateDesc, secondBroadcastStateDesc)
 
-    val expectedFirstState: Map[Long, Long] =
-      Map(0L -> 0L, 1L -> 1L, 2L -> 2L, 3L -> 3L)
-    val expectedSecondState: Map[String, String] =
-      Map("0" -> "0", "1" -> "1", "2" -> "2", "3" -> "3")
-
-    stream
-      .connect(broadcastStream)
-      .process(new VerifyingBroadcastProcessFunction(expectedFirstState, expectedSecondState))
-      .addSink(new AccumulatorCountingSink)
-
-    restoreAndExecute(
-      env,
-      SavepointMigrationTestBase.getResourceFilename(
-        s"stateful-scala-with-broadcast" +
-          s"-udf-migration-itcase-flink${migrationVersionAndBackend._1}" +
-          s"-${migrationVersionAndBackend._2}-savepoint"),
-      new Tuple2(
-        AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR,
-        StatefulJobWBroadcastStateMigrationITCase.NUM_ELEMENTS)
-    )
+    if (StatefulJobWBroadcastStateMigrationITCase.executionMode == ExecutionMode.CREATE_SNAPSHOT) {
+      stream
+        .connect(broadcastStream)
+        .process(new TestBroadcastProcessFunction)
+        .addSink(new AccumulatorCountingSink)
+
+      executeAndSavepoint(
+        env,
+        s"src/test/resources/"
+          + StatefulJobWBroadcastStateMigrationITCase.getSnapshotPath(migrationVersionAndBackend),
+        new Tuple2(
+          AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR,
+          StatefulJobWBroadcastStateMigrationITCase.NUM_ELEMENTS
+        )
+      )
+    } else if (
+      StatefulJobWBroadcastStateMigrationITCase.executionMode == ExecutionMode.VERIFY_SNAPSHOT) {
+      val expectedFirstState: Map[Long, Long] =
+        Map(0L -> 0L, 1L -> 1L, 2L -> 2L, 3L -> 3L)
+      val expectedSecondState: Map[String, String] =
+        Map("0" -> "0", "1" -> "1", "2" -> "2", "3" -> "3")
+
+      stream
+        .connect(broadcastStream)
+        .process(new VerifyingBroadcastProcessFunction(expectedFirstState, expectedSecondState))
+        .addSink(new AccumulatorCountingSink)
+
+      restoreAndExecute(
+        env,
+        SnapshotMigrationTestBase.getResourceFilename(
+          StatefulJobWBroadcastStateMigrationITCase.getSnapshotPath(migrationVersionAndBackend)),
+        new Tuple2(
+          AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR,
+          StatefulJobWBroadcastStateMigrationITCase.NUM_ELEMENTS)
+      )
+    } else {
+      throw new UnsupportedOperationException("Unsupported execution mode.")
+    }
   }
 }
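
For reference (not part of the commit), a standalone sketch of the parameter filtering pattern
used in both ITCases above, with plain strings standing in for FlinkVersion and the state
backend names; when snapshots are being (re)created, only the entry for the target version is
kept, otherwise all verification parameters run:

    import java.util
    import java.util.stream.Collectors

    object ParameterFilterSketch {
      // Hypothetical stand-ins for currentVersion / ExecutionMode.CREATE_SNAPSHOT
      val currentVersion = "1.15"
      val createSnapshot = true

      def main(args: Array[String]): Unit = {
        var parameters: util.List[(String, String)] = util.Arrays.asList(
          ("1.14", "hashmap"),
          ("1.15", "hashmap"))
        if (createSnapshot) {
          // util.Arrays.asList is fixed-size, so collect the filtered stream into a new list
          parameters = parameters.stream()
            .filter(x => x._1 == currentVersion)
            .collect(Collectors.toList())
        }
        println(parameters) // prints [(1.15,hashmap)]
      }
    }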