You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2020/05/07 15:52:33 UTC

[flink] branch master updated: [FLINK-16638][runtime][checkpointing] Flink checkStateMappingCompleteness doesn't include UserDefinedOperatorIDs

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 0114338  [FLINK-16638][runtime][checkpointing] Flink checkStateMappingCompleteness doesn't include UserDefinedOperatorIDs
0114338 is described below

commit 0114338da5ce52677d1dfa1ab4350b1567dc3522
Author: edu05 <ed...@gmail.com>
AuthorDate: Wed Apr 29 19:36:08 2020 +0200

    [FLINK-16638][runtime][checkpointing] Flink checkStateMappingCompleteness doesn't include UserDefinedOperatorIDs
---
 .../state/api/input/OperatorStateInputFormat.java  |  3 +-
 .../state/api/runtime/OperatorIDGeneratorTest.java |  2 +-
 .../org/apache/flink/runtime/OperatorIDPair.java   | 58 +++++++++++++++
 .../flink/runtime/checkpoint/Checkpoints.java      | 17 +----
 .../runtime/checkpoint/PendingCheckpoint.java      | 13 ++--
 .../checkpoint/StateAssignmentOperation.java       | 35 ++++-----
 .../runtime/executiongraph/ExecutionJobVertex.java | 87 ++--------------------
 .../runtime/jobgraph/InputOutputFormatVertex.java  |  7 +-
 .../apache/flink/runtime/jobgraph/JobVertex.java   | 49 ++++--------
 .../ChannelStateNoRescalingPartitionerTest.java    |  3 +-
 .../CheckpointCoordinatorRestoringTest.java        | 23 +++---
 .../checkpoint/CheckpointCoordinatorTest.java      |  2 +-
 .../CheckpointCoordinatorTestingUtils.java         | 18 +++--
 .../checkpoint/CheckpointMetadataLoadingTest.java  |  3 +-
 .../checkpoint/CheckpointStateRestoreTest.java     |  4 +-
 .../runtime/checkpoint/PendingCheckpointTest.java  |  3 +-
 .../checkpoint/StateAssignmentOperationTest.java   | 54 ++++++++++----
 .../executiongraph/LegacyJobVertexIdTest.java      | 64 ----------------
 .../api/graph/StreamingJobGraphGenerator.java      | 24 ++----
 .../StreamingJobGraphGeneratorNodeHashTest.java    | 23 ------
 20 files changed, 191 insertions(+), 301 deletions(-)

diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/input/OperatorStateInputFormat.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/input/OperatorStateInputFormat.java
index a312938..ec4c9b1 100644
--- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/input/OperatorStateInputFormat.java
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/input/OperatorStateInputFormat.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.common.io.statistics.BaseStatistics;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.core.io.InputSplitAssigner;
+import org.apache.flink.runtime.OperatorIDPair;
 import org.apache.flink.runtime.checkpoint.OperatorState;
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.runtime.checkpoint.RoundRobinOperatorStateRepartitioner;
@@ -123,7 +124,7 @@ abstract class OperatorStateInputFormat<OT> extends RichInputFormat<OT, Operator
 		Map<OperatorInstanceID, List<OperatorStateHandle>> newManagedOperatorStates = reDistributePartitionableStates(
 			singletonList(operatorState),
 			minNumSplits,
-			singletonList(operatorState.getOperatorID()),
+			singletonList(OperatorIDPair.generatedIDOnly(operatorState.getOperatorID())),
 			OperatorSubtaskState::getManagedOperatorState,
 			RoundRobinOperatorStateRepartitioner.INSTANCE);
 
