You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2020/02/25 22:55:44 UTC
[flink] 05/07: [FLINK-16192][checkpointing] Remove remaining bits
of "legacy state" and Savepoint 1.2 compatibility
This is an automated email from the ASF dual-hosted git repository.
sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit b8277f92cfd1fa80325194bd705d1a13293b30e3
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Thu Feb 20 18:19:51 2020 +0100
[FLINK-16192][checkpointing] Remove remaining bits of "legacy state" and Savepoint 1.2 compatibility
"Legacy State" Refers to the state originally created by the old "Checkpointed" interface, before state was re-scalable.
Because some of the tests for 1.3 compatibility used checkpoints with that legacy state, some 1.3 migration
tests had to be removed as well.
---
.../fs/bucketing/BucketingSinkMigrationTest.java | 1 -
...bucketing-sink-migration-test-flink1.2-snapshot | Bin 1623 -> 0 bytes
.../kafka/FlinkKafkaConsumerBaseMigrationTest.java | 5 +-
...er-migration-test-flink1.2-empty-state-snapshot | Bin 240 -> 0 bytes
...kafka-consumer-migration-test-flink1.2-snapshot | Bin 1022 -> 0 bytes
.../testutils/migration/MigrationVersion.java | 1 -
.../ContinuousFileProcessingMigrationTest.java | 1 -
...-migration-test-1493116191000-flink1.2-snapshot | Bin 373 -> 0 bytes
.../reader-migration-test-flink1.2-snapshot | Bin 2590 -> 0 bytes
.../flink/runtime/checkpoint/Checkpoints.java | 9 +-
.../runtime/checkpoint/savepoint/Savepoint.java | 13 -
.../checkpoint/savepoint/SavepointSerializers.java | 14 -
.../savepoint/SavepointV1Serializer.java | 322 +--------------------
.../runtime/checkpoint/savepoint/SavepointV2.java | 170 -----------
.../savepoint/SavepointV2Serializer.java | 19 +-
.../savepoint/SavepointV1SerializerTest.java | 64 ----
...HeapKeyedStateBackendSnapshotMigrationTest.java | 135 ---------
.../resources/heap_keyed_statebackend_1_2.snapshot | Bin 2068 -> 0 bytes
.../windowing/WindowOperatorMigrationTest.java | 1 -
...-migration-test-accum-aligned-flink1.2-snapshot | Bin 222 -> 0 bytes
...p-migration-test-aggr-aligned-flink1.2-snapshot | Bin 187 -> 0 bytes
...gration-test-apply-event-time-flink1.2-snapshot | Bin 2193 -> 0 bytes
...on-test-apply-processing-time-flink1.2-snapshot | Bin 2081 -> 0 bytes
...tion-test-kryo-serialized-key-flink1.2-snapshot | Bin 3686 -> 0 bytes
...ration-test-reduce-event-time-flink1.2-snapshot | Bin 1988 -> 0 bytes
...n-test-reduce-processing-time-flink1.2-snapshot | Bin 1925 -> 0 bytes
...session-with-stateful-trigger-flink1.2-snapshot | Bin 3703 -> 0 bytes
...on-with-stateful-trigger-mint-flink1.2-snapshot | Bin 494 -> 0 bytes
.../LegacyStatefulJobSavepointMigrationITCase.java | 4 -
.../utils/SavepointMigrationTestBase.java | 13 -
.../restore/AbstractOperatorRestoreTestBase.java | 13 -
.../AbstractKeyedOperatorRestoreTestBase.java | 1 -
.../AbstractNonKeyedOperatorRestoreTestBase.java | 1 -
.../operatorstate/complexKeyed-flink1.2/_metadata | Bin 134953 -> 0 bytes
.../operatorstate/nonKeyed-flink1.2/_metadata | Bin 3212 -> 0 bytes
...udf-migration-itcase-flink1.2-rocksdb-savepoint | Bin 25256 -> 0 bytes
...tateful-udf-migration-itcase-flink1.2-savepoint | Bin 25245 -> 0 bytes
.../_metadata | Bin 36467 -> 0 bytes
.../_metadata | Bin 36395 -> 0 bytes
39 files changed, 13 insertions(+), 774 deletions(-)
diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkMigrationTest.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkMigrationTest.java
index b9cdea0..5bd0fcf 100644
--- a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkMigrationTest.java
+++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkMigrationTest.java
@@ -86,7 +86,6 @@ public class BucketingSinkMigrationTest {
@Parameterized.Parameters(name = "Migration Savepoint / Bucket Files Prefix: {0}")
public static Collection<Tuple2<MigrationVersion, String>> parameters () {
return Arrays.asList(
- Tuple2.of(MigrationVersion.v1_2, "/var/folders/v_/ry2wp5fx0y7c1rvr41xy9_700000gn/T/junit9160378385359106772/junit479663758539998903/1970-01-01--01/part-0-"),
Tuple2.of(MigrationVersion.v1_3, "/var/folders/tv/b_1d8fvx23dgk1_xs8db_95h0000gn/T/junit4273542175898623023/junit3801102997056424640/1970-01-01--01/part-0-"),
Tuple2.of(MigrationVersion.v1_4, "/var/folders/tv/b_1d8fvx23dgk1_xs8db_95h0000gn/T/junit3198043255809479705/junit8947526563966405708/1970-01-01--01/part-0-"),
Tuple2.of(MigrationVersion.v1_5, "/tmp/junit4927100426019463155/junit2465610012100182280/1970-01-01--00/part-0-"),
diff --git a/flink-connectors/flink-connector-filesystem/src/test/resources/bucketing-sink-migration-test-flink1.2-snapshot b/flink-connectors/flink-connector-filesystem/src/test/resources/bucketing-sink-migration-test-flink1.2-snapshot
deleted file mode 100644
index a541bad..0000000
Binary files a/flink-connectors/flink-connector-filesystem/src/test/resources/bucketing-sink-migration-test-flink1.2-snapshot and /dev/null differ
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java
index 4b888c6..4501590 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java
@@ -100,7 +100,6 @@ public class FlinkKafkaConsumerBaseMigrationTest {
@Parameterized.Parameters(name = "Migration Savepoint: {0}")
public static Collection<MigrationVersion> parameters () {
return Arrays.asList(
- MigrationVersion.v1_2,
MigrationVersion.v1_3,
MigrationVersion.v1_4,
MigrationVersion.v1_5,
@@ -260,7 +259,7 @@ public class FlinkKafkaConsumerBaseMigrationTest {
testHarness.open();
- // the expected state in "kafka-consumer-migration-test-flink1.2-snapshot-empty-state";
+ // the expected state in "kafka-consumer-migration-test-flink1.x-snapshot-empty-state";
// all new partitions after the snapshot are considered as partitions that were created while the
// consumer wasn't running, and should start from the earliest offset.
final HashMap<KafkaTopicPartition, Long> expectedSubscribedPartitionsWithStartOffsets = new HashMap<>();
@@ -332,7 +331,7 @@ public class FlinkKafkaConsumerBaseMigrationTest {
*/
@Test
public void testRestoreFailsWithNonEmptyPreFlink13StatesIfDiscoveryEnabled() throws Exception {
- assumeTrue(testMigrateVersion == MigrationVersion.v1_3 || testMigrateVersion == MigrationVersion.v1_2);
+ assumeTrue(testMigrateVersion == MigrationVersion.v1_3);
final List<KafkaTopicPartition> partitions = new ArrayList<>(PARTITION_STATE.keySet());
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/resources/kafka-consumer-migration-test-flink1.2-empty-state-snapshot b/flink-connectors/flink-connector-kafka-base/src/test/resources/kafka-consumer-migration-test-flink1.2-empty-state-snapshot
deleted file mode 100644
index 45047ee..0000000
Binary files a/flink-connectors/flink-connector-kafka-base/src/test/resources/kafka-consumer-migration-test-flink1.2-empty-state-snapshot and /dev/null differ
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/resources/kafka-consumer-migration-test-flink1.2-snapshot b/flink-connectors/flink-connector-kafka-base/src/test/resources/kafka-consumer-migration-test-flink1.2-snapshot
deleted file mode 100644
index f0be11a..0000000
Binary files a/flink-connectors/flink-connector-kafka-base/src/test/resources/kafka-consumer-migration-test-flink1.2-snapshot and /dev/null differ
diff --git a/flink-core/src/test/java/org/apache/flink/testutils/migration/MigrationVersion.java b/flink-core/src/test/java/org/apache/flink/testutils/migration/MigrationVersion.java
index 4122c5d..5a6260a 100644
--- a/flink-core/src/test/java/org/apache/flink/testutils/migration/MigrationVersion.java
+++ b/flink-core/src/test/java/org/apache/flink/testutils/migration/MigrationVersion.java
@@ -26,7 +26,6 @@ public enum MigrationVersion {
// NOTE: the version strings must not change,
// as they are used to locate snapshot file paths
- v1_2("1.2"),
v1_3("1.3"),
v1_4("1.4"),
v1_5("1.5"),
diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingMigrationTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingMigrationTest.java
index 5f3bc48..8b81247 100644
--- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingMigrationTest.java
+++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingMigrationTest.java
@@ -75,7 +75,6 @@ public class ContinuousFileProcessingMigrationTest {
@Parameterized.Parameters(name = "Migration Savepoint / Mod Time: {0}")
public static Collection<Tuple2<MigrationVersion, Long>> parameters () {
return Arrays.asList(
- Tuple2.of(MigrationVersion.v1_2, 1493116191000L),
Tuple2.of(MigrationVersion.v1_3, 1496532000000L),
Tuple2.of(MigrationVersion.v1_4, 1516897628000L),
Tuple2.of(MigrationVersion.v1_5, 1533639934000L),
diff --git a/flink-fs-tests/src/test/resources/monitoring-function-migration-test-1493116191000-flink1.2-snapshot b/flink-fs-tests/src/test/resources/monitoring-function-migration-test-1493116191000-flink1.2-snapshot
deleted file mode 100644
index 25451ac..0000000
Binary files a/flink-fs-tests/src/test/resources/monitoring-function-migration-test-1493116191000-flink1.2-snapshot and /dev/null differ
diff --git a/flink-fs-tests/src/test/resources/reader-migration-test-flink1.2-snapshot b/flink-fs-tests/src/test/resources/reader-migration-test-flink1.2-snapshot
deleted file mode 100644
index b22ceee..0000000
Binary files a/flink-fs-tests/src/test/resources/reader-migration-test-flink1.2-snapshot and /dev/null differ
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java
index fa84742..f593fe8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java
@@ -113,7 +113,6 @@ public class Checkpoints {
}
}
- @SuppressWarnings("deprecation")
public static CompletedCheckpoint loadAndValidateCheckpoint(
JobID jobId,
Map<JobVertexID, ExecutionJobVertex> tasks,
@@ -130,16 +129,12 @@ public class Checkpoints {
final String checkpointPointer = location.getExternalPointer();
// (1) load the savepoint
- final Savepoint rawCheckpointMetadata;
+ final Savepoint checkpointMetadata;
try (InputStream in = metadataHandle.openInputStream()) {
DataInputStream dis = new DataInputStream(in);
- rawCheckpointMetadata = loadCheckpointMetadata(dis, classLoader);
+ checkpointMetadata = loadCheckpointMetadata(dis, classLoader);
}
- final Savepoint checkpointMetadata = rawCheckpointMetadata.getTaskStates() == null ?
- rawCheckpointMetadata :
- SavepointV2.convertToOperatorStateSavepointV2(tasks, rawCheckpointMetadata);
-
// generate mapping from operator to task
Map<OperatorID, ExecutionJobVertex> operatorToJobVertexMapping = new HashMap<>();
for (ExecutionJobVertex task : tasks.values()) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/Savepoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/Savepoint.java
index 468b12f..0867c14 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/Savepoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/Savepoint.java
@@ -22,7 +22,6 @@ import org.apache.flink.core.io.Versioned;
import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.MasterState;
import org.apache.flink.runtime.checkpoint.OperatorState;
-import org.apache.flink.runtime.checkpoint.TaskState;
import org.apache.flink.util.Disposable;
import java.util.Collection;
@@ -51,17 +50,6 @@ public interface Savepoint extends Disposable, Versioned {
long getCheckpointId();
/**
- * Returns the snapshotted task states.
- *
- * <p>These are used to restore the snapshot state.
- *
- * @deprecated Only kept for backwards-compatibility with versionS < 1.3. Will be removed in the future.
- * @return Snapshotted task states
- */
- @Deprecated
- Collection<TaskState> getTaskStates();
-
- /**
* Gets the checkpointed states generated by the master.
*/
Collection<MasterState> getMasterStates();
@@ -74,5 +62,4 @@ public interface Savepoint extends Disposable, Versioned {
* @return Snapshotted operator states
*/
Collection<OperatorState> getOperatorStates();
-
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializers.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializers.java
index 256aee1..078471f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializers.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializers.java
@@ -18,8 +18,6 @@
package org.apache.flink.runtime.checkpoint.savepoint;
-import org.apache.flink.annotation.VisibleForTesting;
-
import java.util.HashMap;
import java.util.Map;
@@ -30,9 +28,6 @@ import java.util.Map;
*/
public class SavepointSerializers {
- /** If this flag is true, restoring a savepoint fails if it contains legacy state (<= Flink 1.1 format). */
- static boolean failWhenLegacyStateDetected = true;
-
private static final Map<Integer, SavepointSerializer> SERIALIZERS = new HashMap<>(2);
static {
@@ -61,13 +56,4 @@ public class SavepointSerializers {
throw new IllegalArgumentException("Unrecognized checkpoint version number: " + version);
}
}
-
- /**
- * This is only visible as a temporary solution to keep the stateful job migration it cases working from binary
- * savepoints that still contain legacy state (<= Flink 1.1).
- */
- @VisibleForTesting
- public static void setFailWhenLegacyStateDetected(boolean fail) {
- failWhenLegacyStateDetected = fail;
- }
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
index e0dd8b9..9af5dde 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
@@ -19,341 +19,27 @@
package org.apache.flink.runtime.checkpoint.savepoint;
import org.apache.flink.annotation.Internal;
-import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.checkpoint.SubtaskState;
-import org.apache.flink.runtime.checkpoint.TaskState;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.state.ChainedStateHandle;
-import org.apache.flink.runtime.state.KeyGroupRange;
-import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
-import org.apache.flink.runtime.state.KeyGroupsStateHandle;
-import org.apache.flink.runtime.state.KeyedStateHandle;
-import org.apache.flink.runtime.state.OperatorStateHandle;
-import org.apache.flink.runtime.state.OperatorStreamStateHandle;
-import org.apache.flink.runtime.state.StreamStateHandle;
-import org.apache.flink.runtime.state.filesystem.FileStateHandle;
-import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
-import org.apache.flink.util.Preconditions;
import java.io.DataInputStream;
-import java.io.DataOutputStream;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
/**
- * Deserializer for checkpoints written in format {@code 1} (Flink 1.2.x format)
- *
- * <p>In contrast to the previous versions, this serializer makes sure that no Java
- * serialization is used for serialization. Therefore, we don't rely on any involved
- * classes to stay the same.
+ * Deserializer for checkpoints written in format {@code 1} (Flink 1.2.x format).
+ * This class is only retained to give a better error message: Rather than getting a "unknown version",
+ * the user gets a "version no longer supported".
*/
@Internal
-@SuppressWarnings("deprecation")
public class SavepointV1Serializer implements SavepointSerializer {
/** The savepoint version. */
public static final int VERSION = 1;
- private static final byte NULL_HANDLE = 0;
- private static final byte BYTE_STREAM_STATE_HANDLE = 1;
- private static final byte FILE_STREAM_STATE_HANDLE = 2;
- private static final byte KEY_GROUPS_HANDLE = 3;
- private static final byte PARTITIONABLE_OPERATOR_STATE_HANDLE = 4;
-
public static final SavepointV1Serializer INSTANCE = new SavepointV1Serializer();
private SavepointV1Serializer() {}
@Override
public SavepointV2 deserialize(DataInputStream dis, ClassLoader cl) throws IOException {
- long checkpointId = dis.readLong();
-
- // Task states
- int numTaskStates = dis.readInt();
- List<TaskState> taskStates = new ArrayList<>(numTaskStates);
-
- for (int i = 0; i < numTaskStates; i++) {
- JobVertexID jobVertexId = new JobVertexID(dis.readLong(), dis.readLong());
- int parallelism = dis.readInt();
- int maxParallelism = dis.readInt();
- int chainLength = dis.readInt();
-
- // Add task state
- TaskState taskState = new TaskState(jobVertexId, parallelism, maxParallelism, chainLength);
- taskStates.add(taskState);
-
- // Sub task states
- int numSubTaskStates = dis.readInt();
-
- for (int j = 0; j < numSubTaskStates; j++) {
- int subtaskIndex = dis.readInt();
- SubtaskState subtaskState = deserializeSubtaskState(dis);
- taskState.putState(subtaskIndex, subtaskState);
- }
- }
-
- return new SavepointV2(checkpointId, taskStates);
- }
-
- private static SubtaskState deserializeSubtaskState(DataInputStream dis) throws IOException {
- // Duration field has been removed from SubtaskState
- dis.readLong();
-
- int len = dis.readInt();
-
- if (SavepointSerializers.failWhenLegacyStateDetected) {
- Preconditions.checkState(len == 0,
- "Legacy state (from Flink <= 1.1, created through the 'Checkpointed' interface) is " +
- "no longer supported starting from Flink 1.4. Please rewrite your job to use " +
- "'CheckpointedFunction' instead!");
-
- } else {
- for (int i = 0; i < len; ++i) {
- // absorb bytes from stream and ignore result
- deserializeStreamStateHandle(dis);
- }
- }
-
- len = dis.readInt();
- List<OperatorStateHandle> operatorStateBackend = new ArrayList<>(len);
- for (int i = 0; i < len; ++i) {
- OperatorStateHandle streamStateHandle = deserializeOperatorStateHandle(dis);
- operatorStateBackend.add(streamStateHandle);
- }
-
- len = dis.readInt();
- List<OperatorStateHandle> operatorStateStream = new ArrayList<>(len);
- for (int i = 0; i < len; ++i) {
- OperatorStateHandle streamStateHandle = deserializeOperatorStateHandle(dis);
- operatorStateStream.add(streamStateHandle);
- }
-
- KeyedStateHandle keyedStateBackend = deserializeKeyedStateHandle(dis);
-
- KeyedStateHandle keyedStateStream = deserializeKeyedStateHandle(dis);
-
- ChainedStateHandle<OperatorStateHandle> operatorStateBackendChain =
- new ChainedStateHandle<>(operatorStateBackend);
-
- ChainedStateHandle<OperatorStateHandle> operatorStateStreamChain =
- new ChainedStateHandle<>(operatorStateStream);
-
- return new SubtaskState(
- operatorStateBackendChain,
- operatorStateStreamChain,
- keyedStateBackend,
- keyedStateStream);
- }
-
- private static KeyedStateHandle deserializeKeyedStateHandle(DataInputStream dis) throws IOException {
- final int type = dis.readByte();
- if (NULL_HANDLE == type) {
- return null;
- } else if (KEY_GROUPS_HANDLE == type) {
- int startKeyGroup = dis.readInt();
- int numKeyGroups = dis.readInt();
- KeyGroupRange keyGroupRange = KeyGroupRange.of(startKeyGroup, startKeyGroup + numKeyGroups - 1);
- long[] offsets = new long[numKeyGroups];
- for (int i = 0; i < numKeyGroups; ++i) {
- offsets[i] = dis.readLong();
- }
- KeyGroupRangeOffsets keyGroupRangeOffsets = new KeyGroupRangeOffsets(
- keyGroupRange, offsets);
- StreamStateHandle stateHandle = deserializeStreamStateHandle(dis);
- return new KeyGroupsStateHandle(keyGroupRangeOffsets, stateHandle);
- } else {
- throw new IllegalStateException("Reading invalid KeyedStateHandle, type: " + type);
- }
- }
-
- private static OperatorStateHandle deserializeOperatorStateHandle(DataInputStream dis) throws IOException {
- final int type = dis.readByte();
- if (NULL_HANDLE == type) {
- return null;
- } else if (PARTITIONABLE_OPERATOR_STATE_HANDLE == type) {
- int mapSize = dis.readInt();
- Map<String, OperatorStateHandle.StateMetaInfo> offsetsMap = new HashMap<>(mapSize);
- for (int i = 0; i < mapSize; ++i) {
- String key = dis.readUTF();
-
- int modeOrdinal = dis.readByte();
- OperatorStateHandle.Mode mode = OperatorStateHandle.Mode.values()[modeOrdinal];
-
- long[] offsets = new long[dis.readInt()];
- for (int j = 0; j < offsets.length; ++j) {
- offsets[j] = dis.readLong();
- }
-
- OperatorStateHandle.StateMetaInfo metaInfo =
- new OperatorStateHandle.StateMetaInfo(offsets, mode);
- offsetsMap.put(key, metaInfo);
- }
- StreamStateHandle stateHandle = deserializeStreamStateHandle(dis);
- return new OperatorStreamStateHandle(offsetsMap, stateHandle);
- } else {
- throw new IllegalStateException("Reading invalid OperatorStateHandle, type: " + type);
- }
- }
-
- private static StreamStateHandle deserializeStreamStateHandle(DataInputStream dis) throws IOException {
- int type = dis.read();
- if (NULL_HANDLE == type) {
- return null;
- } else if (FILE_STREAM_STATE_HANDLE == type) {
- long size = dis.readLong();
- String pathString = dis.readUTF();
- return new FileStateHandle(new Path(pathString), size);
- } else if (BYTE_STREAM_STATE_HANDLE == type) {
- String handleName = dis.readUTF();
- int numBytes = dis.readInt();
- byte[] data = new byte[numBytes];
- dis.readFully(data);
- return new ByteStreamStateHandle(handleName, data);
- } else {
- throw new IOException("Unknown implementation of StreamStateHandle, code: " + type);
- }
- }
-
- // ------------------------------------------------------------------------
- // old format serialization, for testing
- // ------------------------------------------------------------------------
-
- @VisibleForTesting
- public static void serializeVersion1(long checkpointId, Collection<TaskState> taskStates, DataOutputStream dos) throws IOException {
- dos.writeLong(checkpointId);
-
- dos.writeInt(taskStates.size());
-
- for (TaskState taskState : taskStates) {
- // Vertex ID
- dos.writeLong(taskState.getJobVertexID().getLowerPart());
- dos.writeLong(taskState.getJobVertexID().getUpperPart());
-
- // Parallelism
- int parallelism = taskState.getParallelism();
- dos.writeInt(parallelism);
- dos.writeInt(taskState.getMaxParallelism());
- dos.writeInt(taskState.getChainLength());
-
- // Sub task states
- Map<Integer, SubtaskState> subtaskStateMap = taskState.getSubtaskStates();
- dos.writeInt(subtaskStateMap.size());
- for (Map.Entry<Integer, SubtaskState> entry : subtaskStateMap.entrySet()) {
- dos.writeInt(entry.getKey());
- serializeSubtaskState(entry.getValue(), dos);
- }
- }
- }
-
- private static void serializeSubtaskState(SubtaskState subtaskState, DataOutputStream dos) throws IOException {
- //backwards compatibility, do not remove
- dos.writeLong(-1L);
-
- //backwards compatibility (number of legacy state handles), do not remove
- dos.writeInt(0);
-
- ChainedStateHandle<OperatorStateHandle> operatorStateBackend = subtaskState.getManagedOperatorState();
-
- int len = operatorStateBackend != null ? operatorStateBackend.getLength() : 0;
- dos.writeInt(len);
- for (int i = 0; i < len; ++i) {
- OperatorStateHandle stateHandle = operatorStateBackend.get(i);
- serializeOperatorStateHandle(stateHandle, dos);
- }
-
- ChainedStateHandle<OperatorStateHandle> operatorStateFromStream = subtaskState.getRawOperatorState();
-
- len = operatorStateFromStream != null ? operatorStateFromStream.getLength() : 0;
- dos.writeInt(len);
- for (int i = 0; i < len; ++i) {
- OperatorStateHandle stateHandle = operatorStateFromStream.get(i);
- serializeOperatorStateHandle(stateHandle, dos);
- }
-
- KeyedStateHandle keyedStateBackend = subtaskState.getManagedKeyedState();
- serializeKeyedStateHandle(keyedStateBackend, dos);
-
- KeyedStateHandle keyedStateStream = subtaskState.getRawKeyedState();
- serializeKeyedStateHandle(keyedStateStream, dos);
- }
-
- private static void serializeStreamStateHandle(
- StreamStateHandle stateHandle, DataOutputStream dos) throws IOException {
-
- if (stateHandle == null) {
- dos.writeByte(NULL_HANDLE);
-
- } else if (stateHandle instanceof FileStateHandle) {
- dos.writeByte(FILE_STREAM_STATE_HANDLE);
- FileStateHandle fileStateHandle = (FileStateHandle) stateHandle;
- dos.writeLong(stateHandle.getStateSize());
- dos.writeUTF(fileStateHandle.getFilePath().toString());
-
- } else if (stateHandle instanceof ByteStreamStateHandle) {
- dos.writeByte(BYTE_STREAM_STATE_HANDLE);
- ByteStreamStateHandle byteStreamStateHandle = (ByteStreamStateHandle) stateHandle;
- dos.writeUTF(byteStreamStateHandle.getHandleName());
- byte[] internalData = byteStreamStateHandle.getData();
- dos.writeInt(internalData.length);
- dos.write(byteStreamStateHandle.getData());
-
- } else {
- throw new IOException("Unknown implementation of StreamStateHandle: " + stateHandle.getClass());
- }
-
- dos.flush();
- }
-
- private static void serializeKeyedStateHandle(
- KeyedStateHandle stateHandle, DataOutputStream dos) throws IOException {
-
- if (stateHandle == null) {
- dos.writeByte(NULL_HANDLE);
- } else if (stateHandle instanceof KeyGroupsStateHandle) {
- KeyGroupsStateHandle keyGroupsStateHandle = (KeyGroupsStateHandle) stateHandle;
-
- dos.writeByte(KEY_GROUPS_HANDLE);
- dos.writeInt(keyGroupsStateHandle.getKeyGroupRange().getStartKeyGroup());
- dos.writeInt(keyGroupsStateHandle.getKeyGroupRange().getNumberOfKeyGroups());
- for (int keyGroup : keyGroupsStateHandle.getKeyGroupRange()) {
- dos.writeLong(keyGroupsStateHandle.getOffsetForKeyGroup(keyGroup));
- }
- serializeStreamStateHandle(keyGroupsStateHandle.getDelegateStateHandle(), dos);
- } else {
- throw new IllegalStateException("Unknown KeyedStateHandle type: " + stateHandle.getClass());
- }
- }
-
- private static void serializeOperatorStateHandle(
- OperatorStateHandle stateHandle, DataOutputStream dos) throws IOException {
-
- if (stateHandle != null) {
- dos.writeByte(PARTITIONABLE_OPERATOR_STATE_HANDLE);
- Map<String, OperatorStateHandle.StateMetaInfo> partitionOffsetsMap =
- stateHandle.getStateNameToPartitionOffsets();
- dos.writeInt(partitionOffsetsMap.size());
- for (Map.Entry<String, OperatorStateHandle.StateMetaInfo> entry : partitionOffsetsMap.entrySet()) {
- dos.writeUTF(entry.getKey());
-
- OperatorStateHandle.StateMetaInfo stateMetaInfo = entry.getValue();
-
- int mode = stateMetaInfo.getDistributionMode().ordinal();
- dos.writeByte(mode);
-
- long[] offsets = stateMetaInfo.getOffsets();
- dos.writeInt(offsets.length);
- for (long offset : offsets) {
- dos.writeLong(offset);
- }
- }
- serializeStreamStateHandle(stateHandle.getDelegateStateHandle(), dos);
- } else {
- dos.writeByte(NULL_HANDLE);
- }
+ throw new IOException("This savepoint / checkpoint version (Flink 1.1 / 1.2) is no longer supported.");
}
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2.java
index 6dc628d..cf761d8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2.java
@@ -20,22 +20,8 @@ package org.apache.flink.runtime.checkpoint.savepoint;
import org.apache.flink.runtime.checkpoint.MasterState;
import org.apache.flink.runtime.checkpoint.OperatorState;
-import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
-import org.apache.flink.runtime.checkpoint.SubtaskState;
-import org.apache.flink.runtime.checkpoint.TaskState;
-import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobgraph.OperatorID;
-import org.apache.flink.runtime.state.ChainedStateHandle;
-import org.apache.flink.runtime.state.KeyedStateHandle;
-import org.apache.flink.runtime.state.OperatorStateHandle;
-import org.apache.flink.util.Preconditions;
import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -51,48 +37,15 @@ public class SavepointV2 implements Savepoint {
/** The checkpoint ID. */
private final long checkpointId;
- /**
- * The task states.
- * @deprecated Only kept for backwards-compatibility with versions < 1.3. Will be removed in the future.
- */
- @Deprecated
- private final Collection<TaskState> taskStates;
-
/** The operator states. */
private final Collection<OperatorState> operatorStates;
/** The states generated by the CheckpointCoordinator. */
private final Collection<MasterState> masterStates;
- /** @deprecated Only kept for backwards-compatibility with versions < 1.3. Will be removed in the future. */
- @Deprecated
- public SavepointV2(long checkpointId, Collection<TaskState> taskStates) {
- this(
- checkpointId,
- null,
- checkNotNull(taskStates, "taskStates"),
- Collections.<MasterState>emptyList()
- );
- }
-
public SavepointV2(long checkpointId, Collection<OperatorState> operatorStates, Collection<MasterState> masterStates) {
- this(
- checkpointId,
- checkNotNull(operatorStates, "operatorStates"),
- null,
- masterStates
- );
- }
-
- private SavepointV2(
- long checkpointId,
- Collection<OperatorState> operatorStates,
- Collection<TaskState> taskStates,
- Collection<MasterState> masterStates) {
-
this.checkpointId = checkpointId;
this.operatorStates = operatorStates;
- this.taskStates = taskStates;
this.masterStates = checkNotNull(masterStates, "masterStates");
}
@@ -112,11 +65,6 @@ public class SavepointV2 implements Savepoint {
}
@Override
- public Collection<TaskState> getTaskStates() {
- return taskStates;
- }
-
- @Override
public Collection<MasterState> getMasterStates() {
return masterStates;
}
@@ -134,122 +82,4 @@ public class SavepointV2 implements Savepoint {
public String toString() {
return "Checkpoint Metadata (version=" + VERSION + ')';
}
-
- /**
- * Converts the {@link Savepoint} containing {@link TaskState TaskStates} to an equivalent savepoint containing
- * {@link OperatorState OperatorStates}.
- *
- * @param savepoint savepoint to convert
- * @param tasks map of all vertices and their job vertex ids
- * @return converted completed checkpoint
- * @deprecated Only kept for backwards-compatibility with versions < 1.3. Will be removed in the future.
- * */
- @Deprecated
- public static Savepoint convertToOperatorStateSavepointV2(
- Map<JobVertexID, ExecutionJobVertex> tasks,
- Savepoint savepoint) {
-
- if (savepoint.getOperatorStates() != null) {
- return savepoint;
- }
-
- boolean expandedToLegacyIds = false;
-
- Map<OperatorID, OperatorState> operatorStates = new HashMap<>(savepoint.getTaskStates().size() << 1);
-
- for (TaskState taskState : savepoint.getTaskStates()) {
- ExecutionJobVertex jobVertex = tasks.get(taskState.getJobVertexID());
-
- // on the first time we can not find the execution job vertex for an id, we also consider alternative ids,
- // for example as generated from older flink versions, to provide backwards compatibility.
- if (jobVertex == null && !expandedToLegacyIds) {
- tasks = ExecutionJobVertex.includeLegacyJobVertexIDs(tasks);
- jobVertex = tasks.get(taskState.getJobVertexID());
- expandedToLegacyIds = true;
- }
-
- if (jobVertex == null) {
- throw new IllegalStateException(
- "Could not find task for state with ID " + taskState.getJobVertexID() + ". " +
- "When migrating a savepoint from a version < 1.3 please make sure that the topology was not " +
- "changed through removal of a stateful operator or modification of a chain containing a stateful " +
- "operator.");
- }
-
- List<OperatorID> operatorIDs = jobVertex.getOperatorIDs();
-
- Preconditions.checkArgument(
- jobVertex.getParallelism() == taskState.getParallelism(),
- "Detected change in parallelism during migration for task " + jobVertex.getJobVertexId() + "." +
- "When migrating a savepoint from a version < 1.3 please make sure that no changes were made " +
- "to the parallelism of stateful operators.");
-
- Preconditions.checkArgument(
- operatorIDs.size() == taskState.getChainLength(),
- "Detected change in chain length during migration for task " + jobVertex.getJobVertexId() + ". " +
- "When migrating a savepoint from a version < 1.3 please make sure that the topology was not " +
- "changed by modification of a chain containing a stateful operator.");
-
- for (int subtaskIndex = 0; subtaskIndex < jobVertex.getParallelism(); subtaskIndex++) {
- SubtaskState subtaskState;
- try {
- subtaskState = taskState.getState(subtaskIndex);
- } catch (Exception e) {
- throw new IllegalStateException(
- "Could not find subtask with index " + subtaskIndex + " for task " + jobVertex.getJobVertexId() + ". " +
- "When migrating a savepoint from a version < 1.3 please make sure that no changes were made " +
- "to the parallelism of stateful operators.",
- e);
- }
-
- if (subtaskState == null) {
- continue;
- }
-
- ChainedStateHandle<OperatorStateHandle> partitioneableState =
- subtaskState.getManagedOperatorState();
- ChainedStateHandle<OperatorStateHandle> rawOperatorState =
- subtaskState.getRawOperatorState();
-
- for (int chainIndex = 0; chainIndex < taskState.getChainLength(); chainIndex++) {
-
- // task consists of multiple operators so we have to break the state apart
- for (int operatorIndex = 0; operatorIndex < operatorIDs.size(); operatorIndex++) {
- OperatorID operatorID = operatorIDs.get(operatorIndex);
- OperatorState operatorState = operatorStates.get(operatorID);
-
- if (operatorState == null) {
- operatorState = new OperatorState(
- operatorID,
- jobVertex.getParallelism(),
- jobVertex.getMaxParallelism());
- operatorStates.put(operatorID, operatorState);
- }
-
- KeyedStateHandle managedKeyedState = null;
- KeyedStateHandle rawKeyedState = null;
-
- // only the head operator retains the keyed state
- if (operatorIndex == operatorIDs.size() - 1) {
- managedKeyedState = subtaskState.getManagedKeyedState();
- rawKeyedState = subtaskState.getRawKeyedState();
- }
-
- OperatorSubtaskState operatorSubtaskState = new OperatorSubtaskState(
- partitioneableState != null ? partitioneableState.get(operatorIndex) : null,
- rawOperatorState != null ? rawOperatorState.get(operatorIndex) : null,
- managedKeyedState,
- rawKeyedState);
-
- operatorState.putState(subtaskIndex, operatorSubtaskState);
- }
- }
- }
- }
-
- return new SavepointV2(
- savepoint.getCheckpointId(),
- operatorStates.values(),
- savepoint.getMasterStates());
- }
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java
index 3d60e22..9e5dff6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java
@@ -36,7 +36,6 @@ import org.apache.flink.runtime.state.StateHandleID;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.filesystem.FileStateHandle;
import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
-import org.apache.flink.util.Preconditions;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
@@ -290,22 +289,14 @@ public class SavepointV2Serializer implements SavepointSerializer {
// Duration field has been removed from SubtaskState, do not remove
long ignoredDuration = dis.readLong();
- // for compatibility, do not remove
- int len = dis.readInt();
-
- if (SavepointSerializers.failWhenLegacyStateDetected) {
- Preconditions.checkState(len == 0,
+ final int numLegacyTaskStates = dis.readInt();
+ if (numLegacyTaskStates > 0) {
+ throw new IOException(
"Legacy state (from Flink <= 1.1, created through the 'Checkpointed' interface) is " +
- "no longer supported starting from Flink 1.4. Please rewrite your job to use " +
- "'CheckpointedFunction' instead!");
- } else {
- for (int i = 0; i < len; ++i) {
- // absorb bytes from stream and ignore result
- deserializeStreamStateHandle(dis);
- }
+ "no longer supported.");
}
- len = dis.readInt();
+ int len = dis.readInt();
OperatorStateHandle operatorStateBackend = len == 0 ? null : deserializeOperatorStateHandle(dis);
len = dis.readInt();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1SerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1SerializerTest.java
deleted file mode 100644
index da448f0..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1SerializerTest.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.checkpoint.savepoint;
-
-import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.runtime.checkpoint.TaskState;
-
-import org.junit.Test;
-
-import java.io.ByteArrayInputStream;
-import java.util.Collection;
-import java.util.Random;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-/**
- * Test that the Checkpoint Metadata V1 deserializer can deserialize a the metadata correctly
- * into the latest format.
- */
-public class SavepointV1SerializerTest {
-
- @Test
- public void testSerializeDeserializeV1() throws Exception {
- final Random r = new Random(42);
-
- for (int i = 0; i < 50; ++i) {
- final long checkpointId = i + 123123;
- final Collection<TaskState> taskStates = CheckpointTestUtils.createTaskStates(r, 1 + r.nextInt(64), 1 + r.nextInt(64));
-
- // Serialize
- final ByteArrayOutputStreamWithPos baos = new ByteArrayOutputStreamWithPos();
- SavepointV1Serializer.serializeVersion1(checkpointId, taskStates, new DataOutputViewStreamWrapper(baos));
- final byte[] bytes = baos.toByteArray();
-
- // Deserialize
- final SavepointV2 actual = SavepointV1Serializer.INSTANCE.deserialize(
- new DataInputViewStreamWrapper(new ByteArrayInputStream(bytes)),
- Thread.currentThread().getContextClassLoader());
-
- assertEquals(checkpointId, actual.getCheckpointId());
- assertEquals(taskStates, actual.getTaskStates());
- assertTrue(actual.getMasterStates().isEmpty());
- }
- }
-}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendSnapshotMigrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendSnapshotMigrationTest.java
index 23e74cd..bbc457b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendSnapshotMigrationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendSnapshotMigrationTest.java
@@ -19,15 +19,12 @@
package org.apache.flink.runtime.state.heap;
import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.StateObjectCollection;
-import org.apache.flink.runtime.state.KeyGroupsStateHandle;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.SnapshotResult;
-import org.apache.flink.runtime.state.internal.InternalListState;
import org.apache.flink.runtime.state.internal.InternalMapState;
import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory;
import org.apache.flink.util.InstantiationUtil;
@@ -38,12 +35,10 @@ import org.junit.Test;
import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.net.URL;
-import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.RunnableFuture;
-import static java.util.Arrays.asList;
import static org.junit.Assert.assertEquals;
/**
@@ -203,134 +198,4 @@ public class HeapKeyedStateBackendSnapshotMigrationTest extends HeapStateBackend
// }
// }
// }
-
- /**
- * [FLINK-5979]
- *
- * <p>This test takes a snapshot that was created with Flink 1.2 and tries to restore it in master to check
- * the backwards compatibility of the serialization format of {@link StateTable}s.
- */
- @Test
- public void testRestore1_2ToMaster() throws Exception {
-
- ClassLoader cl = getClass().getClassLoader();
- URL resource = cl.getResource("heap_keyed_statebackend_1_2.snapshot");
-
- Preconditions.checkNotNull(resource, "Binary snapshot resource not found!");
-
- final Integer namespace1 = 1;
- final Integer namespace2 = 2;
- final Integer namespace3 = 3;
-
- final KeyGroupsStateHandle stateHandle;
- try (BufferedInputStream bis = new BufferedInputStream((new FileInputStream(resource.getFile())))) {
- stateHandle = InstantiationUtil.deserializeObject(bis, Thread.currentThread().getContextClassLoader());
- }
- try (final HeapKeyedStateBackend<String> keyedBackend = createKeyedBackend(StateObjectCollection.singleton(stateHandle))) {
- final ListStateDescriptor<Long> stateDescr = new ListStateDescriptor<>("my-state", Long.class);
- stateDescr.initializeSerializerUnlessSet(new ExecutionConfig());
-
- InternalListState<String, Integer, Long> state = keyedBackend.createInternalState(IntSerializer.INSTANCE, stateDescr);
-
- assertEquals(7, keyedBackend.numKeyValueStateEntries());
-
- keyedBackend.setCurrentKey("abc");
- state.setCurrentNamespace(namespace1);
- assertEquals(asList(33L, 55L), state.get());
- state.setCurrentNamespace(namespace2);
- assertEquals(asList(22L, 11L), state.get());
- state.setCurrentNamespace(namespace3);
- assertEquals(Collections.singletonList(44L), state.get());
-
- keyedBackend.setCurrentKey("def");
- state.setCurrentNamespace(namespace1);
- assertEquals(asList(11L, 44L), state.get());
-
- state.setCurrentNamespace(namespace3);
- assertEquals(asList(22L, 55L, 33L), state.get());
-
- keyedBackend.setCurrentKey("jkl");
- state.setCurrentNamespace(namespace1);
- assertEquals(asList(11L, 22L, 33L, 44L, 55L), state.get());
-
- keyedBackend.setCurrentKey("mno");
- state.setCurrentNamespace(namespace3);
- assertEquals(asList(11L, 22L, 33L, 44L, 55L), state.get());
- }
- }
-
-// /**
-// * This code was used to create the binary file of the old version's snapshot used by this test. If you need to
-// * recreate the binary, you can comment this out and run it.
-// */
-// private void createBinarySnapshot() throws Exception {
-//
-// final String pathToWrite = "/PATH/TO/WRITE";
-//
-// final ListStateDescriptor<Long> stateDescr = new ListStateDescriptor<>("my-state", Long.class);
-// stateDescr.initializeSerializerUnlessSet(new ExecutionConfig());
-//
-// final Integer namespace1 = 1;
-// final Integer namespace2 = 2;
-// final Integer namespace3 = 3;
-//
-// final HeapKeyedStateBackend<String> keyedBackend = createKeyedBackend();
-//
-// try {
-// InternalListState<Integer, Long> state = keyedBackend.createListState(IntSerializer.INSTANCE, stateDescr);
-//
-// keyedBackend.setCurrentKey("abc");
-// state.setCurrentNamespace(namespace1);
-// state.add(33L);
-// state.add(55L);
-//
-// state.setCurrentNamespace(namespace2);
-// state.add(22L);
-// state.add(11L);
-//
-// state.setCurrentNamespace(namespace3);
-// state.add(44L);
-//
-// keyedBackend.setCurrentKey("def");
-// state.setCurrentNamespace(namespace1);
-// state.add(11L);
-// state.add(44L);
-//
-// state.setCurrentNamespace(namespace3);
-// state.add(22L);
-// state.add(55L);
-// state.add(33L);
-//
-// keyedBackend.setCurrentKey("jkl");
-// state.setCurrentNamespace(namespace1);
-// state.add(11L);
-// state.add(22L);
-// state.add(33L);
-// state.add(44L);
-// state.add(55L);
-//
-// keyedBackend.setCurrentKey("mno");
-// state.setCurrentNamespace(namespace3);
-// state.add(11L);
-// state.add(22L);
-// state.add(33L);
-// state.add(44L);
-// state.add(55L);
-// RunnableFuture<KeyGroupsStateHandle> snapshot = keyedBackend.snapshot(
-// 0L,
-// 0L,
-// new MemCheckpointStreamFactory(4 * 1024 * 1024),
-// CheckpointOptions.forCheckpointWithDefaultLocation());
-//
-// snapshot.run();
-//
-// try (BufferedOutputStream bis = new BufferedOutputStream(new FileOutputStream(pathToWrite))) {
-// InstantiationUtil.serializeObject(bis, snapshot.get());
-// }
-//
-// } finally {
-// keyedBackend.close();
-// keyedBackend.dispose();
-// }
-// }
}
diff --git a/flink-runtime/src/test/resources/heap_keyed_statebackend_1_2.snapshot b/flink-runtime/src/test/resources/heap_keyed_statebackend_1_2.snapshot
deleted file mode 100644
index b9171bc..0000000
Binary files a/flink-runtime/src/test/resources/heap_keyed_statebackend_1_2.snapshot and /dev/null differ
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorMigrationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorMigrationTest.java
index 4939bd2..e304661 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorMigrationTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorMigrationTest.java
@@ -86,7 +86,6 @@ public class WindowOperatorMigrationTest {
@Parameterized.Parameters(name = "Migration Savepoint: {0}")
public static Collection<MigrationVersion> parameters () {
return Arrays.asList(
- MigrationVersion.v1_2,
MigrationVersion.v1_3,
MigrationVersion.v1_4,
MigrationVersion.v1_5,
diff --git a/flink-streaming-java/src/test/resources/win-op-migration-test-accum-aligned-flink1.2-snapshot b/flink-streaming-java/src/test/resources/win-op-migration-test-accum-aligned-flink1.2-snapshot
deleted file mode 100644
index d182bee..0000000
Binary files a/flink-streaming-java/src/test/resources/win-op-migration-test-accum-aligned-flink1.2-snapshot and /dev/null differ
diff --git a/flink-streaming-java/src/test/resources/win-op-migration-test-aggr-aligned-flink1.2-snapshot b/flink-streaming-java/src/test/resources/win-op-migration-test-aggr-aligned-flink1.2-snapshot
deleted file mode 100644
index 83741c1..0000000
Binary files a/flink-streaming-java/src/test/resources/win-op-migration-test-aggr-aligned-flink1.2-snapshot and /dev/null differ
diff --git a/flink-streaming-java/src/test/resources/win-op-migration-test-apply-event-time-flink1.2-snapshot b/flink-streaming-java/src/test/resources/win-op-migration-test-apply-event-time-flink1.2-snapshot
deleted file mode 100644
index 3411df6..0000000
Binary files a/flink-streaming-java/src/test/resources/win-op-migration-test-apply-event-time-flink1.2-snapshot and /dev/null differ
diff --git a/flink-streaming-java/src/test/resources/win-op-migration-test-apply-processing-time-flink1.2-snapshot b/flink-streaming-java/src/test/resources/win-op-migration-test-apply-processing-time-flink1.2-snapshot
deleted file mode 100644
index 060397f..0000000
Binary files a/flink-streaming-java/src/test/resources/win-op-migration-test-apply-processing-time-flink1.2-snapshot and /dev/null differ
diff --git a/flink-streaming-java/src/test/resources/win-op-migration-test-kryo-serialized-key-flink1.2-snapshot b/flink-streaming-java/src/test/resources/win-op-migration-test-kryo-serialized-key-flink1.2-snapshot
deleted file mode 100644
index 3a4cc98..0000000
Binary files a/flink-streaming-java/src/test/resources/win-op-migration-test-kryo-serialized-key-flink1.2-snapshot and /dev/null differ
diff --git a/flink-streaming-java/src/test/resources/win-op-migration-test-reduce-event-time-flink1.2-snapshot b/flink-streaming-java/src/test/resources/win-op-migration-test-reduce-event-time-flink1.2-snapshot
deleted file mode 100644
index dad4630..0000000
Binary files a/flink-streaming-java/src/test/resources/win-op-migration-test-reduce-event-time-flink1.2-snapshot and /dev/null differ
diff --git a/flink-streaming-java/src/test/resources/win-op-migration-test-reduce-processing-time-flink1.2-snapshot b/flink-streaming-java/src/test/resources/win-op-migration-test-reduce-processing-time-flink1.2-snapshot
deleted file mode 100644
index 0edf25e..0000000
Binary files a/flink-streaming-java/src/test/resources/win-op-migration-test-reduce-processing-time-flink1.2-snapshot and /dev/null differ
diff --git a/flink-streaming-java/src/test/resources/win-op-migration-test-session-with-stateful-trigger-flink1.2-snapshot b/flink-streaming-java/src/test/resources/win-op-migration-test-session-with-stateful-trigger-flink1.2-snapshot
deleted file mode 100644
index d0afbe0..0000000
Binary files a/flink-streaming-java/src/test/resources/win-op-migration-test-session-with-stateful-trigger-flink1.2-snapshot and /dev/null differ
diff --git a/flink-streaming-java/src/test/resources/win-op-migration-test-session-with-stateful-trigger-mint-flink1.2-snapshot b/flink-streaming-java/src/test/resources/win-op-migration-test-session-with-stateful-trigger-mint-flink1.2-snapshot
deleted file mode 100644
index 7d0b6cc..0000000
Binary files a/flink-streaming-java/src/test/resources/win-op-migration-test-session-with-stateful-trigger-mint-flink1.2-snapshot and /dev/null differ
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/LegacyStatefulJobSavepointMigrationITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/LegacyStatefulJobSavepointMigrationITCase.java
index 9c2ff79..e40294e 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/LegacyStatefulJobSavepointMigrationITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/LegacyStatefulJobSavepointMigrationITCase.java
@@ -70,10 +70,6 @@ public class LegacyStatefulJobSavepointMigrationITCase extends SavepointMigratio
@Parameterized.Parameters(name = "Migrate Savepoint / Backend: {0}")
public static Collection<Tuple2<MigrationVersion, String>> parameters () {
return Arrays.asList(
- Tuple2.of(MigrationVersion.v1_2, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
- Tuple2.of(MigrationVersion.v1_2, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
- Tuple2.of(MigrationVersion.v1_3, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
- Tuple2.of(MigrationVersion.v1_3, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
Tuple2.of(MigrationVersion.v1_4, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
Tuple2.of(MigrationVersion.v1_4, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME));
}
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
index eac68b1..de22ab2 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
@@ -30,7 +30,6 @@ import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HeartbeatManagerOptions;
import org.apache.flink.configuration.TaskManagerOptions;
-import org.apache.flink.runtime.checkpoint.savepoint.SavepointSerializers;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
@@ -40,8 +39,6 @@ import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.testutils.junit.category.AlsoRunWithLegacyScheduler;
import org.apache.commons.io.FileUtils;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.experimental.categories.Category;
@@ -77,16 +74,6 @@ public abstract class SavepointMigrationTestBase extends TestBaseUtils {
protected static final int DEFAULT_PARALLELISM = 4;
- @BeforeClass
- public static void before() {
- SavepointSerializers.setFailWhenLegacyStateDetected(false);
- }
-
- @AfterClass
- public static void after() {
- SavepointSerializers.setFailWhenLegacyStateDetected(true);
- }
-
protected static String getResourceFilename(String filename) {
ClassLoader cl = SavepointMigrationTestBase.class.getClassLoader();
URL resource = cl.getResource(filename);
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
index 9b00d49..b305c7e 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
@@ -25,7 +25,6 @@ import org.apache.flink.api.common.time.Time;
import org.apache.flink.client.ClientUtils;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
-import org.apache.flink.runtime.checkpoint.savepoint.SavepointSerializers;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
@@ -41,8 +40,6 @@ import org.apache.flink.testutils.junit.category.AlsoRunWithLegacyScheduler;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.TestLogger;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -104,16 +101,6 @@ public abstract class AbstractOperatorRestoreTestBase extends TestLogger {
this.allowNonRestoredState = allowNonRestoredState;
}
- @BeforeClass
- public static void beforeClass() {
- SavepointSerializers.setFailWhenLegacyStateDetected(false);
- }
-
- @AfterClass
- public static void after() {
- SavepointSerializers.setFailWhenLegacyStateDetected(true);
- }
-
@Test
public void testMigrationAndRestore() throws Throwable {
ClusterClient<?> clusterClient = cluster.getClusterClient();
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/AbstractKeyedOperatorRestoreTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/AbstractKeyedOperatorRestoreTestBase.java
index c76f7b2..c8a966e 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/AbstractKeyedOperatorRestoreTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/AbstractKeyedOperatorRestoreTestBase.java
@@ -42,7 +42,6 @@ public abstract class AbstractKeyedOperatorRestoreTestBase extends AbstractOpera
@Parameterized.Parameters(name = "Migrate Savepoint: {0}")
public static Collection<MigrationVersion> parameters () {
return Arrays.asList(
- MigrationVersion.v1_2,
MigrationVersion.v1_3,
MigrationVersion.v1_4,
MigrationVersion.v1_5,
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/AbstractNonKeyedOperatorRestoreTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/AbstractNonKeyedOperatorRestoreTestBase.java
index c36700a..15e9d87 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/AbstractNonKeyedOperatorRestoreTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/AbstractNonKeyedOperatorRestoreTestBase.java
@@ -48,7 +48,6 @@ public abstract class AbstractNonKeyedOperatorRestoreTestBase extends AbstractOp
@Parameterized.Parameters(name = "Migrate Savepoint: {0}")
public static Collection<MigrationVersion> parameters () {
return Arrays.asList(
- MigrationVersion.v1_2,
MigrationVersion.v1_3,
MigrationVersion.v1_4,
MigrationVersion.v1_5,
diff --git a/flink-tests/src/test/resources/operatorstate/complexKeyed-flink1.2/_metadata b/flink-tests/src/test/resources/operatorstate/complexKeyed-flink1.2/_metadata
deleted file mode 100644
index 0a1ed10..0000000
Binary files a/flink-tests/src/test/resources/operatorstate/complexKeyed-flink1.2/_metadata and /dev/null differ
diff --git a/flink-tests/src/test/resources/operatorstate/nonKeyed-flink1.2/_metadata b/flink-tests/src/test/resources/operatorstate/nonKeyed-flink1.2/_metadata
deleted file mode 100644
index 8fcd1ea..0000000
Binary files a/flink-tests/src/test/resources/operatorstate/nonKeyed-flink1.2/_metadata and /dev/null differ
diff --git a/flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.2-rocksdb-savepoint b/flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.2-rocksdb-savepoint
deleted file mode 100644
index 548993f..0000000
Binary files a/flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.2-rocksdb-savepoint and /dev/null differ
diff --git a/flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.2-savepoint b/flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.2-savepoint
deleted file mode 100644
index a8d19f2..0000000
Binary files a/flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.2-savepoint and /dev/null differ
diff --git a/flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.3-rocksdb-savepoint/_metadata b/flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.3-rocksdb-savepoint/_metadata
deleted file mode 100644
index 8f22bcb..0000000
Binary files a/flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.3-rocksdb-savepoint/_metadata and /dev/null differ
diff --git a/flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.3-savepoint/_metadata b/flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.3-savepoint/_metadata
deleted file mode 100644
index 8ca91ec..0000000
Binary files a/flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.3-savepoint/_metadata and /dev/null differ