Posted to commits@flink.apache.org by se...@apache.org on 2020/02/25 22:55:44 UTC

[flink] 05/07: [FLINK-16192][checkpointing] Remove remaining bits of "legacy state" and Savepoint 1.2 compatibility

This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b8277f92cfd1fa80325194bd705d1a13293b30e3
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Thu Feb 20 18:19:51 2020 +0100

    [FLINK-16192][checkpointing] Remove remaining bits of "legacy state" and Savepoint 1.2 compatibility
    
    "Legacy State" refers to the state originally created by the old "Checkpointed" interface, before state was re-scalable.
    Because some of the tests for 1.3 compatibility used checkpoints with that legacy state, some 1.3 migration
    tests had to be removed as well.
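
    For context: the old "Checkpointed<T>" interface asked a user function for a
    single serialized state object per checkpoint and handed the same object back
    on restore, which is why that state could not be redistributed when rescaling.
    The replacement named in the new error message, "CheckpointedFunction", goes
    through the state backends instead. A minimal sketch of such a rewrite follows
    (class and state names are illustrative, not taken from this commit):

        import org.apache.flink.api.common.functions.MapFunction;
        import org.apache.flink.api.common.state.ListState;
        import org.apache.flink.api.common.state.ListStateDescriptor;
        import org.apache.flink.runtime.state.FunctionInitializationContext;
        import org.apache.flink.runtime.state.FunctionSnapshotContext;
        import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;

        // Hypothetical user function: keeps a record count as re-scalable
        // operator state rather than as legacy "Checkpointed" state.
        public class CountingMapper implements MapFunction<String, String>, CheckpointedFunction {

            private transient ListState<Long> checkpointedCount;
            private long count;

            @Override
            public String map(String value) {
                count++;
                return value;
            }

            @Override
            public void snapshotState(FunctionSnapshotContext context) throws Exception {
                checkpointedCount.clear();
                checkpointedCount.add(count);
            }

            @Override
            public void initializeState(FunctionInitializationContext context) throws Exception {
                checkpointedCount = context.getOperatorStateStore()
                    .getListState(new ListStateDescriptor<>("count", Long.class));
                // On restore (including after rescaling), sum the redistributed
                // partial counts; on a fresh start the state is empty.
                for (Long c : checkpointedCount.get()) {
                    count += c;
                }
            }
        }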
---
 .../fs/bucketing/BucketingSinkMigrationTest.java   |   1 -
 ...bucketing-sink-migration-test-flink1.2-snapshot | Bin 1623 -> 0 bytes
 .../kafka/FlinkKafkaConsumerBaseMigrationTest.java |   5 +-
 ...er-migration-test-flink1.2-empty-state-snapshot | Bin 240 -> 0 bytes
 ...kafka-consumer-migration-test-flink1.2-snapshot | Bin 1022 -> 0 bytes
 .../testutils/migration/MigrationVersion.java      |   1 -
 .../ContinuousFileProcessingMigrationTest.java     |   1 -
 ...-migration-test-1493116191000-flink1.2-snapshot | Bin 373 -> 0 bytes
 .../reader-migration-test-flink1.2-snapshot        | Bin 2590 -> 0 bytes
 .../flink/runtime/checkpoint/Checkpoints.java      |   9 +-
 .../runtime/checkpoint/savepoint/Savepoint.java    |  13 -
 .../checkpoint/savepoint/SavepointSerializers.java |  14 -
 .../savepoint/SavepointV1Serializer.java           | 322 +--------------------
 .../runtime/checkpoint/savepoint/SavepointV2.java  | 170 -----------
 .../savepoint/SavepointV2Serializer.java           |  19 +-
 .../savepoint/SavepointV1SerializerTest.java       |  64 ----
 ...HeapKeyedStateBackendSnapshotMigrationTest.java | 135 ---------
 .../resources/heap_keyed_statebackend_1_2.snapshot | Bin 2068 -> 0 bytes
 .../windowing/WindowOperatorMigrationTest.java     |   1 -
 ...-migration-test-accum-aligned-flink1.2-snapshot | Bin 222 -> 0 bytes
 ...p-migration-test-aggr-aligned-flink1.2-snapshot | Bin 187 -> 0 bytes
 ...gration-test-apply-event-time-flink1.2-snapshot | Bin 2193 -> 0 bytes
 ...on-test-apply-processing-time-flink1.2-snapshot | Bin 2081 -> 0 bytes
 ...tion-test-kryo-serialized-key-flink1.2-snapshot | Bin 3686 -> 0 bytes
 ...ration-test-reduce-event-time-flink1.2-snapshot | Bin 1988 -> 0 bytes
 ...n-test-reduce-processing-time-flink1.2-snapshot | Bin 1925 -> 0 bytes
 ...session-with-stateful-trigger-flink1.2-snapshot | Bin 3703 -> 0 bytes
 ...on-with-stateful-trigger-mint-flink1.2-snapshot | Bin 494 -> 0 bytes
 .../LegacyStatefulJobSavepointMigrationITCase.java |   4 -
 .../utils/SavepointMigrationTestBase.java          |  13 -
 .../restore/AbstractOperatorRestoreTestBase.java   |  13 -
 .../AbstractKeyedOperatorRestoreTestBase.java      |   1 -
 .../AbstractNonKeyedOperatorRestoreTestBase.java   |   1 -
 .../operatorstate/complexKeyed-flink1.2/_metadata  | Bin 134953 -> 0 bytes
 .../operatorstate/nonKeyed-flink1.2/_metadata      | Bin 3212 -> 0 bytes
 ...udf-migration-itcase-flink1.2-rocksdb-savepoint | Bin 25256 -> 0 bytes
 ...tateful-udf-migration-itcase-flink1.2-savepoint | Bin 25245 -> 0 bytes
 .../_metadata                                      | Bin 36467 -> 0 bytes
 .../_metadata                                      | Bin 36395 -> 0 bytes
 39 files changed, 13 insertions(+), 774 deletions(-)

diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkMigrationTest.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkMigrationTest.java
index b9cdea0..5bd0fcf 100644
--- a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkMigrationTest.java
+++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkMigrationTest.java
@@ -86,7 +86,6 @@ public class BucketingSinkMigrationTest {
 	@Parameterized.Parameters(name = "Migration Savepoint / Bucket Files Prefix: {0}")
 	public static Collection<Tuple2<MigrationVersion, String>> parameters () {
 		return Arrays.asList(
-			Tuple2.of(MigrationVersion.v1_2, "/var/folders/v_/ry2wp5fx0y7c1rvr41xy9_700000gn/T/junit9160378385359106772/junit479663758539998903/1970-01-01--01/part-0-"),
 			Tuple2.of(MigrationVersion.v1_3, "/var/folders/tv/b_1d8fvx23dgk1_xs8db_95h0000gn/T/junit4273542175898623023/junit3801102997056424640/1970-01-01--01/part-0-"),
 			Tuple2.of(MigrationVersion.v1_4, "/var/folders/tv/b_1d8fvx23dgk1_xs8db_95h0000gn/T/junit3198043255809479705/junit8947526563966405708/1970-01-01--01/part-0-"),
 			Tuple2.of(MigrationVersion.v1_5, "/tmp/junit4927100426019463155/junit2465610012100182280/1970-01-01--00/part-0-"),
diff --git a/flink-connectors/flink-connector-filesystem/src/test/resources/bucketing-sink-migration-test-flink1.2-snapshot b/flink-connectors/flink-connector-filesystem/src/test/resources/bucketing-sink-migration-test-flink1.2-snapshot
deleted file mode 100644
index a541bad..0000000
Binary files a/flink-connectors/flink-connector-filesystem/src/test/resources/bucketing-sink-migration-test-flink1.2-snapshot and /dev/null differ
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java
index 4b888c6..4501590 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java
@@ -100,7 +100,6 @@ public class FlinkKafkaConsumerBaseMigrationTest {
 	@Parameterized.Parameters(name = "Migration Savepoint: {0}")
 	public static Collection<MigrationVersion> parameters () {
 		return Arrays.asList(
-			MigrationVersion.v1_2,
 			MigrationVersion.v1_3,
 			MigrationVersion.v1_4,
 			MigrationVersion.v1_5,
@@ -260,7 +259,7 @@ public class FlinkKafkaConsumerBaseMigrationTest {
 
 		testHarness.open();
 
-		// the expected state in "kafka-consumer-migration-test-flink1.2-snapshot-empty-state";
+		// the expected state in "kafka-consumer-migration-test-flink1.x-snapshot-empty-state";
 		// all new partitions after the snapshot are considered as partitions that were created while the
 		// consumer wasn't running, and should start from the earliest offset.
 		final HashMap<KafkaTopicPartition, Long> expectedSubscribedPartitionsWithStartOffsets = new HashMap<>();
@@ -332,7 +331,7 @@ public class FlinkKafkaConsumerBaseMigrationTest {
 	 */
 	@Test
 	public void testRestoreFailsWithNonEmptyPreFlink13StatesIfDiscoveryEnabled() throws Exception {
-		assumeTrue(testMigrateVersion == MigrationVersion.v1_3 || testMigrateVersion == MigrationVersion.v1_2);
+		assumeTrue(testMigrateVersion == MigrationVersion.v1_3);
 
 		final List<KafkaTopicPartition> partitions = new ArrayList<>(PARTITION_STATE.keySet());
 
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/resources/kafka-consumer-migration-test-flink1.2-empty-state-snapshot b/flink-connectors/flink-connector-kafka-base/src/test/resources/kafka-consumer-migration-test-flink1.2-empty-state-snapshot
deleted file mode 100644
index 45047ee..0000000
Binary files a/flink-connectors/flink-connector-kafka-base/src/test/resources/kafka-consumer-migration-test-flink1.2-empty-state-snapshot and /dev/null differ
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/resources/kafka-consumer-migration-test-flink1.2-snapshot b/flink-connectors/flink-connector-kafka-base/src/test/resources/kafka-consumer-migration-test-flink1.2-snapshot
deleted file mode 100644
index f0be11a..0000000
Binary files a/flink-connectors/flink-connector-kafka-base/src/test/resources/kafka-consumer-migration-test-flink1.2-snapshot and /dev/null differ
diff --git a/flink-core/src/test/java/org/apache/flink/testutils/migration/MigrationVersion.java b/flink-core/src/test/java/org/apache/flink/testutils/migration/MigrationVersion.java
index 4122c5d..5a6260a 100644
--- a/flink-core/src/test/java/org/apache/flink/testutils/migration/MigrationVersion.java
+++ b/flink-core/src/test/java/org/apache/flink/testutils/migration/MigrationVersion.java
@@ -26,7 +26,6 @@ public enum MigrationVersion {
 
 	// NOTE: the version strings must not change,
 	// as they are used to locate snapshot file paths
-	v1_2("1.2"),
 	v1_3("1.3"),
 	v1_4("1.4"),
 	v1_5("1.5"),
diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingMigrationTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingMigrationTest.java
index 5f3bc48..8b81247 100644
--- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingMigrationTest.java
+++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingMigrationTest.java
@@ -75,7 +75,6 @@ public class ContinuousFileProcessingMigrationTest {
 	@Parameterized.Parameters(name = "Migration Savepoint / Mod Time: {0}")
 	public static Collection<Tuple2<MigrationVersion, Long>> parameters () {
 		return Arrays.asList(
-			Tuple2.of(MigrationVersion.v1_2, 1493116191000L),
 			Tuple2.of(MigrationVersion.v1_3, 1496532000000L),
 			Tuple2.of(MigrationVersion.v1_4, 1516897628000L),
 			Tuple2.of(MigrationVersion.v1_5, 1533639934000L),
diff --git a/flink-fs-tests/src/test/resources/monitoring-function-migration-test-1493116191000-flink1.2-snapshot b/flink-fs-tests/src/test/resources/monitoring-function-migration-test-1493116191000-flink1.2-snapshot
deleted file mode 100644
index 25451ac..0000000
Binary files a/flink-fs-tests/src/test/resources/monitoring-function-migration-test-1493116191000-flink1.2-snapshot and /dev/null differ
diff --git a/flink-fs-tests/src/test/resources/reader-migration-test-flink1.2-snapshot b/flink-fs-tests/src/test/resources/reader-migration-test-flink1.2-snapshot
deleted file mode 100644
index b22ceee..0000000
Binary files a/flink-fs-tests/src/test/resources/reader-migration-test-flink1.2-snapshot and /dev/null differ
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java
index fa84742..f593fe8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java
@@ -113,7 +113,6 @@ public class Checkpoints {
 		}
 	}
 
-	@SuppressWarnings("deprecation")
 	public static CompletedCheckpoint loadAndValidateCheckpoint(
 			JobID jobId,
 			Map<JobVertexID, ExecutionJobVertex> tasks,
@@ -130,16 +129,12 @@ public class Checkpoints {
 		final String checkpointPointer = location.getExternalPointer();
 
 		// (1) load the savepoint
-		final Savepoint rawCheckpointMetadata;
+		final Savepoint checkpointMetadata;
 		try (InputStream in = metadataHandle.openInputStream()) {
 			DataInputStream dis = new DataInputStream(in);
-			rawCheckpointMetadata = loadCheckpointMetadata(dis, classLoader);
+			checkpointMetadata = loadCheckpointMetadata(dis, classLoader);
 		}
 
-		final Savepoint checkpointMetadata = rawCheckpointMetadata.getTaskStates() == null ?
-				rawCheckpointMetadata :
-				SavepointV2.convertToOperatorStateSavepointV2(tasks, rawCheckpointMetadata);
-
 		// generate mapping from operator to task
 		Map<OperatorID, ExecutionJobVertex> operatorToJobVertexMapping = new HashMap<>();
 		for (ExecutionJobVertex task : tasks.values()) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/Savepoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/Savepoint.java
index 468b12f..0867c14 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/Savepoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/Savepoint.java
@@ -22,7 +22,6 @@ import org.apache.flink.core.io.Versioned;
 import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
 import org.apache.flink.runtime.checkpoint.MasterState;
 import org.apache.flink.runtime.checkpoint.OperatorState;
-import org.apache.flink.runtime.checkpoint.TaskState;
 import org.apache.flink.util.Disposable;
 
 import java.util.Collection;
@@ -51,17 +50,6 @@ public interface Savepoint extends Disposable, Versioned {
 	long getCheckpointId();
 
 	/**
-	 * Returns the snapshotted task states.
-	 *
-	 * <p>These are used to restore the snapshot state.
-	 *
-	 * @deprecated Only kept for backwards-compatibility with versionS < 1.3. Will be removed in the future.
-	 * @return Snapshotted task states
-	 */
-	@Deprecated
-	Collection<TaskState> getTaskStates();
-
-	/**
 	 * Gets the checkpointed states generated by the master.
 	 */
 	Collection<MasterState> getMasterStates();
@@ -74,5 +62,4 @@ public interface Savepoint extends Disposable, Versioned {
 	 * @return Snapshotted operator states
 	 */
 	Collection<OperatorState> getOperatorStates();
-
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializers.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializers.java
index 256aee1..078471f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializers.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializers.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.runtime.checkpoint.savepoint;
 
-import org.apache.flink.annotation.VisibleForTesting;
-
 import java.util.HashMap;
 import java.util.Map;
 
@@ -30,9 +28,6 @@ import java.util.Map;
  */
 public class SavepointSerializers {
 
-	/** If this flag is true, restoring a savepoint fails if it contains legacy state (<= Flink 1.1 format). */
-	static boolean failWhenLegacyStateDetected = true;
-
 	private static final Map<Integer, SavepointSerializer> SERIALIZERS = new HashMap<>(2);
 
 	static {
@@ -61,13 +56,4 @@ public class SavepointSerializers {
 			throw new IllegalArgumentException("Unrecognized checkpoint version number: " + version);
 		}
 	}
-
-	/**
-	 * This is only visible as a temporary solution to keep the stateful job migration it cases working from binary
-	 * savepoints that still contain legacy state (<= Flink 1.1).
-	 */
-	@VisibleForTesting
-	public static void setFailWhenLegacyStateDetected(boolean fail) {
-		failWhenLegacyStateDetected = fail;
-	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
index e0dd8b9..9af5dde 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
@@ -19,341 +19,27 @@
 package org.apache.flink.runtime.checkpoint.savepoint;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.checkpoint.SubtaskState;
-import org.apache.flink.runtime.checkpoint.TaskState;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.state.ChainedStateHandle;
-import org.apache.flink.runtime.state.KeyGroupRange;
-import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
-import org.apache.flink.runtime.state.KeyGroupsStateHandle;
-import org.apache.flink.runtime.state.KeyedStateHandle;
-import org.apache.flink.runtime.state.OperatorStateHandle;
-import org.apache.flink.runtime.state.OperatorStreamStateHandle;
-import org.apache.flink.runtime.state.StreamStateHandle;
-import org.apache.flink.runtime.state.filesystem.FileStateHandle;
-import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
-import org.apache.flink.util.Preconditions;
 
 import java.io.DataInputStream;
-import java.io.DataOutputStream;
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
 
 /**
- * Deserializer for checkpoints written in format {@code 1} (Flink 1.2.x format)
- *
- * <p>In contrast to the previous versions, this serializer makes sure that no Java
- * serialization is used for serialization. Therefore, we don't rely on any involved
- * classes to stay the same.
+ * Deserializer for checkpoints written in format {@code 1} (Flink 1.2.x format).
+ * This class is only retained to give a better error message: rather than getting an "unknown version",
+ * the user gets a "version no longer supported".
  */
 @Internal
-@SuppressWarnings("deprecation")
 public class SavepointV1Serializer implements SavepointSerializer {
 
 	/** The savepoint version. */
 	public static final int VERSION = 1;
 
-	private static final byte NULL_HANDLE = 0;
-	private static final byte BYTE_STREAM_STATE_HANDLE = 1;
-	private static final byte FILE_STREAM_STATE_HANDLE = 2;
-	private static final byte KEY_GROUPS_HANDLE = 3;
-	private static final byte PARTITIONABLE_OPERATOR_STATE_HANDLE = 4;
-
 	public static final SavepointV1Serializer INSTANCE = new SavepointV1Serializer();
 
 	private SavepointV1Serializer() {}
 
 	@Override
 	public SavepointV2 deserialize(DataInputStream dis, ClassLoader cl) throws IOException {
-		long checkpointId = dis.readLong();
-
-		// Task states
-		int numTaskStates = dis.readInt();
-		List<TaskState> taskStates = new ArrayList<>(numTaskStates);
-
-		for (int i = 0; i < numTaskStates; i++) {
-			JobVertexID jobVertexId = new JobVertexID(dis.readLong(), dis.readLong());
-			int parallelism = dis.readInt();
-			int maxParallelism = dis.readInt();
-			int chainLength = dis.readInt();
-
-			// Add task state
-			TaskState taskState = new TaskState(jobVertexId, parallelism, maxParallelism, chainLength);
-			taskStates.add(taskState);
-
-			// Sub task states
-			int numSubTaskStates = dis.readInt();
-
-			for (int j = 0; j < numSubTaskStates; j++) {
-				int subtaskIndex = dis.readInt();
-				SubtaskState subtaskState = deserializeSubtaskState(dis);
-				taskState.putState(subtaskIndex, subtaskState);
-			}
-		}
-
-		return new SavepointV2(checkpointId, taskStates);
-	}
-
-	private static SubtaskState deserializeSubtaskState(DataInputStream dis) throws IOException {
-		// Duration field has been removed from SubtaskState
-		dis.readLong();
-
-		int len = dis.readInt();
-
-		if (SavepointSerializers.failWhenLegacyStateDetected) {
-			Preconditions.checkState(len == 0,
-				"Legacy state (from Flink <= 1.1, created through the 'Checkpointed' interface) is " +
-					"no longer supported starting from Flink 1.4. Please rewrite your job to use " +
-					"'CheckpointedFunction' instead!");
-
-		} else {
-			for (int i = 0; i < len; ++i) {
-				// absorb bytes from stream and ignore result
-				deserializeStreamStateHandle(dis);
-			}
-		}
-
-		len = dis.readInt();
-		List<OperatorStateHandle> operatorStateBackend = new ArrayList<>(len);
-		for (int i = 0; i < len; ++i) {
-			OperatorStateHandle streamStateHandle = deserializeOperatorStateHandle(dis);
-			operatorStateBackend.add(streamStateHandle);
-		}
-
-		len = dis.readInt();
-		List<OperatorStateHandle> operatorStateStream = new ArrayList<>(len);
-		for (int i = 0; i < len; ++i) {
-			OperatorStateHandle streamStateHandle = deserializeOperatorStateHandle(dis);
-			operatorStateStream.add(streamStateHandle);
-		}
-
-		KeyedStateHandle keyedStateBackend = deserializeKeyedStateHandle(dis);
-
-		KeyedStateHandle keyedStateStream = deserializeKeyedStateHandle(dis);
-
-		ChainedStateHandle<OperatorStateHandle> operatorStateBackendChain =
-				new ChainedStateHandle<>(operatorStateBackend);
-
-		ChainedStateHandle<OperatorStateHandle> operatorStateStreamChain =
-				new ChainedStateHandle<>(operatorStateStream);
-
-		return new SubtaskState(
-				operatorStateBackendChain,
-				operatorStateStreamChain,
-				keyedStateBackend,
-				keyedStateStream);
-	}
-
-	private static KeyedStateHandle deserializeKeyedStateHandle(DataInputStream dis) throws IOException {
-		final int type = dis.readByte();
-		if (NULL_HANDLE == type) {
-			return null;
-		} else if (KEY_GROUPS_HANDLE == type) {
-			int startKeyGroup = dis.readInt();
-			int numKeyGroups = dis.readInt();
-			KeyGroupRange keyGroupRange = KeyGroupRange.of(startKeyGroup, startKeyGroup + numKeyGroups - 1);
-			long[] offsets = new long[numKeyGroups];
-			for (int i = 0; i < numKeyGroups; ++i) {
-				offsets[i] = dis.readLong();
-			}
-			KeyGroupRangeOffsets keyGroupRangeOffsets = new KeyGroupRangeOffsets(
-				keyGroupRange, offsets);
-			StreamStateHandle stateHandle = deserializeStreamStateHandle(dis);
-			return new KeyGroupsStateHandle(keyGroupRangeOffsets, stateHandle);
-		} else {
-			throw new IllegalStateException("Reading invalid KeyedStateHandle, type: " + type);
-		}
-	}
-
-	private static OperatorStateHandle deserializeOperatorStateHandle(DataInputStream dis) throws IOException {
-		final int type = dis.readByte();
-		if (NULL_HANDLE == type) {
-			return null;
-		} else if (PARTITIONABLE_OPERATOR_STATE_HANDLE == type) {
-			int mapSize = dis.readInt();
-			Map<String, OperatorStateHandle.StateMetaInfo> offsetsMap = new HashMap<>(mapSize);
-			for (int i = 0; i < mapSize; ++i) {
-				String key = dis.readUTF();
-
-				int modeOrdinal = dis.readByte();
-				OperatorStateHandle.Mode mode = OperatorStateHandle.Mode.values()[modeOrdinal];
-
-				long[] offsets = new long[dis.readInt()];
-				for (int j = 0; j < offsets.length; ++j) {
-					offsets[j] = dis.readLong();
-				}
-
-				OperatorStateHandle.StateMetaInfo metaInfo =
-						new OperatorStateHandle.StateMetaInfo(offsets, mode);
-				offsetsMap.put(key, metaInfo);
-			}
-			StreamStateHandle stateHandle = deserializeStreamStateHandle(dis);
-			return new OperatorStreamStateHandle(offsetsMap, stateHandle);
-		} else {
-			throw new IllegalStateException("Reading invalid OperatorStateHandle, type: " + type);
-		}
-	}
-
-	private static StreamStateHandle deserializeStreamStateHandle(DataInputStream dis) throws IOException {
-		int type = dis.read();
-		if (NULL_HANDLE == type) {
-			return null;
-		} else if (FILE_STREAM_STATE_HANDLE == type) {
-			long size = dis.readLong();
-			String pathString = dis.readUTF();
-			return new FileStateHandle(new Path(pathString), size);
-		} else if (BYTE_STREAM_STATE_HANDLE == type) {
-			String handleName = dis.readUTF();
-			int numBytes = dis.readInt();
-			byte[] data = new byte[numBytes];
-			dis.readFully(data);
-			return new ByteStreamStateHandle(handleName, data);
-		} else {
-			throw new IOException("Unknown implementation of StreamStateHandle, code: " + type);
-		}
-	}
-
-	// ------------------------------------------------------------------------
-	//  old format serialization, for testing
-	// ------------------------------------------------------------------------
-
-	@VisibleForTesting
-	public static void serializeVersion1(long checkpointId, Collection<TaskState> taskStates, DataOutputStream dos) throws IOException {
-		dos.writeLong(checkpointId);
-
-		dos.writeInt(taskStates.size());
-
-		for (TaskState taskState : taskStates) {
-			// Vertex ID
-			dos.writeLong(taskState.getJobVertexID().getLowerPart());
-			dos.writeLong(taskState.getJobVertexID().getUpperPart());
-
-			// Parallelism
-			int parallelism = taskState.getParallelism();
-			dos.writeInt(parallelism);
-			dos.writeInt(taskState.getMaxParallelism());
-			dos.writeInt(taskState.getChainLength());
-
-			// Sub task states
-			Map<Integer, SubtaskState> subtaskStateMap = taskState.getSubtaskStates();
-			dos.writeInt(subtaskStateMap.size());
-			for (Map.Entry<Integer, SubtaskState> entry : subtaskStateMap.entrySet()) {
-				dos.writeInt(entry.getKey());
-				serializeSubtaskState(entry.getValue(), dos);
-			}
-		}
-	}
-
-	private static void serializeSubtaskState(SubtaskState subtaskState, DataOutputStream dos) throws IOException {
-		//backwards compatibility, do not remove
-		dos.writeLong(-1L);
-
-		//backwards compatibility (number of legacy state handles), do not remove
-		dos.writeInt(0);
-
-		ChainedStateHandle<OperatorStateHandle> operatorStateBackend = subtaskState.getManagedOperatorState();
-
-		int len = operatorStateBackend != null ? operatorStateBackend.getLength() : 0;
-		dos.writeInt(len);
-		for (int i = 0; i < len; ++i) {
-			OperatorStateHandle stateHandle = operatorStateBackend.get(i);
-			serializeOperatorStateHandle(stateHandle, dos);
-		}
-
-		ChainedStateHandle<OperatorStateHandle> operatorStateFromStream = subtaskState.getRawOperatorState();
-
-		len = operatorStateFromStream != null ? operatorStateFromStream.getLength() : 0;
-		dos.writeInt(len);
-		for (int i = 0; i < len; ++i) {
-			OperatorStateHandle stateHandle = operatorStateFromStream.get(i);
-			serializeOperatorStateHandle(stateHandle, dos);
-		}
-
-		KeyedStateHandle keyedStateBackend = subtaskState.getManagedKeyedState();
-		serializeKeyedStateHandle(keyedStateBackend, dos);
-
-		KeyedStateHandle keyedStateStream = subtaskState.getRawKeyedState();
-		serializeKeyedStateHandle(keyedStateStream, dos);
-	}
-
-	private static void serializeStreamStateHandle(
-		StreamStateHandle stateHandle, DataOutputStream dos) throws IOException {
-
-		if (stateHandle == null) {
-			dos.writeByte(NULL_HANDLE);
-
-		} else if (stateHandle instanceof FileStateHandle) {
-			dos.writeByte(FILE_STREAM_STATE_HANDLE);
-			FileStateHandle fileStateHandle = (FileStateHandle) stateHandle;
-			dos.writeLong(stateHandle.getStateSize());
-			dos.writeUTF(fileStateHandle.getFilePath().toString());
-
-		} else if (stateHandle instanceof ByteStreamStateHandle) {
-			dos.writeByte(BYTE_STREAM_STATE_HANDLE);
-			ByteStreamStateHandle byteStreamStateHandle = (ByteStreamStateHandle) stateHandle;
-			dos.writeUTF(byteStreamStateHandle.getHandleName());
-			byte[] internalData = byteStreamStateHandle.getData();
-			dos.writeInt(internalData.length);
-			dos.write(byteStreamStateHandle.getData());
-
-		} else {
-			throw new IOException("Unknown implementation of StreamStateHandle: " + stateHandle.getClass());
-		}
-
-		dos.flush();
-	}
-
-	private static void serializeKeyedStateHandle(
-		KeyedStateHandle stateHandle, DataOutputStream dos) throws IOException {
-
-		if (stateHandle == null) {
-			dos.writeByte(NULL_HANDLE);
-		} else if (stateHandle instanceof KeyGroupsStateHandle) {
-			KeyGroupsStateHandle keyGroupsStateHandle = (KeyGroupsStateHandle) stateHandle;
-
-			dos.writeByte(KEY_GROUPS_HANDLE);
-			dos.writeInt(keyGroupsStateHandle.getKeyGroupRange().getStartKeyGroup());
-			dos.writeInt(keyGroupsStateHandle.getKeyGroupRange().getNumberOfKeyGroups());
-			for (int keyGroup : keyGroupsStateHandle.getKeyGroupRange()) {
-				dos.writeLong(keyGroupsStateHandle.getOffsetForKeyGroup(keyGroup));
-			}
-			serializeStreamStateHandle(keyGroupsStateHandle.getDelegateStateHandle(), dos);
-		} else {
-			throw new IllegalStateException("Unknown KeyedStateHandle type: " + stateHandle.getClass());
-		}
-	}
-
-	private static void serializeOperatorStateHandle(
-		OperatorStateHandle stateHandle, DataOutputStream dos) throws IOException {
-
-		if (stateHandle != null) {
-			dos.writeByte(PARTITIONABLE_OPERATOR_STATE_HANDLE);
-			Map<String, OperatorStateHandle.StateMetaInfo> partitionOffsetsMap =
-				stateHandle.getStateNameToPartitionOffsets();
-			dos.writeInt(partitionOffsetsMap.size());
-			for (Map.Entry<String, OperatorStateHandle.StateMetaInfo> entry : partitionOffsetsMap.entrySet()) {
-				dos.writeUTF(entry.getKey());
-
-				OperatorStateHandle.StateMetaInfo stateMetaInfo = entry.getValue();
-
-				int mode = stateMetaInfo.getDistributionMode().ordinal();
-				dos.writeByte(mode);
-
-				long[] offsets = stateMetaInfo.getOffsets();
-				dos.writeInt(offsets.length);
-				for (long offset : offsets) {
-					dos.writeLong(offset);
-				}
-			}
-			serializeStreamStateHandle(stateHandle.getDelegateStateHandle(), dos);
-		} else {
-			dos.writeByte(NULL_HANDLE);
-		}
+		throw new IOException("This savepoint / checkpoint version (Flink 1.1 / 1.2) is no longer supported.");
 	}
 }
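
The effect of keeping this stub is visible at the dispatch layer: SavepointSerializers
still resolves format version 1, so restoring old metadata now fails fast inside
deserialize() with a descriptive message instead of falling through to the
"Unrecognized checkpoint version number" path. A hedged sketch of the caller's view
(the wrapper class is hypothetical; it assumes the dispatch method shown in the
SavepointSerializers hunk above is accessible):

    import org.apache.flink.runtime.checkpoint.savepoint.SavepointSerializer;
    import org.apache.flink.runtime.checkpoint.savepoint.SavepointSerializers;

    import java.io.DataInputStream;
    import java.io.IOException;

    public class MetadataVersionProbe {

        public static void tryRestore(int version, DataInputStream metadata, ClassLoader cl) throws IOException {
            // version 2 -> SavepointV2Serializer deserializes normally
            // version 1 -> the stub above: deserialize() throws
            //              IOException("... no longer supported.")
            // otherwise -> getSerializer() throws
            //              IllegalArgumentException("Unrecognized checkpoint version number: " + version)
            SavepointSerializer serializer = SavepointSerializers.getSerializer(version);
            serializer.deserialize(metadata, cl);
        }
    }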
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2.java
index 6dc628d..cf761d8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2.java
@@ -20,22 +20,8 @@ package org.apache.flink.runtime.checkpoint.savepoint;
 
 import org.apache.flink.runtime.checkpoint.MasterState;
 import org.apache.flink.runtime.checkpoint.OperatorState;
-import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
-import org.apache.flink.runtime.checkpoint.SubtaskState;
-import org.apache.flink.runtime.checkpoint.TaskState;
-import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobgraph.OperatorID;
-import org.apache.flink.runtime.state.ChainedStateHandle;
-import org.apache.flink.runtime.state.KeyedStateHandle;
-import org.apache.flink.runtime.state.OperatorStateHandle;
-import org.apache.flink.util.Preconditions;
 
 import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -51,48 +37,15 @@ public class SavepointV2 implements Savepoint {
 	/** The checkpoint ID. */
 	private final long checkpointId;
 
-	/**
-	 * The task states.
-	 * @deprecated Only kept for backwards-compatibility with versions < 1.3. Will be removed in the future.
-	 */
-	@Deprecated
-	private final Collection<TaskState> taskStates;
-
 	/** The operator states. */
 	private final Collection<OperatorState> operatorStates;
 
 	/** The states generated by the CheckpointCoordinator. */
 	private final Collection<MasterState> masterStates;
 
-	/** @deprecated Only kept for backwards-compatibility with versions < 1.3. Will be removed in the future. */
-	@Deprecated
-	public SavepointV2(long checkpointId, Collection<TaskState> taskStates) {
-		this(
-			checkpointId,
-			null,
-			checkNotNull(taskStates, "taskStates"),
-			Collections.<MasterState>emptyList()
-		);
-	}
-
 	public SavepointV2(long checkpointId, Collection<OperatorState> operatorStates, Collection<MasterState> masterStates) {
-		this(
-			checkpointId,
-			checkNotNull(operatorStates, "operatorStates"),
-			null,
-			masterStates
-		);
-	}
-
-	private SavepointV2(
-			long checkpointId,
-			Collection<OperatorState> operatorStates,
-			Collection<TaskState> taskStates,
-			Collection<MasterState> masterStates) {
-
 		this.checkpointId = checkpointId;
 		this.operatorStates = operatorStates;
-		this.taskStates = taskStates;
 		this.masterStates = checkNotNull(masterStates, "masterStates");
 	}
 
@@ -112,11 +65,6 @@ public class SavepointV2 implements Savepoint {
 	}
 
 	@Override
-	public Collection<TaskState> getTaskStates() {
-		return taskStates;
-	}
-
-	@Override
 	public Collection<MasterState> getMasterStates() {
 		return masterStates;
 	}
@@ -134,122 +82,4 @@ public class SavepointV2 implements Savepoint {
 	public String toString() {
 		return "Checkpoint Metadata (version=" + VERSION + ')';
 	}
-
-	/**
-	 * Converts the {@link Savepoint} containing {@link TaskState TaskStates} to an equivalent savepoint containing
-	 * {@link OperatorState OperatorStates}.
-	 *
-	 * @param savepoint savepoint to convert
-	 * @param tasks     map of all vertices and their job vertex ids
-	 * @return converted completed checkpoint
-	 * @deprecated Only kept for backwards-compatibility with versions < 1.3. Will be removed in the future.
-	 * */
-	@Deprecated
-	public static Savepoint convertToOperatorStateSavepointV2(
-			Map<JobVertexID, ExecutionJobVertex> tasks,
-			Savepoint savepoint) {
-
-		if (savepoint.getOperatorStates() != null) {
-			return savepoint;
-		}
-
-		boolean expandedToLegacyIds = false;
-
-		Map<OperatorID, OperatorState> operatorStates = new HashMap<>(savepoint.getTaskStates().size() << 1);
-
-		for (TaskState taskState : savepoint.getTaskStates()) {
-			ExecutionJobVertex jobVertex = tasks.get(taskState.getJobVertexID());
-
-			// on the first time we can not find the execution job vertex for an id, we also consider alternative ids,
-			// for example as generated from older flink versions, to provide backwards compatibility.
-			if (jobVertex == null && !expandedToLegacyIds) {
-				tasks = ExecutionJobVertex.includeLegacyJobVertexIDs(tasks);
-				jobVertex = tasks.get(taskState.getJobVertexID());
-				expandedToLegacyIds = true;
-			}
-
-			if (jobVertex == null) {
-				throw new IllegalStateException(
-					"Could not find task for state with ID " + taskState.getJobVertexID() + ". " +
-					"When migrating a savepoint from a version < 1.3 please make sure that the topology was not " +
-					"changed through removal of a stateful operator or modification of a chain containing a stateful " +
-					"operator.");
-			}
-
-			List<OperatorID> operatorIDs = jobVertex.getOperatorIDs();
-
-			Preconditions.checkArgument(
-				jobVertex.getParallelism() == taskState.getParallelism(),
-				"Detected change in parallelism during migration for task " + jobVertex.getJobVertexId() + "." +
-					"When migrating a savepoint from a version < 1.3 please make sure that no changes were made " +
-					"to the parallelism of stateful operators.");
-
-			Preconditions.checkArgument(
-				operatorIDs.size() == taskState.getChainLength(),
-				"Detected change in chain length during migration for task " + jobVertex.getJobVertexId() + ". " +
-					"When migrating a savepoint from a version < 1.3 please make sure that the topology was not " +
-					"changed by modification of a chain containing a stateful operator.");
-
-			for (int subtaskIndex = 0; subtaskIndex < jobVertex.getParallelism(); subtaskIndex++) {
-				SubtaskState subtaskState;
-				try {
-					subtaskState = taskState.getState(subtaskIndex);
-				} catch (Exception e) {
-					throw new IllegalStateException(
-						"Could not find subtask with index " + subtaskIndex + " for task " + jobVertex.getJobVertexId() + ". " +
-						"When migrating a savepoint from a version < 1.3 please make sure that no changes were made " +
-						"to the parallelism of stateful operators.",
-						e);
-				}
-
-				if (subtaskState == null) {
-					continue;
-				}
-
-				ChainedStateHandle<OperatorStateHandle> partitioneableState =
-					subtaskState.getManagedOperatorState();
-				ChainedStateHandle<OperatorStateHandle> rawOperatorState =
-					subtaskState.getRawOperatorState();
-
-				for (int chainIndex = 0; chainIndex < taskState.getChainLength(); chainIndex++) {
-
-					// task consists of multiple operators so we have to break the state apart
-					for (int operatorIndex = 0; operatorIndex < operatorIDs.size(); operatorIndex++) {
-						OperatorID operatorID = operatorIDs.get(operatorIndex);
-						OperatorState operatorState = operatorStates.get(operatorID);
-
-						if (operatorState == null) {
-							operatorState = new OperatorState(
-								operatorID,
-								jobVertex.getParallelism(),
-								jobVertex.getMaxParallelism());
-							operatorStates.put(operatorID, operatorState);
-						}
-
-						KeyedStateHandle managedKeyedState = null;
-						KeyedStateHandle rawKeyedState = null;
-
-						// only the head operator retains the keyed state
-						if (operatorIndex == operatorIDs.size() - 1) {
-							managedKeyedState = subtaskState.getManagedKeyedState();
-							rawKeyedState = subtaskState.getRawKeyedState();
-						}
-
-						OperatorSubtaskState operatorSubtaskState = new OperatorSubtaskState(
-							partitioneableState != null ? partitioneableState.get(operatorIndex) : null,
-							rawOperatorState != null ? rawOperatorState.get(operatorIndex) : null,
-							managedKeyedState,
-							rawKeyedState);
-
-						operatorState.putState(subtaskIndex, operatorSubtaskState);
-					}
-				}
-			}
-		}
-
-		return new SavepointV2(
-			savepoint.getCheckpointId(),
-			operatorStates.values(),
-			savepoint.getMasterStates());
-	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java
index 3d60e22..9e5dff6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java
@@ -36,7 +36,6 @@ import org.apache.flink.runtime.state.StateHandleID;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.filesystem.FileStateHandle;
 import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
-import org.apache.flink.util.Preconditions;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
@@ -290,22 +289,14 @@ public class SavepointV2Serializer implements SavepointSerializer {
 		// Duration field has been removed from SubtaskState, do not remove
 		long ignoredDuration = dis.readLong();
 
-		// for compatibility, do not remove
-		int len = dis.readInt();
-
-		if (SavepointSerializers.failWhenLegacyStateDetected) {
-			Preconditions.checkState(len == 0,
+		final int numLegacyTaskStates = dis.readInt();
+		if (numLegacyTaskStates > 0) {
+			throw new IOException(
 				"Legacy state (from Flink <= 1.1, created through the 'Checkpointed' interface) is " +
-					"no longer supported starting from Flink 1.4. Please rewrite your job to use " +
-					"'CheckpointedFunction' instead!");
-		} else {
-			for (int i = 0; i < len; ++i) {
-				// absorb bytes from stream and ignore result
-				deserializeStreamStateHandle(dis);
-			}
+				"no longer supported.");
 		}
 
-		len = dis.readInt();
+		int len = dis.readInt();
 		OperatorStateHandle operatorStateBackend = len == 0 ? null : deserializeOperatorStateHandle(dis);
 
 		len = dis.readInt();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1SerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1SerializerTest.java
deleted file mode 100644
index da448f0..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1SerializerTest.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.checkpoint.savepoint;
-
-import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.runtime.checkpoint.TaskState;
-
-import org.junit.Test;
-
-import java.io.ByteArrayInputStream;
-import java.util.Collection;
-import java.util.Random;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-/**
- * Test that the Checkpoint Metadata V1 deserializer can deserialize a the metadata correctly
- * into the latest format.
- */
-public class SavepointV1SerializerTest {
-
-	@Test
-	public void testSerializeDeserializeV1() throws Exception {
-		final Random r = new Random(42);
-
-		for (int i = 0; i < 50; ++i) {
-			final long checkpointId = i + 123123;
-			final Collection<TaskState> taskStates = CheckpointTestUtils.createTaskStates(r, 1 + r.nextInt(64), 1 + r.nextInt(64));
-
-			// Serialize
-			final ByteArrayOutputStreamWithPos baos = new ByteArrayOutputStreamWithPos();
-			SavepointV1Serializer.serializeVersion1(checkpointId, taskStates, new DataOutputViewStreamWrapper(baos));
-			final byte[] bytes = baos.toByteArray();
-
-			// Deserialize
-			final SavepointV2 actual = SavepointV1Serializer.INSTANCE.deserialize(
-					new DataInputViewStreamWrapper(new ByteArrayInputStream(bytes)),
-					Thread.currentThread().getContextClassLoader());
-
-			assertEquals(checkpointId, actual.getCheckpointId());
-			assertEquals(taskStates, actual.getTaskStates());
-			assertTrue(actual.getMasterStates().isEmpty());
-		}
-	}
-}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendSnapshotMigrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendSnapshotMigrationTest.java
index 23e74cd..bbc457b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendSnapshotMigrationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendSnapshotMigrationTest.java
@@ -19,15 +19,12 @@
 package org.apache.flink.runtime.state.heap;
 
 import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.MapStateDescriptor;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.StateObjectCollection;
-import org.apache.flink.runtime.state.KeyGroupsStateHandle;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.SnapshotResult;
-import org.apache.flink.runtime.state.internal.InternalListState;
 import org.apache.flink.runtime.state.internal.InternalMapState;
 import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory;
 import org.apache.flink.util.InstantiationUtil;
@@ -38,12 +35,10 @@ import org.junit.Test;
 import java.io.BufferedInputStream;
 import java.io.FileInputStream;
 import java.net.URL;
-import java.util.Collections;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.concurrent.RunnableFuture;
 
-import static java.util.Arrays.asList;
 import static org.junit.Assert.assertEquals;
 
 /**
@@ -203,134 +198,4 @@ public class HeapKeyedStateBackendSnapshotMigrationTest extends HeapStateBackend
 //			}
 //		}
 //	}
-
-	/**
-	 * [FLINK-5979]
-	 *
-	 * <p>This test takes a snapshot that was created with Flink 1.2 and tries to restore it in master to check
-	 * the backwards compatibility of the serialization format of {@link StateTable}s.
-	 */
-	@Test
-	public void testRestore1_2ToMaster() throws Exception {
-
-		ClassLoader cl = getClass().getClassLoader();
-		URL resource = cl.getResource("heap_keyed_statebackend_1_2.snapshot");
-
-		Preconditions.checkNotNull(resource, "Binary snapshot resource not found!");
-
-		final Integer namespace1 = 1;
-		final Integer namespace2 = 2;
-		final Integer namespace3 = 3;
-
-		final KeyGroupsStateHandle stateHandle;
-		try (BufferedInputStream bis = new BufferedInputStream((new FileInputStream(resource.getFile())))) {
-			stateHandle = InstantiationUtil.deserializeObject(bis, Thread.currentThread().getContextClassLoader());
-		}
-		try (final HeapKeyedStateBackend<String> keyedBackend = createKeyedBackend(StateObjectCollection.singleton(stateHandle))) {
-			final ListStateDescriptor<Long> stateDescr = new ListStateDescriptor<>("my-state", Long.class);
-			stateDescr.initializeSerializerUnlessSet(new ExecutionConfig());
-
-			InternalListState<String, Integer, Long> state = keyedBackend.createInternalState(IntSerializer.INSTANCE, stateDescr);
-
-			assertEquals(7, keyedBackend.numKeyValueStateEntries());
-
-			keyedBackend.setCurrentKey("abc");
-			state.setCurrentNamespace(namespace1);
-			assertEquals(asList(33L, 55L), state.get());
-			state.setCurrentNamespace(namespace2);
-			assertEquals(asList(22L, 11L), state.get());
-			state.setCurrentNamespace(namespace3);
-			assertEquals(Collections.singletonList(44L), state.get());
-
-			keyedBackend.setCurrentKey("def");
-			state.setCurrentNamespace(namespace1);
-			assertEquals(asList(11L, 44L), state.get());
-
-			state.setCurrentNamespace(namespace3);
-			assertEquals(asList(22L, 55L, 33L), state.get());
-
-			keyedBackend.setCurrentKey("jkl");
-			state.setCurrentNamespace(namespace1);
-			assertEquals(asList(11L, 22L, 33L, 44L, 55L), state.get());
-
-			keyedBackend.setCurrentKey("mno");
-			state.setCurrentNamespace(namespace3);
-			assertEquals(asList(11L, 22L, 33L, 44L, 55L), state.get());
-		}
-	}
-
-//	/**
-//	 * This code was used to create the binary file of the old version's snapshot used by this test. If you need to
-//	 * recreate the binary, you can comment this out and run it.
-//	 */
-//	private void createBinarySnapshot() throws Exception {
-//
-//		final String pathToWrite = "/PATH/TO/WRITE";
-//
-//		final ListStateDescriptor<Long> stateDescr = new ListStateDescriptor<>("my-state", Long.class);
-//		stateDescr.initializeSerializerUnlessSet(new ExecutionConfig());
-//
-//		final Integer namespace1 = 1;
-//		final Integer namespace2 = 2;
-//		final Integer namespace3 = 3;
-//
-//		final HeapKeyedStateBackend<String> keyedBackend = createKeyedBackend();
-//
-//		try {
-//			InternalListState<Integer, Long> state = keyedBackend.createListState(IntSerializer.INSTANCE, stateDescr);
-//
-//			keyedBackend.setCurrentKey("abc");
-//			state.setCurrentNamespace(namespace1);
-//			state.add(33L);
-//			state.add(55L);
-//
-//			state.setCurrentNamespace(namespace2);
-//			state.add(22L);
-//			state.add(11L);
-//
-//			state.setCurrentNamespace(namespace3);
-//			state.add(44L);
-//
-//			keyedBackend.setCurrentKey("def");
-//			state.setCurrentNamespace(namespace1);
-//			state.add(11L);
-//			state.add(44L);
-//
-//			state.setCurrentNamespace(namespace3);
-//			state.add(22L);
-//			state.add(55L);
-//			state.add(33L);
-//
-//			keyedBackend.setCurrentKey("jkl");
-//			state.setCurrentNamespace(namespace1);
-//			state.add(11L);
-//			state.add(22L);
-//			state.add(33L);
-//			state.add(44L);
-//			state.add(55L);
-//
-//			keyedBackend.setCurrentKey("mno");
-//			state.setCurrentNamespace(namespace3);
-//			state.add(11L);
-//			state.add(22L);
-//			state.add(33L);
-//			state.add(44L);
-//			state.add(55L);
-//			RunnableFuture<KeyGroupsStateHandle> snapshot = keyedBackend.snapshot(
-//					0L,
-//					0L,
-//					new MemCheckpointStreamFactory(4 * 1024 * 1024),
-//					CheckpointOptions.forCheckpointWithDefaultLocation());
-//
-//			snapshot.run();
-//
-//			try (BufferedOutputStream bis = new BufferedOutputStream(new FileOutputStream(pathToWrite))) {
-//				InstantiationUtil.serializeObject(bis, snapshot.get());
-//			}
-//
-//		} finally {
-//			keyedBackend.close();
-//			keyedBackend.dispose();
-//		}
-//	}
 }
diff --git a/flink-runtime/src/test/resources/heap_keyed_statebackend_1_2.snapshot b/flink-runtime/src/test/resources/heap_keyed_statebackend_1_2.snapshot
deleted file mode 100644
index b9171bc..0000000
Binary files a/flink-runtime/src/test/resources/heap_keyed_statebackend_1_2.snapshot and /dev/null differ
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorMigrationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorMigrationTest.java
index 4939bd2..e304661 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorMigrationTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorMigrationTest.java
@@ -86,7 +86,6 @@ public class WindowOperatorMigrationTest {
 	@Parameterized.Parameters(name = "Migration Savepoint: {0}")
 	public static Collection<MigrationVersion> parameters () {
 		return Arrays.asList(
-			MigrationVersion.v1_2,
 			MigrationVersion.v1_3,
 			MigrationVersion.v1_4,
 			MigrationVersion.v1_5,
diff --git a/flink-streaming-java/src/test/resources/win-op-migration-test-accum-aligned-flink1.2-snapshot b/flink-streaming-java/src/test/resources/win-op-migration-test-accum-aligned-flink1.2-snapshot
deleted file mode 100644
index d182bee..0000000
Binary files a/flink-streaming-java/src/test/resources/win-op-migration-test-accum-aligned-flink1.2-snapshot and /dev/null differ
diff --git a/flink-streaming-java/src/test/resources/win-op-migration-test-aggr-aligned-flink1.2-snapshot b/flink-streaming-java/src/test/resources/win-op-migration-test-aggr-aligned-flink1.2-snapshot
deleted file mode 100644
index 83741c1..0000000
Binary files a/flink-streaming-java/src/test/resources/win-op-migration-test-aggr-aligned-flink1.2-snapshot and /dev/null differ
diff --git a/flink-streaming-java/src/test/resources/win-op-migration-test-apply-event-time-flink1.2-snapshot b/flink-streaming-java/src/test/resources/win-op-migration-test-apply-event-time-flink1.2-snapshot
deleted file mode 100644
index 3411df6..0000000
Binary files a/flink-streaming-java/src/test/resources/win-op-migration-test-apply-event-time-flink1.2-snapshot and /dev/null differ
diff --git a/flink-streaming-java/src/test/resources/win-op-migration-test-apply-processing-time-flink1.2-snapshot b/flink-streaming-java/src/test/resources/win-op-migration-test-apply-processing-time-flink1.2-snapshot
deleted file mode 100644
index 060397f..0000000
Binary files a/flink-streaming-java/src/test/resources/win-op-migration-test-apply-processing-time-flink1.2-snapshot and /dev/null differ
diff --git a/flink-streaming-java/src/test/resources/win-op-migration-test-kryo-serialized-key-flink1.2-snapshot b/flink-streaming-java/src/test/resources/win-op-migration-test-kryo-serialized-key-flink1.2-snapshot
deleted file mode 100644
index 3a4cc98..0000000
Binary files a/flink-streaming-java/src/test/resources/win-op-migration-test-kryo-serialized-key-flink1.2-snapshot and /dev/null differ
diff --git a/flink-streaming-java/src/test/resources/win-op-migration-test-reduce-event-time-flink1.2-snapshot b/flink-streaming-java/src/test/resources/win-op-migration-test-reduce-event-time-flink1.2-snapshot
deleted file mode 100644
index dad4630..0000000
Binary files a/flink-streaming-java/src/test/resources/win-op-migration-test-reduce-event-time-flink1.2-snapshot and /dev/null differ
diff --git a/flink-streaming-java/src/test/resources/win-op-migration-test-reduce-processing-time-flink1.2-snapshot b/flink-streaming-java/src/test/resources/win-op-migration-test-reduce-processing-time-flink1.2-snapshot
deleted file mode 100644
index 0edf25e..0000000
Binary files a/flink-streaming-java/src/test/resources/win-op-migration-test-reduce-processing-time-flink1.2-snapshot and /dev/null differ
diff --git a/flink-streaming-java/src/test/resources/win-op-migration-test-session-with-stateful-trigger-flink1.2-snapshot b/flink-streaming-java/src/test/resources/win-op-migration-test-session-with-stateful-trigger-flink1.2-snapshot
deleted file mode 100644
index d0afbe0..0000000
Binary files a/flink-streaming-java/src/test/resources/win-op-migration-test-session-with-stateful-trigger-flink1.2-snapshot and /dev/null differ
diff --git a/flink-streaming-java/src/test/resources/win-op-migration-test-session-with-stateful-trigger-mint-flink1.2-snapshot b/flink-streaming-java/src/test/resources/win-op-migration-test-session-with-stateful-trigger-mint-flink1.2-snapshot
deleted file mode 100644
index 7d0b6cc..0000000
Binary files a/flink-streaming-java/src/test/resources/win-op-migration-test-session-with-stateful-trigger-mint-flink1.2-snapshot and /dev/null differ
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/LegacyStatefulJobSavepointMigrationITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/LegacyStatefulJobSavepointMigrationITCase.java
index 9c2ff79..e40294e 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/LegacyStatefulJobSavepointMigrationITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/LegacyStatefulJobSavepointMigrationITCase.java
@@ -70,10 +70,6 @@ public class LegacyStatefulJobSavepointMigrationITCase extends SavepointMigratio
 	@Parameterized.Parameters(name = "Migrate Savepoint / Backend: {0}")
 	public static Collection<Tuple2<MigrationVersion, String>> parameters () {
 		return Arrays.asList(
-			Tuple2.of(MigrationVersion.v1_2, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-			Tuple2.of(MigrationVersion.v1_2, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
-			Tuple2.of(MigrationVersion.v1_3, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
-			Tuple2.of(MigrationVersion.v1_3, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME),
 			Tuple2.of(MigrationVersion.v1_4, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
 			Tuple2.of(MigrationVersion.v1_4, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME));
 	}
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
index eac68b1..de22ab2 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
@@ -30,7 +30,6 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HeartbeatManagerOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
-import org.apache.flink.runtime.checkpoint.savepoint.SavepointSerializers;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
@@ -40,8 +39,6 @@ import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.testutils.junit.category.AlsoRunWithLegacyScheduler;
 
 import org.apache.commons.io.FileUtils;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Rule;
 import org.junit.experimental.categories.Category;
@@ -77,16 +74,6 @@ public abstract class SavepointMigrationTestBase extends TestBaseUtils {
 
 	protected static final int DEFAULT_PARALLELISM = 4;
 
-	@BeforeClass
-	public static void before() {
-		SavepointSerializers.setFailWhenLegacyStateDetected(false);
-	}
-
-	@AfterClass
-	public static void after() {
-		SavepointSerializers.setFailWhenLegacyStateDetected(true);
-	}
-
 	protected static String getResourceFilename(String filename) {
 		ClassLoader cl = SavepointMigrationTestBase.class.getClassLoader();
 		URL resource = cl.getResource(filename);
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
index 9b00d49..b305c7e 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
@@ -25,7 +25,6 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.client.ClientUtils;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
-import org.apache.flink.runtime.checkpoint.savepoint.SavepointSerializers;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
@@ -41,8 +40,6 @@ import org.apache.flink.testutils.junit.category.AlsoRunWithLegacyScheduler;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.TestLogger;
 
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -104,16 +101,6 @@ public abstract class AbstractOperatorRestoreTestBase extends TestLogger {
 		this.allowNonRestoredState = allowNonRestoredState;
 	}
 
-	@BeforeClass
-	public static void beforeClass() {
-		SavepointSerializers.setFailWhenLegacyStateDetected(false);
-	}
-
-	@AfterClass
-	public static void after() {
-		SavepointSerializers.setFailWhenLegacyStateDetected(true);
-	}
-
 	@Test
 	public void testMigrationAndRestore() throws Throwable {
 		ClusterClient<?> clusterClient = cluster.getClusterClient();
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/AbstractKeyedOperatorRestoreTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/AbstractKeyedOperatorRestoreTestBase.java
index c76f7b2..c8a966e 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/AbstractKeyedOperatorRestoreTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/AbstractKeyedOperatorRestoreTestBase.java
@@ -42,7 +42,6 @@ public abstract class AbstractKeyedOperatorRestoreTestBase extends AbstractOpera
 	@Parameterized.Parameters(name = "Migrate Savepoint: {0}")
 	public static Collection<MigrationVersion> parameters () {
 		return Arrays.asList(
-			MigrationVersion.v1_2,
 			MigrationVersion.v1_3,
 			MigrationVersion.v1_4,
 			MigrationVersion.v1_5,
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/AbstractNonKeyedOperatorRestoreTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/AbstractNonKeyedOperatorRestoreTestBase.java
index c36700a..15e9d87 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/AbstractNonKeyedOperatorRestoreTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/AbstractNonKeyedOperatorRestoreTestBase.java
@@ -48,7 +48,6 @@ public abstract class AbstractNonKeyedOperatorRestoreTestBase extends AbstractOp
 	@Parameterized.Parameters(name = "Migrate Savepoint: {0}")
 	public static Collection<MigrationVersion> parameters () {
 		return Arrays.asList(
-			MigrationVersion.v1_2,
 			MigrationVersion.v1_3,
 			MigrationVersion.v1_4,
 			MigrationVersion.v1_5,
diff --git a/flink-tests/src/test/resources/operatorstate/complexKeyed-flink1.2/_metadata b/flink-tests/src/test/resources/operatorstate/complexKeyed-flink1.2/_metadata
deleted file mode 100644
index 0a1ed10..0000000
Binary files a/flink-tests/src/test/resources/operatorstate/complexKeyed-flink1.2/_metadata and /dev/null differ
diff --git a/flink-tests/src/test/resources/operatorstate/nonKeyed-flink1.2/_metadata b/flink-tests/src/test/resources/operatorstate/nonKeyed-flink1.2/_metadata
deleted file mode 100644
index 8fcd1ea..0000000
Binary files a/flink-tests/src/test/resources/operatorstate/nonKeyed-flink1.2/_metadata and /dev/null differ
diff --git a/flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.2-rocksdb-savepoint b/flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.2-rocksdb-savepoint
deleted file mode 100644
index 548993f..0000000
Binary files a/flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.2-rocksdb-savepoint and /dev/null differ
diff --git a/flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.2-savepoint b/flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.2-savepoint
deleted file mode 100644
index a8d19f2..0000000
Binary files a/flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.2-savepoint and /dev/null differ
diff --git a/flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.3-rocksdb-savepoint/_metadata b/flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.3-rocksdb-savepoint/_metadata
deleted file mode 100644
index 8f22bcb..0000000
Binary files a/flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.3-rocksdb-savepoint/_metadata and /dev/null differ
diff --git a/flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.3-savepoint/_metadata b/flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.3-savepoint/_metadata
deleted file mode 100644
index 8ca91ec..0000000
Binary files a/flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.3-savepoint/_metadata and /dev/null differ