diff --git a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/runtime/OperatorIDGeneratorTest.java b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/runtime/OperatorIDGeneratorTest.java
index 9dff2a1..cb90641 100644
--- a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/runtime/OperatorIDGeneratorTest.java
+++ b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/runtime/OperatorIDGeneratorTest.java
@@ -65,6 +65,6 @@ public class OperatorIDGeneratorTest {
 			.findFirst()
 			.orElseThrow(() -> new IllegalStateException("Unable to find vertex"));
 
-		return vertex.getOperatorIDs().get(0);
+		return vertex.getOperatorIDs().get(0).getGeneratedOperatorID();
 	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/OperatorIDPair.java b/flink-runtime/src/main/java/org/apache/flink/runtime/OperatorIDPair.java
new file mode 100644
index 0000000..e6070d1
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/OperatorIDPair.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime;
+
+import org.apache.flink.runtime.jobgraph.OperatorID;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.Optional;
+
+/**
+ * A pair formed of a mandatory generated operator ID and an optional user-defined operator ID.
+ */
+public class OperatorIDPair implements Serializable {
+
+	private static final long serialVersionUID = 1L;
+
+	private final OperatorID generatedOperatorID;
+	private final OperatorID userDefinedOperatorID;
+
+	private OperatorIDPair(OperatorID generatedOperatorID, @Nullable OperatorID userDefinedOperatorID) {
+		this.generatedOperatorID = generatedOperatorID;
+		this.userDefinedOperatorID = userDefinedOperatorID;
+	}
+
+	public static OperatorIDPair of(OperatorID generatedOperatorID, @Nullable OperatorID userDefinedOperatorID) {
+		return new OperatorIDPair(generatedOperatorID, userDefinedOperatorID);
+	}
+
+	public static OperatorIDPair generatedIDOnly(OperatorID generatedOperatorID) {
+		return new OperatorIDPair(generatedOperatorID, null);
+	}
+
+	public OperatorID getGeneratedOperatorID() {
+		return generatedOperatorID;
+	}
+
+	public Optional<OperatorID> getUserDefinedOperatorID() {
+		return Optional.ofNullable(userDefinedOperatorID);
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java
index 17db5e6..37f96f8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.OperatorIDPair;
 import org.apache.flink.runtime.checkpoint.metadata.CheckpointMetadata;
 import org.apache.flink.runtime.checkpoint.metadata.MetadataSerializer;
 import org.apache.flink.runtime.checkpoint.metadata.MetadataSerializers;
@@ -137,28 +138,18 @@ public class Checkpoints {
 		// generate mapping from operator to task
 		Map<OperatorID, ExecutionJobVertex> operatorToJobVertexMapping = new HashMap<>();
 		for (ExecutionJobVertex task : tasks.values()) {
-			for (OperatorID operatorID : task.getOperatorIDs()) {
-				operatorToJobVertexMapping.put(operatorID, task);
+			for (OperatorIDPair operatorIDPair : task.getOperatorIDs()) {
+				operatorToJobVertexMapping.put(operatorIDPair.getGeneratedOperatorID(), task);
+				operatorIDPair.getUserDefinedOperatorID().ifPresent(id -> operatorToJobVertexMapping.put(id, task));
 			}
 		}
 
 		// (2) validate it (parallelism, etc)
-		boolean expandedToLegacyIds = false;
-
 		HashMap<OperatorID, OperatorState> operatorStates = new HashMap<>(checkpointMetadata.getOperatorStates().size());
 		for (OperatorState operatorState : checkpointMetadata.getOperatorStates()) {
 
 			ExecutionJobVertex executionJobVertex = operatorToJobVertexMapping.get(operatorState.getOperatorID());
 
-			// on the first time we can not find the execution job vertex for an id, we also consider alternative ids,
-			// for example as generated from older flink versions, to provide backwards compatibility.
-			if (executionJobVertex == null && !expandedToLegacyIds) {
-				operatorToJobVertexMapping = ExecutionJobVertex.includeAlternativeOperatorIDs(operatorToJobVertexMapping);
-				executionJobVertex = operatorToJobVertexMapping.get(operatorState.getOperatorID());
-				expandedToLegacyIds = true;
-				LOG.info("Could not find ExecutionJobVertex. Including user-defined OperatorIDs in search.");
-			}
-
 			if (executionJobVertex != null) {
 
 				if (executionJobVertex.getMaxParallelism() == operatorState.getMaxParallelism()
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
index 52844f8..6ccdc53 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.OperatorIDPair;
 import org.apache.flink.runtime.checkpoint.metadata.CheckpointMetadata;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
@@ -371,31 +372,31 @@ public class PendingCheckpoint {
 				acknowledgedTasks.add(executionAttemptId);
 			}
 
-			List<OperatorID> operatorIDs = vertex.getJobVertex().getOperatorIDs();
+			List<OperatorIDPair> operatorIDs = vertex.getJobVertex().getOperatorIDs();
 			int subtaskIndex = vertex.getParallelSubtaskIndex();
 			long ackTimestamp = System.currentTimeMillis();
 
 			long stateSize = 0L;
 
 			if (operatorSubtaskStates != null) {
-				for (OperatorID operatorID : operatorIDs) {
+				for (OperatorIDPair operatorID : operatorIDs) {
 
 					OperatorSubtaskState operatorSubtaskState =
-						operatorSubtaskStates.getSubtaskStateByOperatorID(operatorID);
+						operatorSubtaskStates.getSubtaskStateByOperatorID(operatorID.getGeneratedOperatorID());
 
 					// if no real operatorSubtaskState was reported, we insert an empty state
 					if (operatorSubtaskState == null) {
 						operatorSubtaskState = new OperatorSubtaskState();
 					}
 
-					OperatorState operatorState = operatorStates.get(operatorID);
+					OperatorState operatorState = operatorStates.get(operatorID.getGeneratedOperatorID());
 
 					if (operatorState == null) {
 						operatorState = new OperatorState(
-							operatorID,
+							operatorID.getGeneratedOperatorID(),
 							vertex.getTotalNumberOfParallelSubtasks(),
 							vertex.getMaxParallelism());
-						operatorStates.put(operatorID, operatorState);
+						operatorStates.put(operatorID.getGeneratedOperatorID(), operatorState);
 					}
 
 					operatorState.putState(subtaskIndex, operatorSubtaskState);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
index 7f49828..9644430 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.checkpoint;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.OperatorIDPair;
 import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.jobgraph.OperatorID;
@@ -86,14 +87,11 @@ public class StateAssignmentOperation {
 		for (ExecutionJobVertex executionJobVertex : this.tasks) {
 
 			// find the states of all operators belonging to this task
-			List<OperatorID> operatorIDs = executionJobVertex.getOperatorIDs();
-			List<OperatorID> altOperatorIDs = executionJobVertex.getUserDefinedOperatorIDs();
-			List<OperatorState> operatorStates = new ArrayList<>(operatorIDs.size());
+			List<OperatorIDPair> operatorIDPairs = executionJobVertex.getOperatorIDs();
+			List<OperatorState> operatorStates = new ArrayList<>(operatorIDPairs.size());
 			boolean statelessTask = true;
-			for (int x = 0; x < operatorIDs.size(); x++) {
-				OperatorID operatorID = altOperatorIDs.get(x) == null
-					? operatorIDs.get(x)
-					: altOperatorIDs.get(x);
+			for (OperatorIDPair operatorIDPair : operatorIDPairs) {
+				OperatorID operatorID = operatorIDPair.getUserDefinedOperatorID().orElse(operatorIDPair.getGeneratedOperatorID());
 
 				OperatorState operatorState = localOperators.remove(operatorID);
 				if (operatorState == null) {
@@ -115,7 +113,7 @@ public class StateAssignmentOperation {
 
 	private void assignAttemptState(ExecutionJobVertex executionJobVertex, List<OperatorState> operatorStates) {
 
-		List<OperatorID> operatorIDs = executionJobVertex.getOperatorIDs();
+		List<OperatorIDPair> operatorIDs = executionJobVertex.getOperatorIDs();
 
 		//1. first compute the new parallelism
 		checkParallelismPreconditions(operatorStates, executionJobVertex);
@@ -214,7 +212,7 @@ public class StateAssignmentOperation {
 			Map<OperatorInstanceID, List<KeyedStateHandle>> subRawKeyedState,
 			int newParallelism) {
 
-		List<OperatorID> operatorIDs = executionJobVertex.getOperatorIDs();
+		List<OperatorIDPair> operatorIDs = executionJobVertex.getOperatorIDs();
 
 		for (int subTaskIndex = 0; subTaskIndex < newParallelism; subTaskIndex++) {
 
@@ -224,8 +222,8 @@ public class StateAssignmentOperation {
 			TaskStateSnapshot taskState = new TaskStateSnapshot(operatorIDs.size());
 			boolean statelessTask = true;
 
-			for (OperatorID operatorID : operatorIDs) {
-				OperatorInstanceID instanceID = OperatorInstanceID.of(subTaskIndex, operatorID);
+			for (OperatorIDPair operatorID : operatorIDs) {
+				OperatorInstanceID instanceID = OperatorInstanceID.of(subTaskIndex, operatorID.getGeneratedOperatorID());
 
 				OperatorSubtaskState operatorSubtaskState = operatorSubtaskStateFrom(
 					instanceID,
@@ -239,7 +237,7 @@ public class StateAssignmentOperation {
 				if (operatorSubtaskState.hasState()) {
 					statelessTask = false;
 				}
-				taskState.putSubtaskStateByOperatorID(operatorID, operatorSubtaskState);
+				taskState.putSubtaskStateByOperatorID(operatorID.getGeneratedOperatorID(), operatorSubtaskState);
 			}
 
 			if (!statelessTask) {
@@ -288,7 +286,7 @@ public class StateAssignmentOperation {
 	private void reDistributeKeyedStates(
 			List<OperatorState> oldOperatorStates,
 			int newParallelism,
-			List<OperatorID> newOperatorIDs,
+			List<OperatorIDPair> newOperatorIDs,
 			List<KeyGroupRange> newKeyGroupPartitions,
 			Map<OperatorInstanceID, List<KeyedStateHandle>> newManagedKeyedState,
 			Map<OperatorInstanceID, List<KeyedStateHandle>> newRawKeyedState) {
@@ -300,7 +298,7 @@ public class StateAssignmentOperation {
 			OperatorState operatorState = oldOperatorStates.get(operatorIndex);
 			int oldParallelism = operatorState.getParallelism();
 			for (int subTaskIndex = 0; subTaskIndex < newParallelism; subTaskIndex++) {
-				OperatorInstanceID instanceID = OperatorInstanceID.of(subTaskIndex, newOperatorIDs.get(operatorIndex));
+				OperatorInstanceID instanceID = OperatorInstanceID.of(subTaskIndex, newOperatorIDs.get(operatorIndex).getGeneratedOperatorID());
 				Tuple2<List<KeyedStateHandle>, List<KeyedStateHandle>> subKeyedStates = reAssignSubKeyedStates(
 					operatorState,
 					newKeyGroupPartitions,
@@ -347,7 +345,7 @@ public class StateAssignmentOperation {
 	public static <T extends StateObject> Map<OperatorInstanceID, List<T>>  reDistributePartitionableStates(
 			List<OperatorState> oldOperatorStates,
 			int newParallelism,
-			List<OperatorID> newOperatorIDs,
+			List<OperatorIDPair> newOperatorIDs,
 			Function<OperatorSubtaskState, StateObjectCollection<T>> extractHandle,
 			OperatorStateRepartitioner<T> stateRepartitioner) {
 
@@ -361,7 +359,7 @@ public class StateAssignmentOperation {
 		Map<OperatorInstanceID, List<T>> result = new HashMap<>();
 		for (int operatorIndex = 0; operatorIndex < newOperatorIDs.size(); operatorIndex++) {
 			result.putAll(applyRepartitioner(
-				newOperatorIDs.get(operatorIndex),
+				newOperatorIDs.get(operatorIndex).getGeneratedOperatorID(),
 				stateRepartitioner,
 				oldStates.get(operatorIndex),
 				oldOperatorStates.get(operatorIndex).getParallelism(),
@@ -555,7 +553,10 @@ public class StateAssignmentOperation {
 
 		Set<OperatorID> allOperatorIDs = new HashSet<>();
 		for (ExecutionJobVertex executionJobVertex : tasks) {
-			allOperatorIDs.addAll(executionJobVertex.getOperatorIDs());
+			for (OperatorIDPair operatorIDPair : executionJobVertex.getOperatorIDs()) {
+				allOperatorIDs.add(operatorIDPair.getGeneratedOperatorID());
+				operatorIDPair.getUserDefinedOperatorID().ifPresent(allOperatorIDs::add);
+			}
 		}
 		for (Map.Entry<OperatorID, OperatorState> operatorGroupStateEntry : operatorStates.entrySet()) {
 			OperatorState operatorState = operatorGroupStateEntry.getValue();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index 9cca608..ee2cd2a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -32,6 +32,7 @@ import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.core.io.InputSplitAssigner;
 import org.apache.flink.core.io.InputSplitSource;
 import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.OperatorIDPair;
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
 import org.apache.flink.runtime.blob.BlobWriter;
 import org.apache.flink.runtime.blob.PermanentBlobKey;
@@ -91,24 +92,6 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
 
 	private final JobVertex jobVertex;
 
-	/**
-	 * The IDs of all operators contained in this execution job vertex.
-	 *
-	 * <p>The ID's are stored depth-first post-order; for the forking chain below the ID's would be stored as [D, E, B, C, A].
-	 *  A - B - D
-	 *   \    \
-	 *    C    E
-	 * This is the same order that operators are stored in the {@code StreamTask}.
-	 */
-	private final List<OperatorID> operatorIDs;
-
-	/**
-	 * The alternative IDs of all operators contained in this execution job vertex.
-	 *
-	 * <p>The ID's are in the same order as {@link ExecutionJobVertex#operatorIDs}.
-	 */
-	private final List<OperatorID> userDefinedOperatorIds;
-
 	private final ExecutionVertex[] taskVertices;
 
 	private final IntermediateResult[] producedDataSets;
@@ -200,8 +183,6 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
 		this.resourceProfile = ResourceProfile.fromResourceSpec(jobVertex.getMinResources(), MemorySize.ZERO);
 
 		this.taskVertices = new ExecutionVertex[numTaskVertices];
-		this.operatorIDs = Collections.unmodifiableList(jobVertex.getOperatorIDs());
-		this.userDefinedOperatorIds = Collections.unmodifiableList(jobVertex.getUserDefinedOperatorIDs());
 
 		this.inputs = new ArrayList<>(jobVertex.getInputs().size());
 
@@ -289,21 +270,12 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
 	}
 
 	/**
-	 * Returns a list containing the IDs of all operators contained in this execution job vertex.
+	 * Returns a list containing the ID pairs of all operators contained in this execution job vertex.
 	 *
-	 * @return list containing the IDs of all contained operators
+	 * @return list containing the ID pairs of all contained operators
 	 */
-	public List<OperatorID> getOperatorIDs() {
-		return operatorIDs;
-	}
-
-	/**
-	 * Returns a list containing the alternative IDs of all operators contained in this execution job vertex.
-	 *
-	 * @return list containing alternative the IDs of all contained operators
-	 */
-	public List<OperatorID> getUserDefinedOperatorIDs() {
-		return userDefinedOperatorIds;
+	public List<OperatorIDPair> getOperatorIDs() {
+		return jobVertex.getOperatorIDs();
 	}
 
 	public void setMaxParallelism(int maxParallelismDerived) {
@@ -626,53 +598,4 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
 			return ExecutionState.CREATED;
 		}
 	}
-
-	public static Map<JobVertexID, ExecutionJobVertex> includeLegacyJobVertexIDs(
-			Map<JobVertexID, ExecutionJobVertex> tasks) {
-
-		Map<JobVertexID, ExecutionJobVertex> expanded = new HashMap<>(2 * tasks.size());
-		// first include all new ids
-		expanded.putAll(tasks);
-
-		// now expand and add legacy ids
-		for (ExecutionJobVertex executionJobVertex : tasks.values()) {
-			if (null != executionJobVertex) {
-				JobVertex jobVertex = executionJobVertex.getJobVertex();
-				if (null != jobVertex) {
-					List<JobVertexID> alternativeIds = jobVertex.getIdAlternatives();
-					for (JobVertexID jobVertexID : alternativeIds) {
-						ExecutionJobVertex old = expanded.put(jobVertexID, executionJobVertex);
-						Preconditions.checkState(null == old || old.equals(executionJobVertex),
-								"Ambiguous jobvertex id detected during expansion to legacy ids.");
-					}
-				}
-			}
-		}
-
-		return expanded;
-	}
-
-	public static Map<OperatorID, ExecutionJobVertex> includeAlternativeOperatorIDs(
-			Map<OperatorID, ExecutionJobVertex> operatorMapping) {
-
-		Map<OperatorID, ExecutionJobVertex> expanded = new HashMap<>(2 * operatorMapping.size());
-		// first include all existing ids
-		expanded.putAll(operatorMapping);
-
-		// now expand and add user-defined ids
-		for (ExecutionJobVertex executionJobVertex : operatorMapping.values()) {
-			if (executionJobVertex != null) {
-				JobVertex jobVertex = executionJobVertex.getJobVertex();
-				if (jobVertex != null) {
-					for (OperatorID operatorID : jobVertex.getUserDefinedOperatorIDs()) {
-						if (operatorID != null) {
-							expanded.put(operatorID, executionJobVertex);
-						}
-					}
-				}
-			}
-		}
-
-		return expanded;
-	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/InputOutputFormatVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/InputOutputFormatVertex.java
index 741df52..f1e7a2b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/InputOutputFormatVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/InputOutputFormatVertex.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.io.InitializeOnMaster;
 import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.api.common.io.OutputFormat;
 import org.apache.flink.api.common.operators.util.UserCodeWrapper;
+import org.apache.flink.runtime.OperatorIDPair;
 import org.apache.flink.runtime.operators.util.TaskConfig;
 
 import java.util.HashMap;
@@ -48,11 +49,9 @@ public class InputOutputFormatVertex extends JobVertex {
 	public InputOutputFormatVertex(
 		String name,
 		JobVertexID id,
-		List<JobVertexID> alternativeIds,
-		List<OperatorID> operatorIds,
-		List<OperatorID> alternativeOperatorIds) {
+		List<OperatorIDPair> operatorIDPairs) {
 
-		super(name, id, alternativeIds, operatorIds, alternativeOperatorIds);
+		super(name, id, operatorIDPairs);
 	}
 
 	@Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
index e9616a7..c77103d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.InputDependencyConstraint;
 import org.apache.flink.api.common.operators.ResourceSpec;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.InputSplitSource;
+import org.apache.flink.runtime.OperatorIDPair;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
@@ -55,14 +56,15 @@ public class JobVertex implements java.io.Serializable {
 	/** The ID of the vertex. */
 	private final JobVertexID id;
 
-	/** The alternative IDs of the vertex. */
-	private final ArrayList<JobVertexID> idAlternatives = new ArrayList<>();
-
-	/** The IDs of all operators contained in this vertex. */
-	private final ArrayList<OperatorID> operatorIDs = new ArrayList<>();
-
-	/** The alternative IDs of all operators contained in this vertex. */
-	private final ArrayList<OperatorID> operatorIdsAlternatives = new ArrayList<>();
+	/** The ID pairs of all operators contained in this vertex.
+	 *
+	 * <p>The ID pairs are stored depth-first post-order; for the forking chain below the IDs would be stored as [D, E, B, C, A].
+	 *  A - B - D
+	 *   \    \
+	 *    C    E
+	 * This is the same order that operators are stored in the {@code StreamTask}.
+	 */
+	private final List<OperatorIDPair> operatorIDs;
 
 	/** List of produced data sets, one per writer. */
 	private final ArrayList<IntermediateDataSet> results = new ArrayList<>();
@@ -143,9 +145,8 @@ public class JobVertex implements java.io.Serializable {
 	public JobVertex(String name, JobVertexID id) {
 		this.name = name == null ? DEFAULT_NAME : name;
 		this.id = id == null ? new JobVertexID() : id;
-		// the id lists must have the same size
-		this.operatorIDs.add(OperatorID.fromJobVertexID(this.id));
-		this.operatorIdsAlternatives.add(null);
+		OperatorIDPair operatorIDPair = OperatorIDPair.generatedIDOnly(OperatorID.fromJobVertexID(this.id));
+		this.operatorIDs = Collections.singletonList(operatorIDPair);
 	}
 
 	/**
@@ -153,17 +154,12 @@ public class JobVertex implements java.io.Serializable {
 	 *
 	 * @param name The name of the new job vertex.
 	 * @param primaryId The id of the job vertex.
-	 * @param alternativeIds The alternative ids of the job vertex.
-	 * @param operatorIds The ids of all operators contained in this job vertex.
-	 * @param alternativeOperatorIds The alternative ids of all operators contained in this job vertex-
+	 * @param operatorIDPairs The operator ID pairs of the job vertex.
 	 */
-	public JobVertex(String name, JobVertexID primaryId, List<JobVertexID> alternativeIds, List<OperatorID> operatorIds, List<OperatorID> alternativeOperatorIds) {
-		Preconditions.checkArgument(operatorIds.size() == alternativeOperatorIds.size());
+	public JobVertex(String name, JobVertexID primaryId, List<OperatorIDPair> operatorIDPairs) {
 		this.name = name == null ? DEFAULT_NAME : name;
 		this.id = primaryId == null ? new JobVertexID() : primaryId;
-		this.idAlternatives.addAll(alternativeIds);
-		this.operatorIDs.addAll(operatorIds);
-		this.operatorIdsAlternatives.addAll(alternativeOperatorIds);
+		this.operatorIDs = Collections.unmodifiableList(operatorIDPairs);
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -178,15 +174,6 @@ public class JobVertex implements java.io.Serializable {
 	}
 
 	/**
-	 * Returns a list of all alternative IDs of this job vertex.
-	 *
-	 * @return List of all alternative IDs for this job vertex
-	 */
-	public List<JobVertexID> getIdAlternatives() {
-		return idAlternatives;
-	}
-
-	/**
 	 * Returns the name of the vertex.
 	 *
 	 * @return The name of the vertex.
@@ -222,14 +209,10 @@ public class JobVertex implements java.io.Serializable {
 		return this.inputs.size();
 	}
 
-	public List<OperatorID> getOperatorIDs() {
+	public List<OperatorIDPair> getOperatorIDs() {
 		return operatorIDs;
 	}
 
-	public List<OperatorID> getUserDefinedOperatorIDs() {
-		return operatorIdsAlternatives;
-	}
-
 	/**
 	 * Returns the vertex's configuration object which can be used to pass custom settings to the task at runtime.
 	 *
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ChannelStateNoRescalingPartitionerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ChannelStateNoRescalingPartitionerTest.java
index a7763cb..91ae0ff 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ChannelStateNoRescalingPartitionerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ChannelStateNoRescalingPartitionerTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.runtime.checkpoint;
 
+import org.apache.flink.runtime.OperatorIDPair;
 import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
 import org.apache.flink.runtime.checkpoint.channel.ResultSubpartitionInfo;
 import org.apache.flink.runtime.jobgraph.OperatorID;
@@ -101,7 +102,7 @@ public class ChannelStateNoRescalingPartitionerTest {
 			StateAssignmentOperation.reDistributePartitionableStates(
 				singletonList(state),
 				newParallelism,
-				singletonList(OPERATOR_ID),
+				singletonList(OperatorIDPair.generatedIDOnly(OPERATOR_ID)),
 				(Function<OperatorSubtaskState, StateObjectCollection<T>>) this.extractState,
 				channelStateNonRescalingRepartitioner("test"));
 		} catch (IllegalArgumentException e) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java
index d5acd88..0f03875 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.checkpoint;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.OperatorIDPair;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils.CheckpointCoordinatorConfigurationBuilder;
 import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
@@ -506,7 +507,7 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
 		List<List<Collection<OperatorStateHandle>>> actualOpStatesRaw = new ArrayList<>(newJobVertex2.getParallelism());
 		for (int i = 0; i < newJobVertex2.getParallelism(); i++) {
 
-			List<OperatorID> operatorIDs = newJobVertex2.getOperatorIDs();
+			List<OperatorIDPair> operatorIDs = newJobVertex2.getOperatorIDs();
 
 			KeyGroupsStateHandle originalKeyedStateBackend = generateKeyGroupState(jobVertexID2, newKeyGroupPartitions2.get(i), false);
 			KeyGroupsStateHandle originalKeyedStateRaw = generateKeyGroupState(jobVertexID2, newKeyGroupPartitions2.get(i), true);
@@ -520,7 +521,7 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
 			List<Collection<OperatorStateHandle>> allParallelRawOpStates = new ArrayList<>(operatorIDs.size());
 
 			for (int idx = 0; idx < operatorIDs.size(); ++idx) {
-				OperatorID operatorID = operatorIDs.get(idx);
+				OperatorID operatorID = operatorIDs.get(idx).getGeneratedOperatorID();
 				OperatorSubtaskState opState = taskStateHandles.getSubtaskStateByOperatorID(operatorID);
 				Collection<OperatorStateHandle> opStateBackend = opState.getManagedOperatorState();
 				Collection<OperatorStateHandle> opStateRaw = opState.getRawOperatorState();
@@ -827,13 +828,13 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
 
 		for (int i = 0; i < newJobVertex1.getParallelism(); i++) {
 
-			final List<OperatorID> operatorIds = newJobVertex1.getOperatorIDs();
+			final List<OperatorIDPair> operatorIDs = newJobVertex1.getOperatorIDs();
 
 			JobManagerTaskRestore taskRestore = newJobVertex1.getTaskVertices()[i].getCurrentExecutionAttempt().getTaskRestore();
 			Assert.assertEquals(2L, taskRestore.getRestoreCheckpointId());
 			TaskStateSnapshot stateSnapshot = taskRestore.getTaskStateSnapshot();
 
-			OperatorSubtaskState headOpState = stateSnapshot.getSubtaskStateByOperatorID(operatorIds.get(operatorIds.size() - 1));
+			OperatorSubtaskState headOpState = stateSnapshot.getSubtaskStateByOperatorID(operatorIDs.get(operatorIDs.size() - 1).getGeneratedOperatorID());
 			assertTrue(headOpState.getManagedKeyedState().isEmpty());
 			assertTrue(headOpState.getRawKeyedState().isEmpty());
 
@@ -841,7 +842,7 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
 			{
 				int operatorIndexInChain = 2;
 				OperatorSubtaskState opState =
-					stateSnapshot.getSubtaskStateByOperatorID(operatorIds.get(operatorIndexInChain));
+					stateSnapshot.getSubtaskStateByOperatorID(operatorIDs.get(operatorIndexInChain).getGeneratedOperatorID());
 
 				assertTrue(opState.getManagedOperatorState().isEmpty());
 				assertTrue(opState.getRawOperatorState().isEmpty());
@@ -850,7 +851,7 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
 			{
 				int operatorIndexInChain = 1;
 				OperatorSubtaskState opState =
-					stateSnapshot.getSubtaskStateByOperatorID(operatorIds.get(operatorIndexInChain));
+					stateSnapshot.getSubtaskStateByOperatorID(operatorIDs.get(operatorIndexInChain).getGeneratedOperatorID());
 
 				OperatorStateHandle expectedManagedOpState = generatePartitionableStateHandle(
 					id1.f0, i, 2, 8, false);
@@ -871,7 +872,7 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
 			{
 				int operatorIndexInChain = 0;
 				OperatorSubtaskState opState =
-					stateSnapshot.getSubtaskStateByOperatorID(operatorIds.get(operatorIndexInChain));
+					stateSnapshot.getSubtaskStateByOperatorID(operatorIDs.get(operatorIndexInChain).getGeneratedOperatorID());
 
 				OperatorStateHandle expectedManagedOpState = generatePartitionableStateHandle(
 					id2.f0, i, 2, 8, false);
@@ -895,7 +896,7 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
 
 		for (int i = 0; i < newJobVertex2.getParallelism(); i++) {
 
-			final List<OperatorID> operatorIds = newJobVertex2.getOperatorIDs();
+			final List<OperatorIDPair> operatorIDs = newJobVertex2.getOperatorIDs();
 
 			JobManagerTaskRestore taskRestore = newJobVertex2.getTaskVertices()[i].getCurrentExecutionAttempt().getTaskRestore();
 			Assert.assertEquals(2L, taskRestore.getRestoreCheckpointId());
@@ -905,7 +906,7 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
 			{
 				int operatorIndexInChain = 1;
 				OperatorSubtaskState opState =
-					stateSnapshot.getSubtaskStateByOperatorID(operatorIds.get(operatorIndexInChain));
+					stateSnapshot.getSubtaskStateByOperatorID(operatorIDs.get(operatorIndexInChain).getGeneratedOperatorID());
 
 				List<Collection<OperatorStateHandle>> actualSubManagedOperatorState = new ArrayList<>(1);
 				actualSubManagedOperatorState.add(opState.getManagedOperatorState());
@@ -921,7 +922,7 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
 			{
 				int operatorIndexInChain = 0;
 				OperatorSubtaskState opState =
-					stateSnapshot.getSubtaskStateByOperatorID(operatorIds.get(operatorIndexInChain));
+					stateSnapshot.getSubtaskStateByOperatorID(operatorIDs.get(operatorIndexInChain).getGeneratedOperatorID());
 				assertTrue(opState.getManagedOperatorState().isEmpty());
 				assertTrue(opState.getRawOperatorState().isEmpty());
 
@@ -931,7 +932,7 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
 			KeyGroupsStateHandle originalKeyedStateRaw = generateKeyGroupState(id3.f0, newKeyGroupPartitions2.get(i), true);
 
 			OperatorSubtaskState headOpState =
-				stateSnapshot.getSubtaskStateByOperatorID(operatorIds.get(operatorIds.size() - 1));
+				stateSnapshot.getSubtaskStateByOperatorID(operatorIDs.get(operatorIDs.size() - 1).getGeneratedOperatorID());
 
 			Collection<KeyedStateHandle> keyedStateBackend = headOpState.getManagedKeyedState();
 			Collection<KeyedStateHandle> keyGroupStateRaw = headOpState.getRawKeyedState();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index ce662f6..d0df7e3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -2363,7 +2363,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 
 			Map<OperatorID, OperatorSubtaskState> opStates = new HashMap<>();
 
-			opStates.put(jobVertex1.getOperatorIDs().get(0), operatorSubtaskState);
+			opStates.put(jobVertex1.getOperatorIDs().get(0).getGeneratedOperatorID(), operatorSubtaskState);
 
 			TaskStateSnapshot taskStateSnapshot = new TaskStateSnapshot(opStates);
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java
index 95f7b58..7b1ff45 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.mock.Whitebox;
+import org.apache.flink.runtime.OperatorIDPair;
 import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
@@ -65,7 +66,6 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -357,8 +357,11 @@ public class CheckpointCoordinatorTestingUtils {
 		when(executionJobVertex.getParallelism()).thenReturn(parallelism);
 		when(executionJobVertex.getMaxParallelism()).thenReturn(maxParallelism);
 		when(executionJobVertex.isMaxParallelismConfigured()).thenReturn(true);
-		when(executionJobVertex.getOperatorIDs()).thenReturn(jobVertexIDs);
-		when(executionJobVertex.getUserDefinedOperatorIDs()).thenReturn(Arrays.asList(new OperatorID[jobVertexIDs.size()]));
+		List<OperatorIDPair> operatorIDPairs = new ArrayList<>();
+		for (OperatorID operatorID : jobVertexIDs) {
+			operatorIDPairs.add(OperatorIDPair.generatedIDOnly(operatorID));
+		}
+		when(executionJobVertex.getOperatorIDs()).thenReturn(operatorIDPairs);
 
 		return executionJobVertex;
 	}
@@ -455,7 +458,11 @@ public class CheckpointCoordinatorTestingUtils {
 		when(vertex.getMaxParallelism()).thenReturn(maxParallelism);
 
 		ExecutionJobVertex jobVertex = mock(ExecutionJobVertex.class);
-		when(jobVertex.getOperatorIDs()).thenReturn(jobVertexIDs);
+		List<OperatorIDPair> operatorIDPairs = new ArrayList<>();
+		for (OperatorID operatorID : jobVertexIDs) {
+			operatorIDPairs.add(OperatorIDPair.generatedIDOnly(operatorID));
+		}
+		when(jobVertex.getOperatorIDs()).thenReturn(operatorIDPairs);
 
 		when(vertex.getJobVertex()).thenReturn(jobVertex);
 
@@ -560,8 +567,7 @@ public class CheckpointCoordinatorTestingUtils {
 		when(vertex.getMaxParallelism()).thenReturn(vertices.length);
 		when(vertex.getJobVertexId()).thenReturn(id);
 		when(vertex.getTaskVertices()).thenReturn(vertices);
-		when(vertex.getOperatorIDs()).thenReturn(Collections.singletonList(OperatorID.fromJobVertexID(id)));
-		when(vertex.getUserDefinedOperatorIDs()).thenReturn(Collections.<OperatorID>singletonList(null));
+		when(vertex.getOperatorIDs()).thenReturn(Collections.singletonList(OperatorIDPair.generatedIDOnly(OperatorID.fromJobVertexID(id))));
 
 		for (ExecutionVertex v : vertices) {
 			when(v.getJobVertex()).thenReturn(vertex);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointMetadataLoadingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointMetadataLoadingTest.java
index 6d12b14..fc93b2c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointMetadataLoadingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointMetadataLoadingTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.OperatorIDPair;
 import org.apache.flink.runtime.checkpoint.metadata.CheckpointMetadata;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -104,7 +105,7 @@ public class CheckpointMetadataLoadingTest {
 		ExecutionJobVertex vertex = mock(ExecutionJobVertex.class);
 		when(vertex.getParallelism()).thenReturn(parallelism);
 		when(vertex.getMaxParallelism()).thenReturn(parallelism);
-		when(vertex.getOperatorIDs()).thenReturn(Collections.singletonList(operatorID));
+		when(vertex.getOperatorIDs()).thenReturn(Collections.singletonList(OperatorIDPair.generatedIDOnly(operatorID)));
 
 		Map<JobVertexID, ExecutionJobVertex> tasks = new HashMap<>();
 		tasks.put(jobVertexID, vertex);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
index 958f3f1..bd7f2ee 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.OperatorIDPair;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder;
 import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
 import org.apache.flink.runtime.execution.ExecutionState;
@@ -312,8 +313,7 @@ public class CheckpointStateRestoreTest {
 		when(vertex.getMaxParallelism()).thenReturn(vertices.length);
 		when(vertex.getJobVertexId()).thenReturn(id);
 		when(vertex.getTaskVertices()).thenReturn(vertices);
-		when(vertex.getOperatorIDs()).thenReturn(Collections.singletonList(OperatorID.fromJobVertexID(id)));
-		when(vertex.getUserDefinedOperatorIDs()).thenReturn(Collections.<OperatorID>singletonList(null));
+		when(vertex.getOperatorIDs()).thenReturn(Collections.singletonList(OperatorIDPair.generatedIDOnly(OperatorID.fromJobVertexID(id))));
 
 		for (ExecutionVertex v : vertices) {
 			when(v.getJobVertex()).thenReturn(vertex);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
index f172e51..5afcc73 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.fs.local.LocalFileSystem;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.runtime.OperatorIDPair;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils.StringSerializer;
 import org.apache.flink.runtime.checkpoint.PendingCheckpoint.TaskAcknowledgeResult;
 import org.apache.flink.runtime.checkpoint.hooks.MasterHooks;
@@ -87,7 +88,7 @@ public class PendingCheckpointTest {
 
 	static {
 		ExecutionJobVertex jobVertex = mock(ExecutionJobVertex.class);
-		when(jobVertex.getOperatorIDs()).thenReturn(Collections.singletonList(new OperatorID()));
+		when(jobVertex.getOperatorIDs()).thenReturn(Collections.singletonList(OperatorIDPair.generatedIDOnly(new OperatorID())));
 
 		ExecutionVertex vertex = mock(ExecutionVertex.class);
 		when(vertex.getMaxParallelism()).thenReturn(128);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperationTest.java
index 5d12738..7fed3cd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperationTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.OperatorIDPair;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
@@ -49,7 +50,6 @@ import java.util.function.Function;
 import java.util.stream.Collectors;
 
 import static java.util.Arrays.asList;
-import static java.util.Collections.emptyList;
 import static java.util.Collections.singletonList;
 import static org.apache.flink.runtime.checkpoint.StateHandleDummyUtil.createNewInputChannelStateHandle;
 import static org.apache.flink.runtime.checkpoint.StateHandleDummyUtil.createNewKeyedStateHandle;
@@ -173,7 +173,7 @@ public class StateAssignmentOperationTest extends TestLogger {
 			StateAssignmentOperation.reDistributePartitionableStates(
 				Collections.singletonList(operatorState),
 				newParallelism,
-				Collections.singletonList(operatorID),
+				Collections.singletonList(OperatorIDPair.generatedIDOnly(operatorID)),
 				OperatorSubtaskState::getManagedOperatorState,
 				RoundRobinOperatorStateRepartitioner.INSTANCE
 			);
@@ -324,6 +324,21 @@ public class StateAssignmentOperationTest extends TestLogger {
 		}
 	}
 
+	@Test
+	public void assigningStatesShouldWorkWithUserDefinedOperatorIdsAsWell() throws JobException, JobExecutionException {
+		int numSubTasks = 1;
+		OperatorID operatorId = new OperatorID();
+		OperatorID userDefinedOperatorId = new OperatorID();
+		Set<OperatorID> operatorIds = Collections.singleton(userDefinedOperatorId);
+
+		ExecutionJobVertex executionJobVertex = buildExecutionJobVertex(operatorId, userDefinedOperatorId, 1);
+		Map<OperatorID, OperatorState> states = buildOperatorStates(operatorIds, numSubTasks);
+
+		new StateAssignmentOperation(0, Collections.singleton(executionJobVertex), states, false).assignStates();
+
+		Assert.assertEquals(states.get(userDefinedOperatorId).getState(0), getAssignedState(executionJobVertex, operatorId, 0));
+	}
+
 	private Set<OperatorID> buildOperatorIds(int operators) {
 		Set<OperatorID> set = new HashSet<>();
 		for (int j = 0; j < operators; j++) {
@@ -349,21 +364,28 @@ public class StateAssignmentOperationTest extends TestLogger {
 		}));
 	}
 
-	private Map<OperatorID, ExecutionJobVertex> buildVertices(Set<OperatorID> operators, int parallelism) throws JobException, JobExecutionException {
+	private Map<OperatorID, ExecutionJobVertex> buildVertices(Set<OperatorID> operators, int parallelism) {
+		return operators.stream()
+			.collect(Collectors.toMap(Function.identity(), operatorID -> {
+				try {
+					return buildExecutionJobVertex(operatorID, parallelism);
+				} catch (Exception e) {
+					throw new RuntimeException(e);
+				}
+			}));
+	}
+
+	private ExecutionJobVertex buildExecutionJobVertex(OperatorID operatorID, int parallelism) throws JobException, JobExecutionException {
+		return buildExecutionJobVertex(operatorID, operatorID, parallelism);
+	}
+
+	private ExecutionJobVertex buildExecutionJobVertex(OperatorID operatorID, OperatorID userDefinedOperatorId, int parallelism) throws JobException, JobExecutionException {
 		ExecutionGraph graph = TestingExecutionGraphBuilder.newBuilder().build();
-		return operators.stream().collect(Collectors.toMap(Function.identity(), operatorID -> {
-			JobVertex jobVertex = new JobVertex(
-				operatorID.toHexString(),
-				new JobVertexID(),
-				emptyList(),
-				singletonList(operatorID),
-				singletonList(operatorID));
-			try {
-				return new ExecutionJobVertex(graph, jobVertex, parallelism, 1, Time.seconds(1), 1L, 1L);
-			} catch (Exception e) {
-				throw new RuntimeException(e);
-			}
-		}));
+		JobVertex jobVertex = new JobVertex(
+			operatorID.toHexString(),
+			new JobVertexID(),
+			singletonList(OperatorIDPair.of(operatorID, userDefinedOperatorId)));
+		return new ExecutionJobVertex(graph, jobVertex, parallelism, 1, Time.seconds(1), 1L, 1L);
 	}
 
 	private OperatorSubtaskState getAssignedState(ExecutionJobVertex executionJobVertex, OperatorID operatorId, int subtaskIdx) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LegacyJobVertexIdTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LegacyJobVertexIdTest.java
deleted file mode 100644
index d70b524..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LegacyJobVertexIdTest.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.executiongraph;
-
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
-
-public class LegacyJobVertexIdTest {
-
-	@Test
-	public void testIntroduceLegacyJobVertexIds() throws Exception {
-		JobVertexID defaultId = new JobVertexID();
-		JobVertexID legacyId1 = new JobVertexID();
-		JobVertexID legacyId2 = new JobVertexID();
-
-		JobVertex jobVertex = new JobVertex("test", defaultId, Arrays.asList(legacyId1, legacyId2), new ArrayList<>(), new ArrayList<>());
-		jobVertex.setInvokableClass(AbstractInvokable.class);
-
-		ExecutionGraph executionGraph = TestingExecutionGraphBuilder.newBuilder().build();
-
-		ExecutionJobVertex executionJobVertex =
-				new ExecutionJobVertex(executionGraph, jobVertex, 1, Time.seconds(1));
-
-		Map<JobVertexID, ExecutionJobVertex> idToVertex = new HashMap<>();
-		idToVertex.put(executionJobVertex.getJobVertexId(), executionJobVertex);
-
-		Assert.assertEquals(executionJobVertex, idToVertex.get(defaultId));
-		Assert.assertNull(idToVertex.get(legacyId1));
-		Assert.assertNull(idToVertex.get(legacyId2));
-
-		idToVertex = ExecutionJobVertex.includeLegacyJobVertexIDs(idToVertex);
-
-		Assert.assertEquals(3, idToVertex.size());
-		Assert.assertEquals(executionJobVertex, idToVertex.get(defaultId));
-		Assert.assertEquals(executionJobVertex, idToVertex.get(legacyId1));
-		Assert.assertEquals(executionJobVertex, idToVertex.get(legacyId2));
-	}
-}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 1065568..52d67f3 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.common.operators.ResourceSpec;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.runtime.OperatorIDPair;
 import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
 import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
@@ -408,21 +409,12 @@ public class StreamingJobGraphGenerator {
 
 		JobVertexID jobVertexId = new JobVertexID(hash);
 
-		List<JobVertexID> legacyJobVertexIds = new ArrayList<>(legacyHashes.size());
-		for (Map<Integer, byte[]> legacyHash : legacyHashes) {
-			hash = legacyHash.get(streamNodeId);
-			if (null != hash) {
-				legacyJobVertexIds.add(new JobVertexID(hash));
-			}
-		}
-
 		List<Tuple2<byte[], byte[]>> chainedOperators = chainedOperatorHashes.get(streamNodeId);
-		List<OperatorID> chainedOperatorVertexIds = new ArrayList<>();
-		List<OperatorID> userDefinedChainedOperatorVertexIds = new ArrayList<>();
+		List<OperatorIDPair> operatorIDPairs = new ArrayList<>();
 		if (chainedOperators != null) {
 			for (Tuple2<byte[], byte[]> chainedOperator : chainedOperators) {
-				chainedOperatorVertexIds.add(new OperatorID(chainedOperator.f0));
-				userDefinedChainedOperatorVertexIds.add(chainedOperator.f1 != null ? new OperatorID(chainedOperator.f1) : null);
+				OperatorID userDefinedOperatorID = chainedOperator.f1 == null ? null : new OperatorID(chainedOperator.f1);
+				operatorIDPairs.add(OperatorIDPair.of(new OperatorID(chainedOperator.f0), userDefinedOperatorID));
 			}
 		}
 
@@ -430,9 +422,7 @@ public class StreamingJobGraphGenerator {
 			jobVertex = new InputOutputFormatVertex(
 					chainedNames.get(streamNodeId),
 					jobVertexId,
-					legacyJobVertexIds,
-					chainedOperatorVertexIds,
-					userDefinedChainedOperatorVertexIds);
+					operatorIDPairs);
 
 			chainedInputOutputFormats
 				.get(streamNodeId)
@@ -441,9 +431,7 @@ public class StreamingJobGraphGenerator {
 			jobVertex = new JobVertex(
 					chainedNames.get(streamNodeId),
 					jobVertexId,
-					legacyJobVertexIds,
-					chainedOperatorVertexIds,
-					userDefinedChainedOperatorVertexIds);
+					operatorIDPairs);
 		}
 
 		jobVertex.setResources(chainedMinResources.get(streamNodeId), chainedPreferredResources.get(streamNodeId));
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/StreamingJobGraphGeneratorNodeHashTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/StreamingJobGraphGeneratorNodeHashTest.java
index d00d0af..7cf7526 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/StreamingJobGraphGeneratorNodeHashTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/StreamingJobGraphGeneratorNodeHashTest.java
@@ -33,10 +33,8 @@ import org.apache.flink.streaming.api.graph.StreamGraph;
 import org.apache.flink.streaming.api.graph.StreamNode;
 import org.apache.flink.util.TestLogger;
 
-import org.junit.Assert;
 import org.junit.Test;
 
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -423,27 +421,6 @@ public class StreamingJobGraphGeneratorNodeHashTest extends TestLogger {
 	}
 
 	@Test
-	public void testUserProvidedHashing() {
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
-
-		List<String> userHashes = Arrays.asList("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb");
-
-		env.addSource(new NoOpSourceFunction(), "src").setUidHash(userHashes.get(0))
-				.map(new NoOpMapFunction())
-				.filter(new NoOpFilterFunction())
-				.keyBy(new NoOpKeySelector())
-				.reduce(new NoOpReduceFunction()).name("reduce").setUidHash(userHashes.get(1));
-
-		StreamGraph streamGraph = env.getStreamGraph();
-		int idx = 1;
-		for (JobVertex jobVertex : streamGraph.getJobGraph().getVertices()) {
-			List<JobVertexID> idAlternatives = jobVertex.getIdAlternatives();
-			Assert.assertEquals(idAlternatives.get(idAlternatives.size() - 1).toString(), userHashes.get(idx));
-			--idx;
-		}
-	}
-
-	@Test
 	public void testUserProvidedHashingOnChainSupported() {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();