You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2022/02/22 08:27:31 UTC
[flink] 02/03: [FLINK-26176] Fix scala savepoint migration tests
This is an automated email from the ASF dual-hosted git repository.
dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 9dd6043f4e474658cd6830a011d74f0147fbcc07
Author: Sebastian Mattheis <se...@ververica.com>
AuthorDate: Fri Feb 18 10:17:59 2022 +0100
[FLINK-26176] Fix scala savepoint migration tests
- Remove test parameters and artifacts for tests of rocksdb state backend since artifacts are not
rocksdb savepoints but actually memory state savepoints
- Remove test parameters and artifacts for tests of savepoints with Flink version 1.7 or below
as those are not safe to restore due to bug FLINK-10493
---
.../_metadata | Bin 41481 -> 0 bytes
.../_metadata | Bin 41584 -> 0 bytes
.../_metadata | Bin 41436 -> 0 bytes
.../_metadata | Bin 41962 -> 0 bytes
.../_metadata | Bin 42002 -> 0 bytes
.../_metadata | Bin 213759 -> 0 bytes
.../_metadata | Bin 213759 -> 0 bytes
.../_metadata | Bin 204543 -> 0 bytes
.../_metadata | Bin 204543 -> 0 bytes
.../_metadata | Bin 205638 -> 0 bytes
.../_metadata | Bin 205638 -> 0 bytes
.../_metadata | Bin 229848 -> 0 bytes
.../_metadata | Bin 229848 -> 0 bytes
.../_metadata | Bin 39985 -> 0 bytes
.../_metadata | Bin 40081 -> 0 bytes
.../_metadata | Bin 53342 -> 0 bytes
.../_metadata | Bin 53419 -> 0 bytes
.../_metadata | Bin 53271 -> 0 bytes
.../_metadata | Bin 53733 -> 0 bytes
.../_metadata | Bin 53773 -> 0 bytes
.../_metadata | Bin 220470 -> 0 bytes
.../_metadata | Bin 220470 -> 0 bytes
.../_metadata | Bin 253956 -> 0 bytes
.../_metadata | Bin 253956 -> 0 bytes
.../_metadata | Bin 287008 -> 0 bytes
.../_metadata | Bin 287008 -> 0 bytes
.../_metadata | Bin 51846 -> 0 bytes
.../_metadata | Bin 51942 -> 0 bytes
.../StatefulJobSavepointMigrationITCase.scala | 153 +++++++---------
...StatefulJobWBroadcastStateMigrationITCase.scala | 198 ++++++++-------------
30 files changed, 144 insertions(+), 207 deletions(-)
diff --git a/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.10-rocksdb-savepoint/_metadata b/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.10-rocksdb-savepoint/_metadata
deleted file mode 100644
index d6ade08..0000000
Binary files a/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.10-rocksdb-savepoint/_metadata and /dev/null differ
diff --git a/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.11-rocksdb-savepoint/_metadata b/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.11-rocksdb-savepoint/_metadata
deleted file mode 100644
index 56d6d77..0000000
Binary files a/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.11-rocksdb-savepoint/_metadata and /dev/null differ
diff --git a/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.12-rocksdb-savepoint/_metadata b/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.12-rocksdb-savepoint/_metadata
deleted file mode 100644
index dd2ef62..0000000
Binary files a/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.12-rocksdb-savepoint/_metadata and /dev/null differ
diff --git a/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.13-rocksdb-savepoint/_metadata b/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.13-rocksdb-savepoint/_metadata
deleted file mode 100644
index 24636f5..0000000
Binary files a/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.13-rocksdb-savepoint/_metadata and /dev/null differ
diff --git a/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.14-rocksdb-savepoint/_metadata b/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.14-rocksdb-savepoint/_metadata
deleted file mode 100644
index fbbf888..0000000
Binary files a/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.14-rocksdb-savepoint/_metadata and /dev/null differ
diff --git a/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.3-jobmanager-savepoint/_metadata b/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.3-jobmanager-savepoint/_metadata
deleted file mode 100644
index e0c6da6..0000000
Binary files a/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.3-jobmanager-savepoint/_metadata and /dev/null differ
diff --git a/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.3-rocksdb-savepoint/_metadata b/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.3-rocksdb-savepoint/_metadata
deleted file mode 100644
index f719eac..0000000
Binary files a/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.3-rocksdb-savepoint/_metadata and /dev/null differ
diff --git a/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.4-jobmanager-savepoint/_metadata b/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.4-jobmanager-savepoint/_metadata
deleted file mode 100644
index 69aa8d4..0000000
Binary files a/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.4-jobmanager-savepoint/_metadata and /dev/null differ
diff --git a/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.4-rocksdb-savepoint/_metadata b/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.4-rocksdb-savepoint/_metadata
deleted file mode 100644
index 7dcef9c..0000000
Binary files a/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.4-rocksdb-savepoint/_metadata and /dev/null differ
diff --git a/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.6-jobmanager-savepoint/_metadata b/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.6-jobmanager-savepoint/_metadata
deleted file mode 100644
index 55243f3..0000000
Binary files a/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.6-jobmanager-savepoint/_metadata and /dev/null differ
diff --git a/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.6-rocksdb-savepoint/_metadata b/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.6-rocksdb-savepoint/_metadata
deleted file mode 100644
index defe3b1..0000000
Binary files a/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.6-rocksdb-savepoint/_metadata and /dev/null differ
diff --git a/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.7-jobmanager-savepoint/_metadata b/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.7-jobmanager-savepoint/_metadata
deleted file mode 100644
index b421e64..0000000
Binary files a/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.7-jobmanager-savepoint/_metadata and /dev/null differ
diff --git a/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.7-rocksdb-savepoint/_metadata b/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.7-rocksdb-savepoint/_metadata
deleted file mode 100644
index 53e493c..0000000
Binary files a/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.7-rocksdb-savepoint/_metadata and /dev/null differ
diff --git a/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.8-rocksdb-savepoint/_metadata b/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.8-rocksdb-savepoint/_metadata
deleted file mode 100644
index 3f41ed6..0000000
Binary files a/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.8-rocksdb-savepoint/_metadata and /dev/null differ
diff --git a/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.9-rocksdb-savepoint/_metadata b/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.9-rocksdb-savepoint/_metadata
deleted file mode 100644
index 2c191ef..0000000
Binary files a/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.9-rocksdb-savepoint/_metadata and /dev/null differ
diff --git a/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.10-rocksdb-savepoint/_metadata b/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.10-rocksdb-savepoint/_metadata
deleted file mode 100644
index e558ab9..0000000
Binary files a/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.10-rocksdb-savepoint/_metadata and /dev/null differ
diff --git a/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.11-rocksdb-savepoint/_metadata b/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.11-rocksdb-savepoint/_metadata
deleted file mode 100644
index b4ffaa1..0000000
Binary files a/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.11-rocksdb-savepoint/_metadata and /dev/null differ
diff --git a/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.12-rocksdb-savepoint/_metadata b/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.12-rocksdb-savepoint/_metadata
deleted file mode 100644
index e36c884..0000000
Binary files a/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.12-rocksdb-savepoint/_metadata and /dev/null differ
diff --git a/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.13-rocksdb-savepoint/_metadata b/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.13-rocksdb-savepoint/_metadata
deleted file mode 100644
index 0615aa8..0000000
Binary files a/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.13-rocksdb-savepoint/_metadata and /dev/null differ
diff --git a/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.14-rocksdb-savepoint/_metadata b/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.14-rocksdb-savepoint/_metadata
deleted file mode 100644
index 7436fca..0000000
Binary files a/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.14-rocksdb-savepoint/_metadata and /dev/null differ
diff --git a/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.5-jobmanager-savepoint/_metadata b/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.5-jobmanager-savepoint/_metadata
deleted file mode 100644
index f845ee3..0000000
Binary files a/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.5-jobmanager-savepoint/_metadata and /dev/null differ
diff --git a/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.5-rocksdb-savepoint/_metadata b/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.5-rocksdb-savepoint/_metadata
deleted file mode 100644
index 6b05ef9..0000000
Binary files a/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.5-rocksdb-savepoint/_metadata and /dev/null differ
diff --git a/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.6-jobmanager-savepoint/_metadata b/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.6-jobmanager-savepoint/_metadata
deleted file mode 100644
index b64e810..0000000
Binary files a/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.6-jobmanager-savepoint/_metadata and /dev/null differ
diff --git a/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.6-rocksdb-savepoint/_metadata b/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.6-rocksdb-savepoint/_metadata
deleted file mode 100644
index 59bebc9..0000000
Binary files a/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.6-rocksdb-savepoint/_metadata and /dev/null differ
diff --git a/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.7-jobmanager-savepoint/_metadata b/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.7-jobmanager-savepoint/_metadata
deleted file mode 100644
index 88cee1b..0000000
Binary files a/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.7-jobmanager-savepoint/_metadata and /dev/null differ
diff --git a/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.7-rocksdb-savepoint/_metadata b/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.7-rocksdb-savepoint/_metadata
deleted file mode 100644
index e4b058a..0000000
Binary files a/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.7-rocksdb-savepoint/_metadata and /dev/null differ
diff --git a/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.8-rocksdb-savepoint/_metadata b/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.8-rocksdb-savepoint/_metadata
deleted file mode 100644
index f2870a4..0000000
Binary files a/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.8-rocksdb-savepoint/_metadata and /dev/null differ
diff --git a/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.9-rocksdb-savepoint/_metadata b/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.9-rocksdb-savepoint/_metadata
deleted file mode 100644
index 5f20171..0000000
Binary files a/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.9-rocksdb-savepoint/_metadata and /dev/null differ
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobSavepointMigrationITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobSavepointMigrationITCase.scala
index c68abf3..3ff91ee 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobSavepointMigrationITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobSavepointMigrationITCase.scala
@@ -35,56 +35,67 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.watermark.Watermark
-import org.apache.flink.test.checkpointing.utils.SavepointMigrationTestBase
+import org.apache.flink.test.checkpointing.utils.SnapshotMigrationTestBase
import org.apache.flink.util.Collector
import org.apache.flink.api.java.tuple.Tuple2
import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext, StateBackendLoader}
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.migration.CustomEnum.CustomEnum
-
+import org.apache.flink.test.checkpointing.utils.SnapshotMigrationTestBase.ExecutionMode
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
-import org.junit.{Ignore, Test}
+import org.junit.Test
+import java.util.stream.Collectors
import scala.util.{Failure, Try}
object StatefulJobSavepointMigrationITCase {
+ // TODO increase this to newer version to create and test snapshot migration for newer versions
+ val currentVersion = FlinkVersion.v1_14
+
+ // TODO change this to CREATE_SNAPSHOT to (re)create binary snapshots
+ // TODO Note: You should generate the snapshot based on the release branch instead of the
+ // master.
+ val executionMode = ExecutionMode.VERIFY_SNAPSHOT
+
@Parameterized.Parameters(name = "Migrate Savepoint / Backend: {0}")
def parameters: util.Collection[(FlinkVersion, String)] = {
- util.Arrays.asList(
- (FlinkVersion.v1_3, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
- (FlinkVersion.v1_3, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
- (FlinkVersion.v1_4, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
- (FlinkVersion.v1_4, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
- (FlinkVersion.v1_6, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
- (FlinkVersion.v1_6, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
- (FlinkVersion.v1_7, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
- (FlinkVersion.v1_7, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
+ var parameters = util.Arrays.asList(
+ // (FlinkVersion.v1_3, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
+ // (FlinkVersion.v1_4, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
+ // (FlinkVersion.v1_6, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
+ // (FlinkVersion.v1_7, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
+ // Note: It is not safe to restore savepoints created in a Scala applications with Flink
+ // version 1.7 or below. The reason is that up to version 1.7 the underlying Scala serializer
+ // used names of anonymous classes that depend on the relative position/order in code, e.g.,
+ // if two anonymous classes, instantiated inside the same class and from the same base class,
+ // change order in the code their names are switched.
+ // As a consequence, changes in code may result in restore failures.
+ // This was fixed in version 1.8, see: https://issues.apache.org/jira/browse/FLINK-10493
(FlinkVersion.v1_8, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
- (FlinkVersion.v1_8, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
(FlinkVersion.v1_9, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
- (FlinkVersion.v1_9, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
(FlinkVersion.v1_10, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
- (FlinkVersion.v1_10, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
(FlinkVersion.v1_11, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
- (FlinkVersion.v1_11, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
(FlinkVersion.v1_12, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
- (FlinkVersion.v1_12, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
(FlinkVersion.v1_13, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
- (FlinkVersion.v1_13, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
(FlinkVersion.v1_14, StateBackendLoader.HASHMAP_STATE_BACKEND_NAME),
- (FlinkVersion.v1_14, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME))
+ (FlinkVersion.v1_15, StateBackendLoader.HASHMAP_STATE_BACKEND_NAME),
+ )
+ if (executionMode == ExecutionMode.CREATE_SNAPSHOT) {
+ parameters = parameters.stream().filter(x => x._1 == currentVersion)
+ .collect(Collectors.toList())
+ }
+ parameters
}
- // TODO to generate savepoints for a specific Flink version / backend type,
- // TODO change these values accordingly, e.g. to generate for 1.3 with RocksDB,
- // TODO set as (FlinkVersion.v1_3, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME)
- // TODO Note: You should generate the savepoint based on the release branch instead of the master.
- val GENERATE_SAVEPOINT_VER: FlinkVersion = FlinkVersion.v1_14
- val GENERATE_SAVEPOINT_BACKEND_TYPE: String = StateBackendLoader.HASHMAP_STATE_BACKEND_NAME
-
val NUM_ELEMENTS = 4
+
+ def getSnapshotPath(migrationVersionAndBackend: (FlinkVersion, String)): String = {
+ s"stateful-scala-udf-migration-itcase" +
+ s"-flink${migrationVersionAndBackend._1}" +
+ s"-${migrationVersionAndBackend._2}-savepoint"
+ }
}
/**
@@ -92,16 +103,15 @@ object StatefulJobSavepointMigrationITCase {
*/
@RunWith(classOf[Parameterized])
class StatefulJobSavepointMigrationITCase(
- migrationVersionAndBackend: (FlinkVersion, String))
- extends SavepointMigrationTestBase with Serializable {
+ migrationVersionAndBackend: (FlinkVersion, String))
+ extends SnapshotMigrationTestBase with Serializable {
- @Ignore
@Test
- def testCreateSavepoint(): Unit = {
+ def testSavepoint(): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
- StatefulJobSavepointMigrationITCase.GENERATE_SAVEPOINT_BACKEND_TYPE match {
+ migrationVersionAndBackend._2 match {
case StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME =>
env.setStateBackend(new EmbeddedRocksDBStateBackend())
case StateBackendLoader.MEMORY_STATE_BACKEND_NAME =>
@@ -111,7 +121,7 @@ class StatefulJobSavepointMigrationITCase(
case _ => throw new UnsupportedOperationException
}
- env.setStateBackend(new MemoryStateBackend)
+ env.enableChangelogStateBackend(false)
env.enableCheckpointing(500)
env.setParallelism(4)
env.setMaxParallelism(4)
@@ -127,60 +137,29 @@ class StatefulJobSavepointMigrationITCase(
.flatMap(new StatefulFlatMapper)
.addSink(new AccumulatorCountingSink)
- executeAndSavepoint(
- env,
- s"src/test/resources/stateful-scala-udf-migration-itcase-flink" +
- s"${StatefulJobSavepointMigrationITCase.GENERATE_SAVEPOINT_VER}" +
- s"-${StatefulJobSavepointMigrationITCase.GENERATE_SAVEPOINT_BACKEND_TYPE}-savepoint",
- new Tuple2(
- AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR,
- StatefulJobSavepointMigrationITCase.NUM_ELEMENTS
+ if (StatefulJobSavepointMigrationITCase.executionMode == ExecutionMode.CREATE_SNAPSHOT) {
+ executeAndSavepoint(
+ env,
+ s"src/test/resources/"
+ + StatefulJobSavepointMigrationITCase.getSnapshotPath(migrationVersionAndBackend),
+ new Tuple2(
+ AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR,
+ StatefulJobSavepointMigrationITCase.NUM_ELEMENTS
+ )
)
- )
- }
-
- @Test
- def testRestoreSavepoint(): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-
- migrationVersionAndBackend._2 match {
- case StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME =>
- env.setStateBackend(new EmbeddedRocksDBStateBackend())
- case StateBackendLoader.MEMORY_STATE_BACKEND_NAME =>
- env.setStateBackend(new MemoryStateBackend())
- case StateBackendLoader.HASHMAP_STATE_BACKEND_NAME =>
- env.setStateBackend(new HashMapStateBackend())
- case _ => throw new UnsupportedOperationException
- }
- env.enableChangelogStateBackend(false);
-
- env.setStateBackend(new MemoryStateBackend)
- env.enableCheckpointing(500)
- env.setParallelism(4)
- env.setMaxParallelism(4)
-
- env
- .addSource(
- new CheckpointedSource(4)).setMaxParallelism(1).uid("checkpointedSource")
- .keyBy(
- new KeySelector[(Long, Long), Long] {
- override def getKey(value: (Long, Long)): Long = value._1
- }
+ } else if (
+ StatefulJobSavepointMigrationITCase.executionMode == ExecutionMode.VERIFY_SNAPSHOT) {
+ restoreAndExecute(
+ env,
+ SnapshotMigrationTestBase.getResourceFilename(
+ StatefulJobSavepointMigrationITCase.getSnapshotPath(migrationVersionAndBackend)),
+ new Tuple2(
+ AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR,
+ StatefulJobSavepointMigrationITCase.NUM_ELEMENTS)
)
- .flatMap(new StatefulFlatMapper)
- .addSink(new AccumulatorCountingSink)
-
- restoreAndExecute(
- env,
- SavepointMigrationTestBase.getResourceFilename(
- s"stateful-scala" +
- s"-udf-migration-itcase-flink${migrationVersionAndBackend._1}" +
- s"-${migrationVersionAndBackend._2}-savepoint"),
- new Tuple2(
- AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR,
- StatefulJobSavepointMigrationITCase.NUM_ELEMENTS)
- )
+ } else {
+ throw new UnsupportedOperationException("Unsupported execution mode.")
+ }
}
@SerialVersionUID(1L)
@@ -190,7 +169,7 @@ class StatefulJobSavepointMigrationITCase(
@SerialVersionUID(1L)
private class CheckpointedSource(val numElements: Int)
- extends SourceFunction[(Long, Long)] with CheckpointedFunction {
+ extends SourceFunction[(Long, Long)] with CheckpointedFunction {
private var isRunning = true
private var state: ListState[CustomCaseClass] = _
@@ -201,8 +180,8 @@ class StatefulJobSavepointMigrationITCase(
ctx.getCheckpointLock synchronized {
var i = 0
while (i < numElements) {
- ctx.collect(i, i)
- i += 1
+ ctx.collect(i, i)
+ i += 1
}
}
// don't emit a final watermark so that we don't trigger the registered event-time
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobWBroadcastStateMigrationITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobWBroadcastStateMigrationITCase.scala
index 78e646e..47dc3ae 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobWBroadcastStateMigrationITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobWBroadcastStateMigrationITCase.scala
@@ -41,132 +41,73 @@ import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.watermark.Watermark
-import org.apache.flink.test.checkpointing.utils.SavepointMigrationTestBase
+import org.apache.flink.test.checkpointing.utils.SnapshotMigrationTestBase
+import org.apache.flink.test.checkpointing.utils.SnapshotMigrationTestBase.ExecutionMode
import org.apache.flink.util.Collector
-
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
-import org.junit.{Assert, Ignore, Test}
+import org.junit.{Assert, Test}
+import java.util.stream.Collectors
import scala.util.{Failure, Try}
object StatefulJobWBroadcastStateMigrationITCase {
+ // TODO increase this to newer version to create and test snapshot migration for newer versions
+ val currentVersion = FlinkVersion.v1_14
+
+ // TODO change this to CREATE_SNAPSHOT to (re)create binary snapshots
+ // TODO Note: You should generate the snapshot based on the release branch instead of the
+ // master.
+ val executionMode = ExecutionMode.VERIFY_SNAPSHOT
+
@Parameterized.Parameters(name = "Migrate Savepoint / Backend: {0}")
def parameters: util.Collection[(FlinkVersion, String)] = {
- util.Arrays.asList(
- (FlinkVersion.v1_5, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
- (FlinkVersion.v1_5, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
- (FlinkVersion.v1_6, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
- (FlinkVersion.v1_6, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
- (FlinkVersion.v1_7, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
- (FlinkVersion.v1_7, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
+ var parameters = util.Arrays.asList(
+ // (FlinkVersion.v1_5, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
+ // (FlinkVersion.v1_6, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
+ // (FlinkVersion.v1_7, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
+ // Note: It is not safe to restore savepoints created in a Scala applications with Flink
+ // version 1.7 or below. The reason is that up to version 1.7 the underlying Scala serializer
+ // used names of anonymous classes that depend on the relative position/order in code, e.g.,
+ // if two anonymous classes, instantiated inside the same class and from the same base class,
+ // change order in the code their names are switched.
+ // As a consequence, changes in code may result in restore failures.
+ // This was fixed in version 1.8, see: https://issues.apache.org/jira/browse/FLINK-10493
(FlinkVersion.v1_8, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
- (FlinkVersion.v1_8, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
(FlinkVersion.v1_9, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
- (FlinkVersion.v1_9, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
(FlinkVersion.v1_10, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
- (FlinkVersion.v1_10, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
(FlinkVersion.v1_11, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
- (FlinkVersion.v1_11, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
(FlinkVersion.v1_12, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
- (FlinkVersion.v1_12, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
(FlinkVersion.v1_13, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
- (FlinkVersion.v1_13, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
(FlinkVersion.v1_14, StateBackendLoader.HASHMAP_STATE_BACKEND_NAME),
- (FlinkVersion.v1_14, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME))
+ (FlinkVersion.v1_15, StateBackendLoader.HASHMAP_STATE_BACKEND_NAME),
+ )
+ if (executionMode == ExecutionMode.CREATE_SNAPSHOT) {
+ parameters = parameters.stream().filter(x => x._1 == currentVersion)
+ .collect(Collectors.toList())
+ }
+ parameters
}
- // TODO to generate savepoints for a specific Flink version / backend type,
- // TODO change these values accordingly, e.g. to generate for 1.3 with RocksDB,
- // TODO set as (FlinkVersion.v1_3, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME)
- // TODO Note: You should generate the savepoint based on the release branch instead of the master.
- val GENERATE_SAVEPOINT_VER: FlinkVersion = FlinkVersion.v1_14
- val GENERATE_SAVEPOINT_BACKEND_TYPE: String = StateBackendLoader.HASHMAP_STATE_BACKEND_NAME
+ def getSnapshotPath(migrationVersionAndBackend: (FlinkVersion, String)): String = {
+ s"stateful-scala-with-broadcast-udf-migration-itcase" +
+ s"-flink${migrationVersionAndBackend._1}" +
+ s"-${migrationVersionAndBackend._2}-savepoint"
+ }
val NUM_ELEMENTS = 4
}
/**
- * ITCase for migration Scala state types across different Flink versions.
- */
+ * ITCase for migration Scala state types across different Flink versions.
+ */
@RunWith(classOf[Parameterized])
-class StatefulJobWBroadcastStateMigrationITCase(
- migrationVersionAndBackend: (FlinkVersion, String))
- extends SavepointMigrationTestBase with Serializable {
+class StatefulJobWBroadcastStateMigrationITCase(migrationVersionAndBackend: (FlinkVersion, String))
+ extends SnapshotMigrationTestBase with Serializable {
@Test
- @Ignore
- def testCreateSavepointWithBroadcastState(): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-
- StatefulJobWBroadcastStateMigrationITCase.GENERATE_SAVEPOINT_BACKEND_TYPE match {
- case StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME =>
- env.setStateBackend(new EmbeddedRocksDBStateBackend())
- case StateBackendLoader.MEMORY_STATE_BACKEND_NAME =>
- env.setStateBackend(new MemoryStateBackend())
- case StateBackendLoader.HASHMAP_STATE_BACKEND_NAME =>
- env.setStateBackend(new HashMapStateBackend())
- case _ => throw new UnsupportedOperationException
- }
- env.enableChangelogStateBackend(false)
-
- lazy val firstBroadcastStateDesc = new MapStateDescriptor[Long, Long](
- "broadcast-state-1",
- BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]],
- BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]])
-
- lazy val secondBroadcastStateDesc = new MapStateDescriptor[String, String](
- "broadcast-state-2",
- BasicTypeInfo.STRING_TYPE_INFO,
- BasicTypeInfo.STRING_TYPE_INFO)
-
- env.setStateBackend(new MemoryStateBackend)
- env.enableCheckpointing(500)
- env.setParallelism(4)
- env.setMaxParallelism(4)
-
- val stream = env
- .addSource(
- new CheckpointedSource(4)).setMaxParallelism(1).uid("checkpointedSource")
- .keyBy(
- new KeySelector[(Long, Long), Long] {
- override def getKey(value: (Long, Long)): Long = value._1
- }
- )
- .flatMap(new StatefulFlatMapper)
- .keyBy(
- new KeySelector[(Long, Long), Long] {
- override def getKey(value: (Long, Long)): Long = value._1
- }
- )
-
- val broadcastStream = env
- .addSource(
- new CheckpointedSource(4)).setMaxParallelism(1).uid("checkpointedBroadcastSource")
- .broadcast(firstBroadcastStateDesc, secondBroadcastStateDesc)
-
- stream
- .connect(broadcastStream)
- .process(new TestBroadcastProcessFunction)
- .addSink(new AccumulatorCountingSink)
-
- executeAndSavepoint(
- env,
- s"src/test/resources/stateful-scala-with-broadcast" +
- s"-udf-migration-itcase-flink" +
- s"${StatefulJobWBroadcastStateMigrationITCase.GENERATE_SAVEPOINT_VER}" +
- s"-${StatefulJobWBroadcastStateMigrationITCase.GENERATE_SAVEPOINT_BACKEND_TYPE}-savepoint",
- new Tuple2(
- AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR,
- StatefulJobWBroadcastStateMigrationITCase.NUM_ELEMENTS
- )
- )
- }
-
- @Test
- def testRestoreSavepointWithBroadcast(): Unit = {
+ def testSavepointWithBroadcast(): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
@@ -192,7 +133,6 @@ class StatefulJobWBroadcastStateMigrationITCase(
BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.STRING_TYPE_INFO)
- env.setStateBackend(new MemoryStateBackend)
env.enableCheckpointing(500)
env.setParallelism(4)
env.setMaxParallelism(4)
@@ -217,26 +157,44 @@ class StatefulJobWBroadcastStateMigrationITCase(
new CheckpointedSource(4)).setMaxParallelism(1).uid("checkpointedBroadcastSource")
.broadcast(firstBroadcastStateDesc, secondBroadcastStateDesc)
- val expectedFirstState: Map[Long, Long] =
- Map(0L -> 0L, 1L -> 1L, 2L -> 2L, 3L -> 3L)
- val expectedSecondState: Map[String, String] =
- Map("0" -> "0", "1" -> "1", "2" -> "2", "3" -> "3")
-
- stream
- .connect(broadcastStream)
- .process(new VerifyingBroadcastProcessFunction(expectedFirstState, expectedSecondState))
- .addSink(new AccumulatorCountingSink)
-
- restoreAndExecute(
- env,
- SavepointMigrationTestBase.getResourceFilename(
- s"stateful-scala-with-broadcast" +
- s"-udf-migration-itcase-flink${migrationVersionAndBackend._1}" +
- s"-${migrationVersionAndBackend._2}-savepoint"),
- new Tuple2(
- AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR,
- StatefulJobWBroadcastStateMigrationITCase.NUM_ELEMENTS)
- )
+ if (StatefulJobWBroadcastStateMigrationITCase.executionMode == ExecutionMode.CREATE_SNAPSHOT) {
+ stream
+ .connect(broadcastStream)
+ .process(new TestBroadcastProcessFunction)
+ .addSink(new AccumulatorCountingSink)
+
+ executeAndSavepoint(
+ env,
+ s"src/test/resources/"
+ + StatefulJobWBroadcastStateMigrationITCase.getSnapshotPath(migrationVersionAndBackend),
+ new Tuple2(
+ AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR,
+ StatefulJobWBroadcastStateMigrationITCase.NUM_ELEMENTS
+ )
+ )
+ } else if (
+ StatefulJobWBroadcastStateMigrationITCase.executionMode == ExecutionMode.VERIFY_SNAPSHOT) {
+ val expectedFirstState: Map[Long, Long] =
+ Map(0L -> 0L, 1L -> 1L, 2L -> 2L, 3L -> 3L)
+ val expectedSecondState: Map[String, String] =
+ Map("0" -> "0", "1" -> "1", "2" -> "2", "3" -> "3")
+
+ stream
+ .connect(broadcastStream)
+ .process(new VerifyingBroadcastProcessFunction(expectedFirstState, expectedSecondState))
+ .addSink(new AccumulatorCountingSink)
+
+ restoreAndExecute(
+ env,
+ SnapshotMigrationTestBase.getResourceFilename(
+ StatefulJobWBroadcastStateMigrationITCase.getSnapshotPath(migrationVersionAndBackend)),
+ new Tuple2(
+ AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR,
+ StatefulJobWBroadcastStateMigrationITCase.NUM_ELEMENTS)
+ )
+ } else {
+ throw new UnsupportedOperationException("Unsupported execution mode.")
+ }
}
}