You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2022/02/22 08:27:30 UTC
[flink] 01/03: [FLINK-26146] Add tests of flink version upgrades to cover native snapshots
This is an automated email from the ASF dual-hosted git repository.
dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 0b9f8956a1452a8b1259257af926eac7ad9e3bec
Author: Sebastian Mattheis <se...@ververica.com>
AuthorDate: Fri Feb 18 10:17:49 2022 +0100
[FLINK-26146] Add tests of flink version upgrades to cover native snapshots
Turned SavepointMigrationTestBase into SnapshotMigrationTestBase that supports testing canconical
savepoints, native savepoints, and checkpoints; and adapted sub classes accordingly.
---
.../LegacyStatefulJobSavepointMigrationITCase.java | 8 +-
...ava => StatefulJobSnapshotMigrationITCase.java} | 145 ++++++++-------
.../StatefulJobWBroadcastStateMigrationITCase.java | 139 +++++++-------
.../checkpointing/utils/MigrationTestUtils.java | 4 +-
...estBase.java => SnapshotMigrationTestBase.java} | 200 +++++++++++++++++++--
.../TypeSerializerSnapshotMigrationITCase.java | 155 ++++++++--------
pom.xml | 4 +-
7 files changed, 430 insertions(+), 225 deletions(-)
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/LegacyStatefulJobSavepointMigrationITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LegacyStatefulJobSavepointMigrationITCase.java
similarity index 99%
rename from flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/LegacyStatefulJobSavepointMigrationITCase.java
rename to flink-tests/src/test/java/org/apache/flink/test/checkpointing/LegacyStatefulJobSavepointMigrationITCase.java
index e59be25..fe16a25 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/LegacyStatefulJobSavepointMigrationITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LegacyStatefulJobSavepointMigrationITCase.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.flink.test.checkpointing.utils;
+package org.apache.flink.test.checkpointing;
import org.apache.flink.FlinkVersion;
import org.apache.flink.api.common.accumulators.IntCounter;
@@ -45,6 +45,7 @@ import org.apache.flink.streaming.api.operators.TimestampedCollector;
import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.test.checkpointing.utils.SnapshotMigrationTestBase;
import org.apache.flink.util.Collector;
import org.junit.Ignore;
@@ -62,7 +63,7 @@ import static org.junit.Assert.assertEquals;
* previous Flink versions, as well as for different state backends.
*/
@RunWith(Parameterized.class)
-public class LegacyStatefulJobSavepointMigrationITCase extends SavepointMigrationTestBase {
+public class LegacyStatefulJobSavepointMigrationITCase extends SnapshotMigrationTestBase {
private static final int NUM_SOURCE_ELEMENTS = 4;
@@ -143,11 +144,12 @@ public class LegacyStatefulJobSavepointMigrationITCase extends SavepointMigratio
.uid("TimelyStatefulOperator")
.addSink(new AccumulatorCountingSink<Tuple2<Long, Long>>());
- executeAndSavepoint(
+ executeAndSnapshot(
env,
"src/test/resources/"
+ getSavepointPath(
flinkGenerateSavepointVersion, flinkGenerateSavepointBackendType),
+ SnapshotType.SAVEPOINT_CANONICAL,
new Tuple2<>(
AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR, NUM_SOURCE_ELEMENTS));
}
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointMigrationITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StatefulJobSnapshotMigrationITCase.java
similarity index 76%
rename from flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointMigrationITCase.java
rename to flink-tests/src/test/java/org/apache/flink/test/checkpointing/StatefulJobSnapshotMigrationITCase.java
index 260c1f2..df2a20b 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointMigrationITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StatefulJobSnapshotMigrationITCase.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.flink.test.checkpointing.utils;
+package org.apache.flink.test.checkpointing;
import org.apache.flink.FlinkVersion;
import org.apache.flink.api.common.accumulators.IntCounter;
@@ -28,8 +28,9 @@ import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.runtime.state.StateBackendLoader;
+import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
@@ -40,14 +41,17 @@ import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.test.checkpointing.utils.MigrationTestUtils;
+import org.apache.flink.test.checkpointing.utils.SnapshotMigrationTestBase;
import org.apache.flink.util.Collector;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
-import java.util.Arrays;
import java.util.Collection;
+import java.util.LinkedList;
+import java.util.stream.Collectors;
import static org.junit.Assert.assertEquals;
@@ -56,59 +60,69 @@ import static org.junit.Assert.assertEquals;
* previous Flink versions, as well as for different state backends.
*/
@RunWith(Parameterized.class)
-public class StatefulJobSavepointMigrationITCase extends SavepointMigrationTestBase {
+public class StatefulJobSnapshotMigrationITCase extends SnapshotMigrationTestBase {
private static final int NUM_SOURCE_ELEMENTS = 4;
- /**
- * This test runs in either of two modes: 1) we want to generate the binary savepoint, i.e. we
- * have to run the checkpointing functions 2) we want to verify restoring, so we have to run the
- * checking functions.
- */
- public enum ExecutionMode {
- PERFORM_SAVEPOINT,
- VERIFY_SAVEPOINT
- }
+ // TODO increase this to newer version to create and test snapshot migration for newer versions
+ private static final FlinkVersion currentVersion = FlinkVersion.v1_14;
- // TODO change this to PERFORM_SAVEPOINT to regenerate binary savepoints
- // TODO Note: You should generate the savepoint based on the release branch instead of the
+ // TODO change this to CREATE_SNAPSHOT to (re)create binary snapshots
+ // TODO Note: You should generate the snapshot based on the release branch instead of the
// master.
- private final ExecutionMode executionMode = ExecutionMode.VERIFY_SAVEPOINT;
-
- @Parameterized.Parameters(name = "Migrate Savepoint / Backend: {0}")
- public static Collection<Tuple2<FlinkVersion, String>> parameters() {
- return Arrays.asList(
- Tuple2.of(FlinkVersion.v1_4, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
- Tuple2.of(FlinkVersion.v1_4, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
- Tuple2.of(FlinkVersion.v1_5, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
- Tuple2.of(FlinkVersion.v1_5, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
- Tuple2.of(FlinkVersion.v1_6, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
- Tuple2.of(FlinkVersion.v1_6, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
- Tuple2.of(FlinkVersion.v1_7, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
- Tuple2.of(FlinkVersion.v1_7, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
- Tuple2.of(FlinkVersion.v1_8, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
- Tuple2.of(FlinkVersion.v1_8, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
- Tuple2.of(FlinkVersion.v1_9, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
- Tuple2.of(FlinkVersion.v1_9, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
- Tuple2.of(FlinkVersion.v1_10, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
- Tuple2.of(FlinkVersion.v1_10, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
- Tuple2.of(FlinkVersion.v1_11, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
- Tuple2.of(FlinkVersion.v1_11, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
- Tuple2.of(FlinkVersion.v1_12, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
- Tuple2.of(FlinkVersion.v1_12, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
- Tuple2.of(FlinkVersion.v1_13, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
- Tuple2.of(FlinkVersion.v1_13, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
- Tuple2.of(FlinkVersion.v1_14, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
- Tuple2.of(FlinkVersion.v1_14, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME));
+ private static final ExecutionMode executionMode = ExecutionMode.VERIFY_SNAPSHOT;
+
+ @Parameterized.Parameters(name = "Test snapshot: {0}")
+ public static Collection<SnapshotSpec> parameters() {
+ Collection<SnapshotSpec> parameters = new LinkedList<>();
+ parameters.addAll(
+ SnapshotSpec.withVersions(
+ StateBackendLoader.MEMORY_STATE_BACKEND_NAME,
+ SnapshotType.SAVEPOINT_CANONICAL,
+ FlinkVersion.rangeOf(FlinkVersion.v1_4, FlinkVersion.v1_14)));
+ parameters.addAll(
+ SnapshotSpec.withVersions(
+ StateBackendLoader.HASHMAP_STATE_BACKEND_NAME,
+ SnapshotType.SAVEPOINT_CANONICAL,
+ FlinkVersion.rangeOf(FlinkVersion.v1_15, currentVersion)));
+ parameters.addAll(
+ SnapshotSpec.withVersions(
+ StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME,
+ SnapshotType.SAVEPOINT_CANONICAL,
+ FlinkVersion.rangeOf(FlinkVersion.v1_4, currentVersion)));
+ parameters.addAll(
+ SnapshotSpec.withVersions(
+ StateBackendLoader.HASHMAP_STATE_BACKEND_NAME,
+ SnapshotType.SAVEPOINT_NATIVE,
+ FlinkVersion.rangeOf(FlinkVersion.v1_15, currentVersion)));
+ parameters.addAll(
+ SnapshotSpec.withVersions(
+ StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME,
+ SnapshotType.SAVEPOINT_NATIVE,
+ FlinkVersion.rangeOf(FlinkVersion.v1_15, currentVersion)));
+ parameters.addAll(
+ SnapshotSpec.withVersions(
+ StateBackendLoader.HASHMAP_STATE_BACKEND_NAME,
+ SnapshotType.CHECKPOINT,
+ FlinkVersion.rangeOf(FlinkVersion.v1_15, currentVersion)));
+ parameters.addAll(
+ SnapshotSpec.withVersions(
+ StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME,
+ SnapshotType.CHECKPOINT,
+ FlinkVersion.rangeOf(FlinkVersion.v1_15, currentVersion)));
+ if (executionMode == ExecutionMode.CREATE_SNAPSHOT) {
+ parameters =
+ parameters.stream()
+ .filter(x -> x.getFlinkVersion().equals(currentVersion))
+ .collect(Collectors.toList());
+ }
+ return parameters;
}
- private final FlinkVersion testMigrateVersion;
- private final String testStateBackend;
+ private final SnapshotSpec snapshotSpec;
- public StatefulJobSavepointMigrationITCase(
- Tuple2<FlinkVersion, String> testMigrateVersionAndBackend) throws Exception {
- this.testMigrateVersion = testMigrateVersionAndBackend.f0;
- this.testStateBackend = testMigrateVersionAndBackend.f1;
+ public StatefulJobSnapshotMigrationITCase(SnapshotSpec snapshotSpec) throws Exception {
+ this.snapshotSpec = snapshotSpec;
}
@Test
@@ -119,13 +133,16 @@ public class StatefulJobSavepointMigrationITCase extends SavepointMigrationTestB
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.noRestart());
- switch (testStateBackend) {
+ switch (snapshotSpec.getStateBackendType()) {
case StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME:
- env.setStateBackend(new RocksDBStateBackend(new MemoryStateBackend()));
+ env.setStateBackend(new EmbeddedRocksDBStateBackend());
break;
case StateBackendLoader.MEMORY_STATE_BACKEND_NAME:
env.setStateBackend(new MemoryStateBackend());
break;
+ case StateBackendLoader.HASHMAP_STATE_BACKEND_NAME:
+ env.setStateBackend(new HashMapStateBackend());
+ break;
default:
throw new UnsupportedOperationException();
}
@@ -140,7 +157,7 @@ public class StatefulJobSavepointMigrationITCase extends SavepointMigrationTestB
RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> flatMap;
OneInputStreamOperator<Tuple2<Long, Long>, Tuple2<Long, Long>> timelyOperator;
- if (executionMode == ExecutionMode.PERFORM_SAVEPOINT) {
+ if (executionMode == ExecutionMode.CREATE_SNAPSHOT) {
nonParallelSource =
new MigrationTestUtils.CheckpointingNonParallelSourceWithListState(
NUM_SOURCE_ELEMENTS);
@@ -149,7 +166,7 @@ public class StatefulJobSavepointMigrationITCase extends SavepointMigrationTestB
NUM_SOURCE_ELEMENTS);
flatMap = new CheckpointingKeyedStateFlatMap();
timelyOperator = new CheckpointingTimelyStatefulOperator();
- } else if (executionMode == ExecutionMode.VERIFY_SAVEPOINT) {
+ } else if (executionMode == ExecutionMode.VERIFY_SNAPSHOT) {
nonParallelSource =
new MigrationTestUtils.CheckingNonParallelSourceWithListState(
NUM_SOURCE_ELEMENTS);
@@ -190,17 +207,20 @@ public class StatefulJobSavepointMigrationITCase extends SavepointMigrationTestB
.uid("CheckpointingTimelyStatefulOperator2")
.addSink(new MigrationTestUtils.AccumulatorCountingSink<>());
- if (executionMode == ExecutionMode.PERFORM_SAVEPOINT) {
- executeAndSavepoint(
+ final String snapshotPath = getSnapshotPath(snapshotSpec);
+
+ if (executionMode == ExecutionMode.CREATE_SNAPSHOT) {
+ executeAndSnapshot(
env,
- "src/test/resources/" + getSavepointPath(testMigrateVersion, testStateBackend),
+ "src/test/resources/" + snapshotPath,
+ snapshotSpec.getSnapshotType(),
new Tuple2<>(
MigrationTestUtils.AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR,
NUM_SOURCE_ELEMENTS * 2));
} else {
restoreAndExecute(
env,
- getResourceFilename(getSavepointPath(testMigrateVersion, testStateBackend)),
+ getResourceFilename(snapshotPath),
new Tuple2<>(
MigrationTestUtils.CheckingNonParallelSourceWithListState
.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR,
@@ -228,17 +248,8 @@ public class StatefulJobSavepointMigrationITCase extends SavepointMigrationTestB
}
}
- private String getSavepointPath(FlinkVersion savepointVersion, String backendType) {
- switch (backendType) {
- case StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME:
- return "new-stateful-udf-migration-itcase-flink"
- + savepointVersion
- + "-rocksdb-savepoint";
- case StateBackendLoader.MEMORY_STATE_BACKEND_NAME:
- return "new-stateful-udf-migration-itcase-flink" + savepointVersion + "-savepoint";
- default:
- throw new UnsupportedOperationException();
- }
+ private static String getSnapshotPath(SnapshotSpec snapshotSpec) {
+ return "new-stateful-udf-migration-itcase-" + snapshotSpec;
}
private static class CheckpointingKeyedStateFlatMap
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobWBroadcastStateMigrationITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StatefulJobWBroadcastStateMigrationITCase.java
similarity index 80%
rename from flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobWBroadcastStateMigrationITCase.java
rename to flink-tests/src/test/java/org/apache/flink/test/checkpointing/StatefulJobWBroadcastStateMigrationITCase.java
index 4470b85..459ce1b 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobWBroadcastStateMigrationITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StatefulJobWBroadcastStateMigrationITCase.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.flink.test.checkpointing.utils;
+package org.apache.flink.test.checkpointing;
import org.apache.flink.FlinkVersion;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
@@ -25,14 +25,17 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.runtime.state.StateBackendLoader;
+import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.test.checkpointing.utils.MigrationTestUtils;
+import org.apache.flink.test.checkpointing.utils.SnapshotMigrationTestBase;
import org.apache.flink.util.Collector;
import org.junit.Assert;
@@ -40,10 +43,11 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
-import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
+import java.util.LinkedList;
import java.util.Map;
+import java.util.stream.Collectors;
/**
* Migration ITCases for a stateful job with broadcast state. The tests are parameterized to
@@ -51,48 +55,69 @@ import java.util.Map;
* state backends.
*/
@RunWith(Parameterized.class)
-public class StatefulJobWBroadcastStateMigrationITCase extends SavepointMigrationTestBase {
+public class StatefulJobWBroadcastStateMigrationITCase extends SnapshotMigrationTestBase {
private static final int NUM_SOURCE_ELEMENTS = 4;
- // TODO change this to PERFORM_SAVEPOINT to regenerate binary savepoints
- // TODO Note: You should generate the savepoint based on the release branch instead of the
+ // TODO increase this to newer version to create and test snapshot migration for newer versions
+ private static final FlinkVersion currentVersion = FlinkVersion.v1_14;
+
+ // TODO change this to CREATE_SNAPSHOT to (re)create binary snapshots
+ // TODO Note: You should generate the snapshot based on the release branch instead of the
// master.
- private final StatefulJobSavepointMigrationITCase.ExecutionMode executionMode =
- StatefulJobSavepointMigrationITCase.ExecutionMode.VERIFY_SAVEPOINT;
-
- @Parameterized.Parameters(name = "Migrate Savepoint / Backend: {0}")
- public static Collection<Tuple2<FlinkVersion, String>> parameters() {
- return Arrays.asList(
- Tuple2.of(FlinkVersion.v1_5, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
- Tuple2.of(FlinkVersion.v1_5, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
- Tuple2.of(FlinkVersion.v1_6, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
- Tuple2.of(FlinkVersion.v1_6, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
- Tuple2.of(FlinkVersion.v1_7, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
- Tuple2.of(FlinkVersion.v1_7, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
- Tuple2.of(FlinkVersion.v1_8, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
- Tuple2.of(FlinkVersion.v1_8, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
- Tuple2.of(FlinkVersion.v1_9, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
- Tuple2.of(FlinkVersion.v1_9, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
- Tuple2.of(FlinkVersion.v1_10, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
- Tuple2.of(FlinkVersion.v1_10, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
- Tuple2.of(FlinkVersion.v1_11, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
- Tuple2.of(FlinkVersion.v1_11, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
- Tuple2.of(FlinkVersion.v1_12, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
- Tuple2.of(FlinkVersion.v1_12, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
- Tuple2.of(FlinkVersion.v1_13, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
- Tuple2.of(FlinkVersion.v1_13, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
- Tuple2.of(FlinkVersion.v1_14, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
- Tuple2.of(FlinkVersion.v1_14, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME));
+ private static final ExecutionMode executionMode = ExecutionMode.VERIFY_SNAPSHOT;
+
+ @Parameterized.Parameters(name = "Test snapshot: {0}")
+ public static Collection<SnapshotSpec> parameters() {
+ Collection<SnapshotSpec> parameters = new LinkedList<>();
+ parameters.addAll(
+ SnapshotSpec.withVersions(
+ StateBackendLoader.MEMORY_STATE_BACKEND_NAME,
+ SnapshotType.SAVEPOINT_CANONICAL,
+ FlinkVersion.rangeOf(FlinkVersion.v1_5, FlinkVersion.v1_14)));
+ parameters.addAll(
+ SnapshotSpec.withVersions(
+ StateBackendLoader.HASHMAP_STATE_BACKEND_NAME,
+ SnapshotType.SAVEPOINT_CANONICAL,
+ FlinkVersion.rangeOf(FlinkVersion.v1_15, currentVersion)));
+ parameters.addAll(
+ SnapshotSpec.withVersions(
+ StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME,
+ SnapshotType.SAVEPOINT_CANONICAL,
+ FlinkVersion.rangeOf(FlinkVersion.v1_5, currentVersion)));
+ parameters.addAll(
+ SnapshotSpec.withVersions(
+ StateBackendLoader.HASHMAP_STATE_BACKEND_NAME,
+ SnapshotType.SAVEPOINT_NATIVE,
+ FlinkVersion.rangeOf(FlinkVersion.v1_15, currentVersion)));
+ parameters.addAll(
+ SnapshotSpec.withVersions(
+ StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME,
+ SnapshotType.SAVEPOINT_NATIVE,
+ FlinkVersion.rangeOf(FlinkVersion.v1_15, currentVersion)));
+ parameters.addAll(
+ SnapshotSpec.withVersions(
+ StateBackendLoader.HASHMAP_STATE_BACKEND_NAME,
+ SnapshotType.CHECKPOINT,
+ FlinkVersion.rangeOf(FlinkVersion.v1_15, currentVersion)));
+ parameters.addAll(
+ SnapshotSpec.withVersions(
+ StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME,
+ SnapshotType.CHECKPOINT,
+ FlinkVersion.rangeOf(FlinkVersion.v1_15, currentVersion)));
+ if (executionMode == ExecutionMode.CREATE_SNAPSHOT) {
+ parameters =
+ parameters.stream()
+ .filter(x -> x.getFlinkVersion().equals(currentVersion))
+ .collect(Collectors.toList());
+ }
+ return parameters;
}
- private final FlinkVersion testMigrateVersion;
- private final String testStateBackend;
+ private final SnapshotSpec snapshotSpec;
- public StatefulJobWBroadcastStateMigrationITCase(
- Tuple2<FlinkVersion, String> testMigrateVersionAndBackend) throws Exception {
- this.testMigrateVersion = testMigrateVersionAndBackend.f0;
- this.testStateBackend = testMigrateVersionAndBackend.f1;
+ public StatefulJobWBroadcastStateMigrationITCase(SnapshotSpec snapshotSpec) throws Exception {
+ this.snapshotSpec = snapshotSpec;
}
@Test
@@ -103,13 +128,16 @@ public class StatefulJobWBroadcastStateMigrationITCase extends SavepointMigratio
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.noRestart());
- switch (testStateBackend) {
+ switch (snapshotSpec.getStateBackendType()) {
case StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME:
- env.setStateBackend(new RocksDBStateBackend(new MemoryStateBackend()));
+ env.setStateBackend(new EmbeddedRocksDBStateBackend());
break;
case StateBackendLoader.MEMORY_STATE_BACKEND_NAME:
env.setStateBackend(new MemoryStateBackend());
break;
+ case StateBackendLoader.HASHMAP_STATE_BACKEND_NAME:
+ env.setStateBackend(new HashMapStateBackend());
+ break;
default:
throw new UnsupportedOperationException();
}
@@ -148,7 +176,7 @@ public class StatefulJobWBroadcastStateMigrationITCase extends SavepointMigratio
expectedThirdState.put(2L, "2");
expectedThirdState.put(3L, "3");
- if (executionMode == StatefulJobSavepointMigrationITCase.ExecutionMode.PERFORM_SAVEPOINT) {
+ if (executionMode == ExecutionMode.CREATE_SNAPSHOT) {
nonParallelSource =
new MigrationTestUtils.CheckpointingNonParallelSourceWithListState(
NUM_SOURCE_ELEMENTS);
@@ -163,8 +191,7 @@ public class StatefulJobWBroadcastStateMigrationITCase extends SavepointMigratio
NUM_SOURCE_ELEMENTS);
firstBroadcastFunction = new CheckpointingKeyedBroadcastFunction();
secondBroadcastFunction = new CheckpointingKeyedSingleBroadcastFunction();
- } else if (executionMode
- == StatefulJobSavepointMigrationITCase.ExecutionMode.VERIFY_SAVEPOINT) {
+ } else if (executionMode == ExecutionMode.VERIFY_SNAPSHOT) {
nonParallelSource =
new MigrationTestUtils.CheckingNonParallelSourceWithListState(
NUM_SOURCE_ELEMENTS);
@@ -252,19 +279,18 @@ public class StatefulJobWBroadcastStateMigrationITCase extends SavepointMigratio
.uid("BrProcess2")
.addSink(new MigrationTestUtils.AccumulatorCountingSink<>());
- if (executionMode == StatefulJobSavepointMigrationITCase.ExecutionMode.PERFORM_SAVEPOINT) {
- executeAndSavepoint(
+ if (executionMode == ExecutionMode.CREATE_SNAPSHOT) {
+ executeAndSnapshot(
env,
- "src/test/resources/"
- + getBroadcastSavepointPath(testMigrateVersion, testStateBackend),
+ "src/test/resources/" + getSnapshotPath(snapshotSpec),
+ snapshotSpec.getSnapshotType(),
new Tuple2<>(
MigrationTestUtils.AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR,
2 * NUM_SOURCE_ELEMENTS));
} else {
restoreAndExecute(
env,
- getResourceFilename(
- getBroadcastSavepointPath(testMigrateVersion, testStateBackend)),
+ getResourceFilename(getSnapshotPath(snapshotSpec)),
new Tuple2<>(
MigrationTestUtils.CheckingNonParallelSourceWithListState
.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR,
@@ -279,19 +305,8 @@ public class StatefulJobWBroadcastStateMigrationITCase extends SavepointMigratio
}
}
- private String getBroadcastSavepointPath(FlinkVersion savepointVersion, String backendType) {
- switch (backendType) {
- case StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME:
- return "new-stateful-broadcast-udf-migration-itcase-flink"
- + savepointVersion
- + "-rocksdb-savepoint";
- case StateBackendLoader.MEMORY_STATE_BACKEND_NAME:
- return "new-stateful-broadcast-udf-migration-itcase-flink"
- + savepointVersion
- + "-savepoint";
- default:
- throw new UnsupportedOperationException();
- }
+ private String getSnapshotPath(SnapshotSpec snapshotSpec) {
+ return "new-stateful-broadcast-udf-migration-itcase-" + snapshotSpec;
}
/**
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/MigrationTestUtils.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/MigrationTestUtils.java
index 85ff84c..9fd021e 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/MigrationTestUtils.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/MigrationTestUtils.java
@@ -110,7 +110,7 @@ public class MigrationTestUtils {
private static final long serialVersionUID = 1L;
- static final String SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR =
+ public static final String SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR =
CheckingNonParallelSourceWithListState.class + "_RESTORE_CHECK";
private volatile boolean isRunning = true;
@@ -251,7 +251,7 @@ public class MigrationTestUtils {
private static final long serialVersionUID = 1L;
- static final String SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR =
+ public static final String SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR =
CheckingParallelSourceWithUnionListState.class + "_RESTORE_CHECK";
private volatile boolean isRunning = true;
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SnapshotMigrationTestBase.java
similarity index 53%
rename from flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
rename to flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SnapshotMigrationTestBase.java
index f17f59c..57a4894 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SnapshotMigrationTestBase.java
@@ -18,6 +18,7 @@
package org.apache.flink.test.checkpointing.utils;
+import org.apache.flink.FlinkVersion;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.time.Deadline;
@@ -33,6 +34,7 @@ import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.runtime.state.StateBackendLoader;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.util.MiniClusterWithClientResource;
@@ -46,9 +48,13 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
+import java.io.Serializable;
import java.net.URI;
import java.net.URL;
import java.time.Duration;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
@@ -58,18 +64,154 @@ import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.fail;
/** Test savepoint migration. */
-public abstract class SavepointMigrationTestBase extends TestBaseUtils {
+
+/**
+ * Base for testing snapshot migration. The base test supports snapshots types as defined in {@link
+ * SnapshotType}.
+ */
+public abstract class SnapshotMigrationTestBase extends TestBaseUtils {
@ClassRule public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder();
@Rule public final MiniClusterWithClientResource miniClusterResource;
- private static final Logger LOG = LoggerFactory.getLogger(SavepointMigrationTestBase.class);
+ private static final Logger LOG = LoggerFactory.getLogger(SnapshotMigrationTestBase.class);
protected static final int DEFAULT_PARALLELISM = 4;
+ /**
+ * Modes for migration test execution. This enum is supposed to serve as a switch between two
+ * modes of test execution: 1) create snapshots and 2) verify snapshots:
+ */
+ public enum ExecutionMode {
+ /** Create binary snapshot(s), i.e. run the checkpointing functions. */
+ CREATE_SNAPSHOT,
+ /** Verify snapshot(s), i.e, restore snapshot and check execution result. */
+ VERIFY_SNAPSHOT
+ }
+
+ /** Types of snapshot supported by this base test. */
+ public enum SnapshotType {
+ /** Savepoints with Flink canonical format. */
+ SAVEPOINT_CANONICAL,
+ /** Savepoint with native format of respective state backend. */
+ SAVEPOINT_NATIVE,
+ /** Checkpoint. */
+ CHECKPOINT
+ }
+
+ /**
+ * A snapshot specification (immutable) for migration tests that consists of {@link
+ * FlinkVersion} that the snapshot has been created with, {@link
+ * SnapshotMigrationTestBase.SnapshotType}, and state backend type that the snapshot has been
+ * ctreated from.
+ */
+ public static class SnapshotSpec implements Serializable {
+ private final FlinkVersion flinkVersion;
+ private final String stateBackendType;
+ private final SnapshotMigrationTestBase.SnapshotType snapshotType;
+
+ /**
+ * Creates a {@link SnapshotSpec} with specified parameters.
+ *
+ * @param flinkVersion Specifies the {@link FlinkVersion}.
+ * @param stateBackendType Specifies the state backend type.
+ * @param snapshotType Specifies the {@link SnapshotMigrationTestBase.SnapshotType}.
+ */
+ public SnapshotSpec(
+ FlinkVersion flinkVersion,
+ String stateBackendType,
+ SnapshotMigrationTestBase.SnapshotType snapshotType) {
+ this.flinkVersion = flinkVersion;
+ this.stateBackendType = stateBackendType;
+ this.snapshotType = snapshotType;
+ }
+
+ /**
+ * Gets the {@link FlinkVersion} that the snapshot has been created with.
+ *
+ * @return {@link FlinkVersion}
+ */
+ public FlinkVersion getFlinkVersion() {
+ return flinkVersion;
+ }
+
+ /**
+ * Gets the state backend type that the snapshot has been created from.
+ *
+ * @return State backend type.
+ */
+ public String getStateBackendType() {
+ return stateBackendType;
+ }
+
+ /**
+ * Gets the {@link SnapshotMigrationTestBase.SnapshotType}.
+ *
+ * @return {@link SnapshotMigrationTestBase.SnapshotType}
+ */
+ public SnapshotMigrationTestBase.SnapshotType getSnapshotType() {
+ return snapshotType;
+ }
+
+ /**
+ * Creates a collection of {@link SnapshotSpec} for a given collection of {@link
+ * FlinkVersion} with the same parameters but different {@link FlinkVersion}.
+ *
+ * @param stateBackendType Specifies the state backend type.
+ * @param snapshotType Specifies the snapshot type.
+ * @param flinkVersions A collection of {@link FlinkVersion}.
+ * @return A collection of {@link SnapshotSpec} that differ only by means of {@link
+ * FlinkVersion} FlinkVersion}.
+ */
+ public static Collection<SnapshotSpec> withVersions(
+ String stateBackendType,
+ SnapshotMigrationTestBase.SnapshotType snapshotType,
+ Collection<FlinkVersion> flinkVersions) {
+ List<SnapshotSpec> snapshotSpecCollection = new LinkedList<>();
+ for (FlinkVersion version : flinkVersions) {
+ snapshotSpecCollection.add(
+ new SnapshotSpec(version, stateBackendType, snapshotType));
+ }
+ return snapshotSpecCollection;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder str = new StringBuilder("flink" + flinkVersion);
+ switch (stateBackendType) {
+ case StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME:
+ str.append("-rocksdb");
+ break;
+ case StateBackendLoader.MEMORY_STATE_BACKEND_NAME:
+ // This is implicit due to backwards compatibility with legacy artifact names.
+ break;
+ case StateBackendLoader.HASHMAP_STATE_BACKEND_NAME:
+ str.append("-hashmap");
+ break;
+ default:
+ throw new UnsupportedOperationException("State backend type not supported.");
+ }
+ switch (snapshotType) {
+ case SAVEPOINT_CANONICAL:
+ str.append("-savepoint");
+ // Canonical implicit due to backwards compatibility with legacy artifact names.
+ break;
+ case SAVEPOINT_NATIVE:
+ str.append("-savepoint-native");
+ break;
+ case CHECKPOINT:
+ str.append("-checkpoint");
+ break;
+ default:
+ throw new UnsupportedOperationException("Snapshot type not supported.");
+ }
+ return str.toString();
+ }
+ }
+
protected static String getResourceFilename(String filename) {
- ClassLoader cl = SavepointMigrationTestBase.class.getClassLoader();
+ ClassLoader cl = SnapshotMigrationTestBase.class.getClassLoader();
URL resource = cl.getResource(filename);
if (resource == null) {
throw new NullPointerException("Missing snapshot resource.");
@@ -77,7 +219,7 @@ public abstract class SavepointMigrationTestBase extends TestBaseUtils {
return resource.getFile();
}
- protected SavepointMigrationTestBase() throws Exception {
+ protected SnapshotMigrationTestBase() throws Exception {
miniClusterResource =
new MiniClusterWithClientResource(
new MiniClusterResourceConfiguration.Builder()
@@ -115,10 +257,22 @@ public abstract class SavepointMigrationTestBase extends TestBaseUtils {
return config;
}
+ @Deprecated
@SafeVarargs
protected final void executeAndSavepoint(
StreamExecutionEnvironment env,
- String savepointPath,
+ String snapshotPath,
+ Tuple2<String, Integer>... expectedAccumulators)
+ throws Exception {
+ executeAndSnapshot(
+ env, snapshotPath, SnapshotType.SAVEPOINT_CANONICAL, expectedAccumulators);
+ }
+
+ @SafeVarargs
+ protected final void executeAndSnapshot(
+ StreamExecutionEnvironment env,
+ String snapshotPath,
+ SnapshotType snapshotType,
Tuple2<String, Integer>... expectedAccumulators)
throws Exception {
@@ -162,27 +316,41 @@ public abstract class SavepointMigrationTestBase extends TestBaseUtils {
fail("Did not see the expected accumulator results within time limit.");
}
- LOG.info("Triggering savepoint.");
+ LOG.info("Triggering snapshot.");
- CompletableFuture<String> savepointPathFuture =
- client.triggerSavepoint(jobID, null, SavepointFormatType.CANONICAL);
+ CompletableFuture<String> snapshotPathFuture;
+ switch (snapshotType) {
+ case SAVEPOINT_CANONICAL:
+ snapshotPathFuture =
+ client.triggerSavepoint(jobID, null, SavepointFormatType.CANONICAL);
+ break;
+ case SAVEPOINT_NATIVE:
+ snapshotPathFuture =
+ client.triggerSavepoint(jobID, null, SavepointFormatType.NATIVE);
+ break;
+ case CHECKPOINT:
+ snapshotPathFuture = miniClusterResource.getMiniCluster().triggerCheckpoint(jobID);
+ break;
+ default:
+ throw new UnsupportedOperationException("Snapshot type not supported/implemented.");
+ }
- String jobmanagerSavepointPath =
- savepointPathFuture.get(deadLine.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+ String jobmanagerSnapshotPath =
+ snapshotPathFuture.get(deadLine.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
- File jobManagerSavepoint = new File(new URI(jobmanagerSavepointPath).getPath());
+ File jobManagerSnapshot = new File(new URI(jobmanagerSnapshotPath).getPath());
// savepoints were changed to be directories in Flink 1.3
- if (jobManagerSavepoint.isDirectory()) {
- FileUtils.moveDirectory(jobManagerSavepoint, new File(savepointPath));
+ if (jobManagerSnapshot.isDirectory()) {
+ FileUtils.moveDirectory(jobManagerSnapshot, new File(snapshotPath));
} else {
- FileUtils.moveFile(jobManagerSavepoint, new File(savepointPath));
+ FileUtils.moveFile(jobManagerSnapshot, new File(snapshotPath));
}
}
@SafeVarargs
protected final void restoreAndExecute(
StreamExecutionEnvironment env,
- String savepointPath,
+ String snapshotPath,
Tuple2<String, Integer>... expectedAccumulators)
throws Exception {
@@ -193,7 +361,7 @@ public abstract class SavepointMigrationTestBase extends TestBaseUtils {
// Submit the job
JobGraph jobGraph = env.getStreamGraph().getJobGraph();
- jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath));
+ jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(snapshotPath));
JobID jobID = client.submitJob(jobGraph).get();
diff --git a/flink-tests/src/test/java/org/apache/flink/test/migration/TypeSerializerSnapshotMigrationITCase.java b/flink-tests/src/test/java/org/apache/flink/test/migration/TypeSerializerSnapshotMigrationITCase.java
index aa22407..96d607b 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/migration/TypeSerializerSnapshotMigrationITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/migration/TypeSerializerSnapshotMigrationITCase.java
@@ -29,104 +29,117 @@ import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.runtime.state.StateBackendLoader;
+import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.test.checkpointing.utils.MigrationTestUtils;
-import org.apache.flink.test.checkpointing.utils.SavepointMigrationTestBase;
+import org.apache.flink.test.checkpointing.utils.SnapshotMigrationTestBase;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.IOException;
-import java.util.Arrays;
import java.util.Collection;
+import java.util.LinkedList;
+import java.util.stream.Collectors;
/**
* Migration IT cases for upgrading a legacy {@link TypeSerializerConfigSnapshot} that is written in
* checkpoints to {@link TypeSerializerSnapshot} interface.
*
- * <p>The savepoints used by this test were written with a serializer snapshot class that extends
+ * <p>The snapshots used by this test were written with a serializer snapshot class that extends
* {@link TypeSerializerConfigSnapshot}, as can be seen in the commented out code at the end of this
* class. On restore, we change the snapshot to implement directly a {@link TypeSerializerSnapshot}.
*/
@RunWith(Parameterized.class)
-public class TypeSerializerSnapshotMigrationITCase extends SavepointMigrationTestBase {
+public class TypeSerializerSnapshotMigrationITCase extends SnapshotMigrationTestBase {
private static final int NUM_SOURCE_ELEMENTS = 4;
- /**
- * This test runs in either of two modes: 1) we want to generate the binary savepoint, i.e. we
- * have to run the checkpointing functions 2) we want to verify restoring, so we have to run the
- * checking functions.
- */
- public enum ExecutionMode {
- PERFORM_SAVEPOINT,
- VERIFY_SAVEPOINT
- }
+ // TODO increase this to newer version to create and test snapshot migration for newer versions
+ private static final FlinkVersion currentVersion = FlinkVersion.v1_14;
- // TODO change this to PERFORM_SAVEPOINT to regenerate binary savepoints
- // TODO Note: You should generate the savepoint based on the release branch instead of the
+ // TODO change this to CREATE_SNAPSHOT to (re)create binary snapshots
+ // TODO Note: You should generate the snapshot based on the release branch instead of the
// master.
- private final ExecutionMode executionMode = ExecutionMode.VERIFY_SAVEPOINT;
-
- @Parameterized.Parameters(name = "Migrate Savepoint / Backend: {0}")
- public static Collection<Tuple2<FlinkVersion, String>> parameters() {
- return Arrays.asList(
- Tuple2.of(FlinkVersion.v1_3, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
- Tuple2.of(FlinkVersion.v1_3, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
- Tuple2.of(FlinkVersion.v1_4, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
- Tuple2.of(FlinkVersion.v1_4, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
- Tuple2.of(FlinkVersion.v1_5, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
- Tuple2.of(FlinkVersion.v1_5, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
- Tuple2.of(FlinkVersion.v1_6, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
- Tuple2.of(FlinkVersion.v1_6, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
- Tuple2.of(FlinkVersion.v1_7, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
- Tuple2.of(FlinkVersion.v1_7, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
- Tuple2.of(FlinkVersion.v1_8, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
- Tuple2.of(FlinkVersion.v1_8, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
- Tuple2.of(FlinkVersion.v1_9, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
- Tuple2.of(FlinkVersion.v1_9, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
- Tuple2.of(FlinkVersion.v1_10, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
- Tuple2.of(FlinkVersion.v1_10, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
- Tuple2.of(FlinkVersion.v1_11, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
- Tuple2.of(FlinkVersion.v1_11, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
- Tuple2.of(FlinkVersion.v1_12, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
- Tuple2.of(FlinkVersion.v1_12, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
- Tuple2.of(FlinkVersion.v1_13, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
- Tuple2.of(FlinkVersion.v1_13, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
- Tuple2.of(FlinkVersion.v1_14, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
- Tuple2.of(FlinkVersion.v1_14, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME));
+ private static final ExecutionMode executionMode = ExecutionMode.VERIFY_SNAPSHOT;
+
+ @Parameterized.Parameters(name = "Test snapshot: {0}")
+ public static Collection<SnapshotSpec> parameters() {
+ Collection<SnapshotSpec> parameters = new LinkedList<>();
+ parameters.addAll(
+ SnapshotSpec.withVersions(
+ StateBackendLoader.MEMORY_STATE_BACKEND_NAME,
+ SnapshotType.SAVEPOINT_CANONICAL,
+ FlinkVersion.rangeOf(FlinkVersion.v1_3, FlinkVersion.v1_14)));
+ parameters.addAll(
+ SnapshotSpec.withVersions(
+ StateBackendLoader.HASHMAP_STATE_BACKEND_NAME,
+ SnapshotType.SAVEPOINT_CANONICAL,
+ FlinkVersion.rangeOf(FlinkVersion.v1_15, currentVersion)));
+ parameters.addAll(
+ SnapshotSpec.withVersions(
+ StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME,
+ SnapshotType.SAVEPOINT_CANONICAL,
+ FlinkVersion.rangeOf(FlinkVersion.v1_3, currentVersion)));
+ parameters.addAll(
+ SnapshotSpec.withVersions(
+ StateBackendLoader.HASHMAP_STATE_BACKEND_NAME,
+ SnapshotType.SAVEPOINT_NATIVE,
+ FlinkVersion.rangeOf(FlinkVersion.v1_15, currentVersion)));
+ parameters.addAll(
+ SnapshotSpec.withVersions(
+ StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME,
+ SnapshotType.SAVEPOINT_NATIVE,
+ FlinkVersion.rangeOf(FlinkVersion.v1_15, currentVersion)));
+ parameters.addAll(
+ SnapshotSpec.withVersions(
+ StateBackendLoader.HASHMAP_STATE_BACKEND_NAME,
+ SnapshotType.CHECKPOINT,
+ FlinkVersion.rangeOf(FlinkVersion.v1_15, currentVersion)));
+ parameters.addAll(
+ SnapshotSpec.withVersions(
+ StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME,
+ SnapshotType.CHECKPOINT,
+ FlinkVersion.rangeOf(FlinkVersion.v1_15, currentVersion)));
+ if (executionMode == ExecutionMode.CREATE_SNAPSHOT) {
+ parameters =
+ parameters.stream()
+ .filter(x -> x.getFlinkVersion().equals(currentVersion))
+ .collect(Collectors.toList());
+ }
+ return parameters;
}
- private final FlinkVersion testMigrateVersion;
- private final String testStateBackend;
+ private final SnapshotSpec snapshotSpec;
- public TypeSerializerSnapshotMigrationITCase(
- Tuple2<FlinkVersion, String> testMigrateVersionAndBackend) throws Exception {
- this.testMigrateVersion = testMigrateVersionAndBackend.f0;
- this.testStateBackend = testMigrateVersionAndBackend.f1;
+ public TypeSerializerSnapshotMigrationITCase(SnapshotSpec snapshotSpec) throws Exception {
+ this.snapshotSpec = snapshotSpec;
}
@Test
- public void testSavepoint() throws Exception {
+ public void testSnapshot() throws Exception {
final int parallelism = 1;
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.noRestart());
- switch (testStateBackend) {
+ switch (snapshotSpec.getStateBackendType()) {
case StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME:
- env.setStateBackend(new RocksDBStateBackend(new MemoryStateBackend()));
+ env.setStateBackend(new EmbeddedRocksDBStateBackend());
break;
case StateBackendLoader.MEMORY_STATE_BACKEND_NAME:
env.setStateBackend(new MemoryStateBackend());
break;
+ case StateBackendLoader.HASHMAP_STATE_BACKEND_NAME:
+ env.setStateBackend(new HashMapStateBackend());
+ break;
default:
throw new UnsupportedOperationException();
}
@@ -145,36 +158,30 @@ public class TypeSerializerSnapshotMigrationITCase extends SavepointMigrationTes
.map(new TestMapFunction())
.addSink(new MigrationTestUtils.AccumulatorCountingSink<>());
- if (executionMode == ExecutionMode.PERFORM_SAVEPOINT) {
- executeAndSavepoint(
+ final String snapshotPath = getSnapshotPath(snapshotSpec);
+
+ if (executionMode == ExecutionMode.CREATE_SNAPSHOT) {
+ executeAndSnapshot(
env,
- "src/test/resources/" + getSavepointPath(testMigrateVersion, testStateBackend),
- new Tuple2<>(
+ "src/test/resources/" + snapshotPath,
+ snapshotSpec.getSnapshotType(),
+ Tuple2.of(
MigrationTestUtils.AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR,
NUM_SOURCE_ELEMENTS));
- } else {
+ } else if (executionMode == ExecutionMode.VERIFY_SNAPSHOT) {
restoreAndExecute(
env,
- getResourceFilename(getSavepointPath(testMigrateVersion, testStateBackend)),
- new Tuple2<>(
+ getResourceFilename(snapshotPath),
+ Tuple2.of(
MigrationTestUtils.AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR,
NUM_SOURCE_ELEMENTS));
+ } else {
+ throw new IllegalStateException("Unknown ExecutionMode " + executionMode);
}
}
- private String getSavepointPath(FlinkVersion savepointVersion, String backendType) {
- switch (backendType) {
- case "rocksdb":
- return "type-serializer-snapshot-migration-itcase-flink"
- + savepointVersion
- + "-rocksdb-savepoint";
- case "jobmanager":
- return "type-serializer-snapshot-migration-itcase-flink"
- + savepointVersion
- + "-savepoint";
- default:
- throw new UnsupportedOperationException();
- }
+ private String getSnapshotPath(SnapshotSpec snapshotSpec) {
+ return "type-serializer-snapshot-migration-itcase-" + snapshotSpec;
}
private static class TestMapFunction
diff --git a/pom.xml b/pom.xml
index 14de924..d471a8e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1494,7 +1494,9 @@ under the License.
<exclude>**/src/test/resources/**/test-data</exclude>
<exclude>**/src/test/resources/*-snapshot</exclude>
<exclude>**/src/test/resources/*.snapshot</exclude>
- <exclude>**/src/test/resources/*-savepoint</exclude>
+ <exclude>**/src/test/resources/*-savepoint/**</exclude>
+ <exclude>**/src/test/resources/*-savepoint-native/**</exclude>
+ <exclude>**/src/test/resources/*-checkpoint/**</exclude>
<exclude>flink-core/src/test/resources/serialized-kryo-serializer-1.3</exclude>
<exclude>flink-core/src/test/resources/type-without-avro-serialized-using-kryo</exclude>
<exclude>flink-formats/flink-avro/src/test/resources/flink-1.4-serializer-java-serialized</exclude>