You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2022/02/22 08:27:32 UTC
[flink] 03/03: [FLINK-26146] Adapt scala tests to cover native snapshots migration
This is an automated email from the ASF dual-hosted git repository.
dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit c88189f0245630b7b16e7e5dd63b8b7fbaecd92a
Author: Sebastian Mattheis <se...@ververica.com>
AuthorDate: Fri Feb 18 10:18:03 2022 +0100
[FLINK-26146] Adapt scala tests to cover native snapshots migration
Adapt scala savepoint migration tests to cover native savepoints and checkpoints.
This closes #18850
---
.../utils/SnapshotMigrationTestBase.java | 11 ---
.../StatefulJobSavepointMigrationITCase.scala | 103 ++++++++++++++-------
...StatefulJobWBroadcastStateMigrationITCase.scala | 101 +++++++++++++-------
3 files changed, 136 insertions(+), 79 deletions(-)
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SnapshotMigrationTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SnapshotMigrationTestBase.java
index 57a4894..2cb6b6a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SnapshotMigrationTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SnapshotMigrationTestBase.java
@@ -257,17 +257,6 @@ public abstract class SnapshotMigrationTestBase extends TestBaseUtils {
return config;
}
- @Deprecated
- @SafeVarargs
- protected final void executeAndSavepoint(
- StreamExecutionEnvironment env,
- String snapshotPath,
- Tuple2<String, Integer>... expectedAccumulators)
- throws Exception {
- executeAndSnapshot(
- env, snapshotPath, SnapshotType.SAVEPOINT_CANONICAL, expectedAccumulators);
- }
-
@SafeVarargs
protected final void executeAndSnapshot(
StreamExecutionEnvironment env,
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobSavepointMigrationITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobSavepointMigrationITCase.scala
index 3ff91ee..cf88f39 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobSavepointMigrationITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobSavepointMigrationITCase.scala
@@ -41,7 +41,7 @@ import org.apache.flink.api.java.tuple.Tuple2
import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext, StateBackendLoader}
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.migration.CustomEnum.CustomEnum
-import org.apache.flink.test.checkpointing.utils.SnapshotMigrationTestBase.ExecutionMode
+import org.apache.flink.test.checkpointing.utils.SnapshotMigrationTestBase.{ExecutionMode, SnapshotSpec, SnapshotType}
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
import org.junit.Test
@@ -59,31 +59,54 @@ object StatefulJobSavepointMigrationITCase {
// master.
val executionMode = ExecutionMode.VERIFY_SNAPSHOT
- @Parameterized.Parameters(name = "Migrate Savepoint / Backend: {0}")
- def parameters: util.Collection[(FlinkVersion, String)] = {
- var parameters = util.Arrays.asList(
- // (FlinkVersion.v1_3, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
- // (FlinkVersion.v1_4, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
- // (FlinkVersion.v1_6, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
- // (FlinkVersion.v1_7, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
- // Note: It is not safe to restore savepoints created in a Scala applications with Flink
- // version 1.7 or below. The reason is that up to version 1.7 the underlying Scala serializer
- // used names of anonymous classes that depend on the relative position/order in code, e.g.,
- // if two anonymous classes, instantiated inside the same class and from the same base class,
- // change order in the code their names are switched.
- // As a consequence, changes in code may result in restore failures.
- // This was fixed in version 1.8, see: https://issues.apache.org/jira/browse/FLINK-10493
- (FlinkVersion.v1_8, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
- (FlinkVersion.v1_9, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
- (FlinkVersion.v1_10, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
- (FlinkVersion.v1_11, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
- (FlinkVersion.v1_12, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
- (FlinkVersion.v1_13, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
- (FlinkVersion.v1_14, StateBackendLoader.HASHMAP_STATE_BACKEND_NAME),
- (FlinkVersion.v1_15, StateBackendLoader.HASHMAP_STATE_BACKEND_NAME),
- )
+ @Parameterized.Parameters(name = "Test snapshot: {0}")
+ def parameters: util.Collection[SnapshotSpec] = {
+ // Note: It is not safe to restore savepoints created in a Scala applications with Flink
+ // version 1.7 or below. The reason is that up to version 1.7 the underlying Scala serializer
+ // used names of anonymous classes that depend on the relative position/order in code, e.g.,
+ // if two anonymous classes, instantiated inside the same class and from the same base class,
+ // change order in the code their names are switched.
+ // As a consequence, changes in code may result in restore failures.
+ // This was fixed in version 1.8, see: https://issues.apache.org/jira/browse/FLINK-10493
+ var parameters: util.List[SnapshotSpec] = new util.LinkedList[SnapshotSpec]()
+ parameters.addAll(
+ SnapshotSpec.withVersions(
+ StateBackendLoader.MEMORY_STATE_BACKEND_NAME,
+ SnapshotType.SAVEPOINT_CANONICAL,
+ FlinkVersion.rangeOf(FlinkVersion.v1_8, FlinkVersion.v1_13)))
+ parameters.addAll(
+ SnapshotSpec.withVersions(
+ StateBackendLoader.HASHMAP_STATE_BACKEND_NAME,
+ SnapshotType.SAVEPOINT_CANONICAL,
+ FlinkVersion.rangeOf(FlinkVersion.v1_14, currentVersion)))
+ parameters.addAll(
+ SnapshotSpec.withVersions(
+ StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME,
+ SnapshotType.SAVEPOINT_CANONICAL,
+ FlinkVersion.rangeOf(FlinkVersion.v1_15, currentVersion)))
+ parameters.addAll(
+ SnapshotSpec.withVersions(
+ StateBackendLoader.HASHMAP_STATE_BACKEND_NAME,
+ SnapshotType.SAVEPOINT_NATIVE,
+ FlinkVersion.rangeOf(FlinkVersion.v1_15, currentVersion)))
+ parameters.addAll(
+ SnapshotSpec.withVersions(
+ StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME,
+ SnapshotType.SAVEPOINT_NATIVE,
+ FlinkVersion.rangeOf(FlinkVersion.v1_15, currentVersion)))
+ parameters.addAll(
+ SnapshotSpec.withVersions(
+ StateBackendLoader.HASHMAP_STATE_BACKEND_NAME,
+ SnapshotType.CHECKPOINT,
+ FlinkVersion.rangeOf(FlinkVersion.v1_15, currentVersion)))
+ parameters.addAll(
+ SnapshotSpec.withVersions(
+ StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME,
+ SnapshotType.CHECKPOINT,
+ FlinkVersion.rangeOf(FlinkVersion.v1_15, currentVersion)))
if (executionMode == ExecutionMode.CREATE_SNAPSHOT) {
- parameters = parameters.stream().filter(x => x._1 == currentVersion)
+ parameters = parameters.stream()
+ .filter(x => x.getFlinkVersion().equals(currentVersion))
.collect(Collectors.toList())
}
parameters
@@ -91,10 +114,20 @@ object StatefulJobSavepointMigrationITCase {
val NUM_ELEMENTS = 4
- def getSnapshotPath(migrationVersionAndBackend: (FlinkVersion, String)): String = {
- s"stateful-scala-udf-migration-itcase" +
- s"-flink${migrationVersionAndBackend._1}" +
- s"-${migrationVersionAndBackend._2}-savepoint"
+ def getSnapshotPath(snapshotSpec: SnapshotSpec): String = {
+ val path = new StringBuilder(s"stateful-scala-udf-migration-itcase")
+ path ++= s"-flink${snapshotSpec.getFlinkVersion()}"
+ path ++= s"-${snapshotSpec.getStateBackendType()}"
+ snapshotSpec.getSnapshotType() match {
+ case SnapshotType.SAVEPOINT_CANONICAL =>
+ path ++= "-savepoint"
+ case SnapshotType.SAVEPOINT_NATIVE =>
+ path ++= "-savepoint-native"
+ case SnapshotType.CHECKPOINT =>
+ path ++= "-checkpoint"
+ case _ => throw new UnsupportedOperationException
+ }
+ path.toString()
}
}
@@ -102,8 +135,7 @@ object StatefulJobSavepointMigrationITCase {
* ITCase for migration Scala state types across different Flink versions.
*/
@RunWith(classOf[Parameterized])
-class StatefulJobSavepointMigrationITCase(
- migrationVersionAndBackend: (FlinkVersion, String))
+class StatefulJobSavepointMigrationITCase(snapshotSpec: SnapshotSpec)
extends SnapshotMigrationTestBase with Serializable {
@Test
@@ -111,7 +143,7 @@ class StatefulJobSavepointMigrationITCase(
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
- migrationVersionAndBackend._2 match {
+ snapshotSpec.getStateBackendType match {
case StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME =>
env.setStateBackend(new EmbeddedRocksDBStateBackend())
case StateBackendLoader.MEMORY_STATE_BACKEND_NAME =>
@@ -138,10 +170,11 @@ class StatefulJobSavepointMigrationITCase(
.addSink(new AccumulatorCountingSink)
if (StatefulJobSavepointMigrationITCase.executionMode == ExecutionMode.CREATE_SNAPSHOT) {
- executeAndSavepoint(
+ executeAndSnapshot(
env,
s"src/test/resources/"
- + StatefulJobSavepointMigrationITCase.getSnapshotPath(migrationVersionAndBackend),
+ + StatefulJobSavepointMigrationITCase.getSnapshotPath(snapshotSpec),
+ snapshotSpec.getSnapshotType(),
new Tuple2(
AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR,
StatefulJobSavepointMigrationITCase.NUM_ELEMENTS
@@ -152,7 +185,7 @@ class StatefulJobSavepointMigrationITCase(
restoreAndExecute(
env,
SnapshotMigrationTestBase.getResourceFilename(
- StatefulJobSavepointMigrationITCase.getSnapshotPath(migrationVersionAndBackend)),
+ StatefulJobSavepointMigrationITCase.getSnapshotPath(snapshotSpec)),
new Tuple2(
AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR,
StatefulJobSavepointMigrationITCase.NUM_ELEMENTS)
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobWBroadcastStateMigrationITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobWBroadcastStateMigrationITCase.scala
index 47dc3ae..dc9b540 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobWBroadcastStateMigrationITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobWBroadcastStateMigrationITCase.scala
@@ -42,7 +42,7 @@ import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.test.checkpointing.utils.SnapshotMigrationTestBase
-import org.apache.flink.test.checkpointing.utils.SnapshotMigrationTestBase.ExecutionMode
+import org.apache.flink.test.checkpointing.utils.SnapshotMigrationTestBase.{ExecutionMode, SnapshotSpec, SnapshotType}
import org.apache.flink.util.Collector
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
@@ -61,39 +61,73 @@ object StatefulJobWBroadcastStateMigrationITCase {
// master.
val executionMode = ExecutionMode.VERIFY_SNAPSHOT
- @Parameterized.Parameters(name = "Migrate Savepoint / Backend: {0}")
- def parameters: util.Collection[(FlinkVersion, String)] = {
- var parameters = util.Arrays.asList(
- // (FlinkVersion.v1_5, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
- // (FlinkVersion.v1_6, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
- // (FlinkVersion.v1_7, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
- // Note: It is not safe to restore savepoints created in a Scala applications with Flink
- // version 1.7 or below. The reason is that up to version 1.7 the underlying Scala serializer
- // used names of anonymous classes that depend on the relative position/order in code, e.g.,
- // if two anonymous classes, instantiated inside the same class and from the same base class,
- // change order in the code their names are switched.
- // As a consequence, changes in code may result in restore failures.
- // This was fixed in version 1.8, see: https://issues.apache.org/jira/browse/FLINK-10493
- (FlinkVersion.v1_8, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
- (FlinkVersion.v1_9, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
- (FlinkVersion.v1_10, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
- (FlinkVersion.v1_11, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
- (FlinkVersion.v1_12, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
- (FlinkVersion.v1_13, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
- (FlinkVersion.v1_14, StateBackendLoader.HASHMAP_STATE_BACKEND_NAME),
- (FlinkVersion.v1_15, StateBackendLoader.HASHMAP_STATE_BACKEND_NAME),
- )
+ @Parameterized.Parameters(name = "Test snapshot: {0}")
+ def parameters: util.Collection[SnapshotSpec] = {
+ // Note: It is not safe to restore savepoints created in a Scala applications with Flink
+ // version 1.7 or below. The reason is that up to version 1.7 the underlying Scala serializer
+ // used names of anonymous classes that depend on the relative position/order in code, e.g.,
+ // if two anonymous classes, instantiated inside the same class and from the same base class,
+ // change order in the code their names are switched.
+ // As a consequence, changes in code may result in restore failures.
+ // This was fixed in version 1.8, see: https://issues.apache.org/jira/browse/FLINK-10493
+ var parameters: util.List[SnapshotSpec] = new util.LinkedList[SnapshotSpec]()
+ parameters.addAll(
+ SnapshotSpec.withVersions(
+ StateBackendLoader.MEMORY_STATE_BACKEND_NAME,
+ SnapshotType.SAVEPOINT_CANONICAL,
+ FlinkVersion.rangeOf(FlinkVersion.v1_8, FlinkVersion.v1_13)))
+ parameters.addAll(
+ SnapshotSpec.withVersions(
+ StateBackendLoader.HASHMAP_STATE_BACKEND_NAME,
+ SnapshotType.SAVEPOINT_CANONICAL,
+ FlinkVersion.rangeOf(FlinkVersion.v1_14, currentVersion)))
+ parameters.addAll(
+ SnapshotSpec.withVersions(
+ StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME,
+ SnapshotType.SAVEPOINT_CANONICAL,
+ FlinkVersion.rangeOf(FlinkVersion.v1_15, currentVersion)))
+ parameters.addAll(
+ SnapshotSpec.withVersions(
+ StateBackendLoader.HASHMAP_STATE_BACKEND_NAME,
+ SnapshotType.SAVEPOINT_NATIVE,
+ FlinkVersion.rangeOf(FlinkVersion.v1_15, currentVersion)))
+ parameters.addAll(
+ SnapshotSpec.withVersions(
+ StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME,
+ SnapshotType.SAVEPOINT_NATIVE,
+ FlinkVersion.rangeOf(FlinkVersion.v1_15, currentVersion)))
+ parameters.addAll(
+ SnapshotSpec.withVersions(
+ StateBackendLoader.HASHMAP_STATE_BACKEND_NAME,
+ SnapshotType.CHECKPOINT,
+ FlinkVersion.rangeOf(FlinkVersion.v1_15, currentVersion)))
+ parameters.addAll(
+ SnapshotSpec.withVersions(
+ StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME,
+ SnapshotType.CHECKPOINT,
+ FlinkVersion.rangeOf(FlinkVersion.v1_15, currentVersion)))
if (executionMode == ExecutionMode.CREATE_SNAPSHOT) {
- parameters = parameters.stream().filter(x => x._1 == currentVersion)
+ parameters = parameters.stream()
+ .filter(x => x.getFlinkVersion().equals(currentVersion))
.collect(Collectors.toList())
}
parameters
}
- def getSnapshotPath(migrationVersionAndBackend: (FlinkVersion, String)): String = {
- s"stateful-scala-with-broadcast-udf-migration-itcase" +
- s"-flink${migrationVersionAndBackend._1}" +
- s"-${migrationVersionAndBackend._2}-savepoint"
+ def getSnapshotPath(snapshotSpec: SnapshotSpec): String = {
+ val path = new StringBuilder(s"stateful-scala-with-broadcast-udf-migration-itcase")
+ path ++= s"-flink${snapshotSpec.getFlinkVersion()}"
+ path ++= s"-${snapshotSpec.getStateBackendType()}"
+ snapshotSpec.getSnapshotType() match {
+ case SnapshotType.SAVEPOINT_CANONICAL =>
+ path ++= "-savepoint"
+ case SnapshotType.SAVEPOINT_NATIVE =>
+ path ++= "-savepoint-native"
+ case SnapshotType.CHECKPOINT =>
+ path ++= "-checkpoint"
+ case _ => throw new UnsupportedOperationException
+ }
+ path.toString()
}
val NUM_ELEMENTS = 4
@@ -103,7 +137,7 @@ object StatefulJobWBroadcastStateMigrationITCase {
* ITCase for migration Scala state types across different Flink versions.
*/
@RunWith(classOf[Parameterized])
-class StatefulJobWBroadcastStateMigrationITCase(migrationVersionAndBackend: (FlinkVersion, String))
+class StatefulJobWBroadcastStateMigrationITCase(snapshotSpec: SnapshotSpec)
extends SnapshotMigrationTestBase with Serializable {
@Test
@@ -112,7 +146,7 @@ class StatefulJobWBroadcastStateMigrationITCase(migrationVersionAndBackend: (Fli
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
- migrationVersionAndBackend._2 match {
+ snapshotSpec.getStateBackendType match {
case StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME =>
env.setStateBackend(new EmbeddedRocksDBStateBackend())
case StateBackendLoader.MEMORY_STATE_BACKEND_NAME =>
@@ -163,10 +197,11 @@ class StatefulJobWBroadcastStateMigrationITCase(migrationVersionAndBackend: (Fli
.process(new TestBroadcastProcessFunction)
.addSink(new AccumulatorCountingSink)
- executeAndSavepoint(
+ executeAndSnapshot(
env,
s"src/test/resources/"
- + StatefulJobWBroadcastStateMigrationITCase.getSnapshotPath(migrationVersionAndBackend),
+ + StatefulJobWBroadcastStateMigrationITCase.getSnapshotPath(snapshotSpec),
+ snapshotSpec.getSnapshotType(),
new Tuple2(
AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR,
StatefulJobWBroadcastStateMigrationITCase.NUM_ELEMENTS
@@ -187,7 +222,7 @@ class StatefulJobWBroadcastStateMigrationITCase(migrationVersionAndBackend: (Fli
restoreAndExecute(
env,
SnapshotMigrationTestBase.getResourceFilename(
- StatefulJobWBroadcastStateMigrationITCase.getSnapshotPath(migrationVersionAndBackend)),
+ StatefulJobWBroadcastStateMigrationITCase.getSnapshotPath(snapshotSpec)),
new Tuple2(
AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR,
StatefulJobWBroadcastStateMigrationITCase.NUM_ELEMENTS)