You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2020/05/07 15:52:33 UTC
[flink] branch master updated:
[FLINK-16638][runtime][checkpointing] Flink checkStateMappingCompleteness
doesn't include UserDefinedOperatorIDs
This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 0114338 [FLINK-16638][runtime][checkpointing] Flink checkStateMappingCompleteness doesn't include UserDefinedOperatorIDs
0114338 is described below
commit 0114338da5ce52677d1dfa1ab4350b1567dc3522
Author: edu05 <ed...@gmail.com>
AuthorDate: Wed Apr 29 19:36:08 2020 +0200
[FLINK-16638][runtime][checkpointing] Flink checkStateMappingCompleteness doesn't include UserDefinedOperatorIDs
---
.../state/api/input/OperatorStateInputFormat.java | 3 +-
.../state/api/runtime/OperatorIDGeneratorTest.java | 2 +-
.../org/apache/flink/runtime/OperatorIDPair.java | 58 +++++++++++++++
.../flink/runtime/checkpoint/Checkpoints.java | 17 +----
.../runtime/checkpoint/PendingCheckpoint.java | 13 ++--
.../checkpoint/StateAssignmentOperation.java | 35 ++++-----
.../runtime/executiongraph/ExecutionJobVertex.java | 87 ++--------------------
.../runtime/jobgraph/InputOutputFormatVertex.java | 7 +-
.../apache/flink/runtime/jobgraph/JobVertex.java | 49 ++++--------
.../ChannelStateNoRescalingPartitionerTest.java | 3 +-
.../CheckpointCoordinatorRestoringTest.java | 23 +++---
.../checkpoint/CheckpointCoordinatorTest.java | 2 +-
.../CheckpointCoordinatorTestingUtils.java | 18 +++--
.../checkpoint/CheckpointMetadataLoadingTest.java | 3 +-
.../checkpoint/CheckpointStateRestoreTest.java | 4 +-
.../runtime/checkpoint/PendingCheckpointTest.java | 3 +-
.../checkpoint/StateAssignmentOperationTest.java | 54 ++++++++++----
.../executiongraph/LegacyJobVertexIdTest.java | 64 ----------------
.../api/graph/StreamingJobGraphGenerator.java | 24 ++----
.../StreamingJobGraphGeneratorNodeHashTest.java | 23 ------
20 files changed, 191 insertions(+), 301 deletions(-)
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/input/OperatorStateInputFormat.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/input/OperatorStateInputFormat.java
index a312938..ec4c9b1 100644
--- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/input/OperatorStateInputFormat.java
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/input/OperatorStateInputFormat.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.common.io.statistics.BaseStatistics;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.io.InputSplitAssigner;
+import org.apache.flink.runtime.OperatorIDPair;
import org.apache.flink.runtime.checkpoint.OperatorState;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.RoundRobinOperatorStateRepartitioner;
@@ -123,7 +124,7 @@ abstract class OperatorStateInputFormat<OT> extends RichInputFormat<OT, Operator
Map<OperatorInstanceID, List<OperatorStateHandle>> newManagedOperatorStates = reDistributePartitionableStates(
singletonList(operatorState),
minNumSplits,
- singletonList(operatorState.getOperatorID()),
+ singletonList(OperatorIDPair.generatedIDOnly(operatorState.getOperatorID())),
OperatorSubtaskState::getManagedOperatorState,
RoundRobinOperatorStateRepartitioner.INSTANCE);
diff --git a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/runtime/OperatorIDGeneratorTest.java b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/runtime/OperatorIDGeneratorTest.java
index 9dff2a1..cb90641 100644
--- a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/runtime/OperatorIDGeneratorTest.java
+++ b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/runtime/OperatorIDGeneratorTest.java
@@ -65,6 +65,6 @@ public class OperatorIDGeneratorTest {
.findFirst()
.orElseThrow(() -> new IllegalStateException("Unable to find vertex"));
- return vertex.getOperatorIDs().get(0);
+ return vertex.getOperatorIDs().get(0).getGeneratedOperatorID();
}
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/OperatorIDPair.java b/flink-runtime/src/main/java/org/apache/flink/runtime/OperatorIDPair.java
new file mode 100644
index 0000000..e6070d1
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/OperatorIDPair.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime;
+
+import org.apache.flink.runtime.jobgraph.OperatorID;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.Optional;
+
+/**
+ * Holds an operator's mandatory generated ID together with its optional user-defined ID.
+ */
+public class OperatorIDPair implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ private final OperatorID generatedOperatorID;
+ private final OperatorID userDefinedOperatorID; // may be null; exposed as Optional via getUserDefinedOperatorID()
+
+ private OperatorIDPair(OperatorID generatedOperatorID, @Nullable OperatorID userDefinedOperatorID) {
+ this.generatedOperatorID = generatedOperatorID;
+ this.userDefinedOperatorID = userDefinedOperatorID;
+ }
+
+ public static OperatorIDPair of(OperatorID generatedOperatorID, @Nullable OperatorID userDefinedOperatorID) {
+ return new OperatorIDPair(generatedOperatorID, userDefinedOperatorID);
+ }
+
+ public static OperatorIDPair generatedIDOnly(OperatorID generatedOperatorID) {
+ return new OperatorIDPair(generatedOperatorID, null); // no user-defined ID
+ }
+
+ public OperatorID getGeneratedOperatorID() {
+ return generatedOperatorID;
+ }
+
+ public Optional<OperatorID> getUserDefinedOperatorID() {
+ return Optional.ofNullable(userDefinedOperatorID); // empty when no user-defined ID was given
+ }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java
index 17db5e6..37f96f8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.checkpoint;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.OperatorIDPair;
import org.apache.flink.runtime.checkpoint.metadata.CheckpointMetadata;
import org.apache.flink.runtime.checkpoint.metadata.MetadataSerializer;
import org.apache.flink.runtime.checkpoint.metadata.MetadataSerializers;
@@ -137,28 +138,18 @@ public class Checkpoints {
// generate mapping from operator to task
Map<OperatorID, ExecutionJobVertex> operatorToJobVertexMapping = new HashMap<>();
for (ExecutionJobVertex task : tasks.values()) {
- for (OperatorID operatorID : task.getOperatorIDs()) {
- operatorToJobVertexMapping.put(operatorID, task);
+ for (OperatorIDPair operatorIDPair : task.getOperatorIDs()) {
+ operatorToJobVertexMapping.put(operatorIDPair.getGeneratedOperatorID(), task);
+ operatorIDPair.getUserDefinedOperatorID().ifPresent(id -> operatorToJobVertexMapping.put(id, task));
}
}
// (2) validate it (parallelism, etc)
- boolean expandedToLegacyIds = false;
-
HashMap<OperatorID, OperatorState> operatorStates = new HashMap<>(checkpointMetadata.getOperatorStates().size());
for (OperatorState operatorState : checkpointMetadata.getOperatorStates()) {
ExecutionJobVertex executionJobVertex = operatorToJobVertexMapping.get(operatorState.getOperatorID());
- // on the first time we can not find the execution job vertex for an id, we also consider alternative ids,
- // for example as generated from older flink versions, to provide backwards compatibility.
- if (executionJobVertex == null && !expandedToLegacyIds) {
- operatorToJobVertexMapping = ExecutionJobVertex.includeAlternativeOperatorIDs(operatorToJobVertexMapping);
- executionJobVertex = operatorToJobVertexMapping.get(operatorState.getOperatorID());
- expandedToLegacyIds = true;
- LOG.info("Could not find ExecutionJobVertex. Including user-defined OperatorIDs in search.");
- }
-
if (executionJobVertex != null) {
if (executionJobVertex.getMaxParallelism() == operatorState.getMaxParallelism()
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
index 52844f8..6ccdc53 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.checkpoint;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.OperatorIDPair;
import org.apache.flink.runtime.checkpoint.metadata.CheckpointMetadata;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
@@ -371,31 +372,31 @@ public class PendingCheckpoint {
acknowledgedTasks.add(executionAttemptId);
}
- List<OperatorID> operatorIDs = vertex.getJobVertex().getOperatorIDs();
+ List<OperatorIDPair> operatorIDs = vertex.getJobVertex().getOperatorIDs();
int subtaskIndex = vertex.getParallelSubtaskIndex();
long ackTimestamp = System.currentTimeMillis();
long stateSize = 0L;
if (operatorSubtaskStates != null) {
- for (OperatorID operatorID : operatorIDs) {
+ for (OperatorIDPair operatorID : operatorIDs) {
OperatorSubtaskState operatorSubtaskState =
- operatorSubtaskStates.getSubtaskStateByOperatorID(operatorID);
+ operatorSubtaskStates.getSubtaskStateByOperatorID(operatorID.getGeneratedOperatorID());
// if no real operatorSubtaskState was reported, we insert an empty state
if (operatorSubtaskState == null) {
operatorSubtaskState = new OperatorSubtaskState();
}
- OperatorState operatorState = operatorStates.get(operatorID);
+ OperatorState operatorState = operatorStates.get(operatorID.getGeneratedOperatorID());
if (operatorState == null) {
operatorState = new OperatorState(
- operatorID,
+ operatorID.getGeneratedOperatorID(),
vertex.getTotalNumberOfParallelSubtasks(),
vertex.getMaxParallelism());
- operatorStates.put(operatorID, operatorState);
+ operatorStates.put(operatorID.getGeneratedOperatorID(), operatorState);
}
operatorState.putState(subtaskIndex, operatorSubtaskState);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
index 7f49828..9644430 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.checkpoint;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.OperatorIDPair;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.jobgraph.OperatorID;
@@ -86,14 +87,11 @@ public class StateAssignmentOperation {
for (ExecutionJobVertex executionJobVertex : this.tasks) {
// find the states of all operators belonging to this task
- List<OperatorID> operatorIDs = executionJobVertex.getOperatorIDs();
- List<OperatorID> altOperatorIDs = executionJobVertex.getUserDefinedOperatorIDs();
- List<OperatorState> operatorStates = new ArrayList<>(operatorIDs.size());
+ List<OperatorIDPair> operatorIDPairs = executionJobVertex.getOperatorIDs();
+ List<OperatorState> operatorStates = new ArrayList<>(operatorIDPairs.size());
boolean statelessTask = true;
- for (int x = 0; x < operatorIDs.size(); x++) {
- OperatorID operatorID = altOperatorIDs.get(x) == null
- ? operatorIDs.get(x)
- : altOperatorIDs.get(x);
+ for (OperatorIDPair operatorIDPair : operatorIDPairs) {
+ OperatorID operatorID = operatorIDPair.getUserDefinedOperatorID().orElse(operatorIDPair.getGeneratedOperatorID());
OperatorState operatorState = localOperators.remove(operatorID);
if (operatorState == null) {
@@ -115,7 +113,7 @@ public class StateAssignmentOperation {
private void assignAttemptState(ExecutionJobVertex executionJobVertex, List<OperatorState> operatorStates) {
- List<OperatorID> operatorIDs = executionJobVertex.getOperatorIDs();
+ List<OperatorIDPair> operatorIDs = executionJobVertex.getOperatorIDs();
//1. first compute the new parallelism
checkParallelismPreconditions(operatorStates, executionJobVertex);
@@ -214,7 +212,7 @@ public class StateAssignmentOperation {
Map<OperatorInstanceID, List<KeyedStateHandle>> subRawKeyedState,
int newParallelism) {
- List<OperatorID> operatorIDs = executionJobVertex.getOperatorIDs();
+ List<OperatorIDPair> operatorIDs = executionJobVertex.getOperatorIDs();
for (int subTaskIndex = 0; subTaskIndex < newParallelism; subTaskIndex++) {
@@ -224,8 +222,8 @@ public class StateAssignmentOperation {
TaskStateSnapshot taskState = new TaskStateSnapshot(operatorIDs.size());
boolean statelessTask = true;
- for (OperatorID operatorID : operatorIDs) {
- OperatorInstanceID instanceID = OperatorInstanceID.of(subTaskIndex, operatorID);
+ for (OperatorIDPair operatorID : operatorIDs) {
+ OperatorInstanceID instanceID = OperatorInstanceID.of(subTaskIndex, operatorID.getGeneratedOperatorID());
OperatorSubtaskState operatorSubtaskState = operatorSubtaskStateFrom(
instanceID,
@@ -239,7 +237,7 @@ public class StateAssignmentOperation {
if (operatorSubtaskState.hasState()) {
statelessTask = false;
}
- taskState.putSubtaskStateByOperatorID(operatorID, operatorSubtaskState);
+ taskState.putSubtaskStateByOperatorID(operatorID.getGeneratedOperatorID(), operatorSubtaskState);
}
if (!statelessTask) {
@@ -288,7 +286,7 @@ public class StateAssignmentOperation {
private void reDistributeKeyedStates(
List<OperatorState> oldOperatorStates,
int newParallelism,
- List<OperatorID> newOperatorIDs,
+ List<OperatorIDPair> newOperatorIDs,
List<KeyGroupRange> newKeyGroupPartitions,
Map<OperatorInstanceID, List<KeyedStateHandle>> newManagedKeyedState,
Map<OperatorInstanceID, List<KeyedStateHandle>> newRawKeyedState) {
@@ -300,7 +298,7 @@ public class StateAssignmentOperation {
OperatorState operatorState = oldOperatorStates.get(operatorIndex);
int oldParallelism = operatorState.getParallelism();
for (int subTaskIndex = 0; subTaskIndex < newParallelism; subTaskIndex++) {
- OperatorInstanceID instanceID = OperatorInstanceID.of(subTaskIndex, newOperatorIDs.get(operatorIndex));
+ OperatorInstanceID instanceID = OperatorInstanceID.of(subTaskIndex, newOperatorIDs.get(operatorIndex).getGeneratedOperatorID());
Tuple2<List<KeyedStateHandle>, List<KeyedStateHandle>> subKeyedStates = reAssignSubKeyedStates(
operatorState,
newKeyGroupPartitions,
@@ -347,7 +345,7 @@ public class StateAssignmentOperation {
public static <T extends StateObject> Map<OperatorInstanceID, List<T>> reDistributePartitionableStates(
List<OperatorState> oldOperatorStates,
int newParallelism,
- List<OperatorID> newOperatorIDs,
+ List<OperatorIDPair> newOperatorIDs,
Function<OperatorSubtaskState, StateObjectCollection<T>> extractHandle,
OperatorStateRepartitioner<T> stateRepartitioner) {
@@ -361,7 +359,7 @@ public class StateAssignmentOperation {
Map<OperatorInstanceID, List<T>> result = new HashMap<>();
for (int operatorIndex = 0; operatorIndex < newOperatorIDs.size(); operatorIndex++) {
result.putAll(applyRepartitioner(
- newOperatorIDs.get(operatorIndex),
+ newOperatorIDs.get(operatorIndex).getGeneratedOperatorID(),
stateRepartitioner,
oldStates.get(operatorIndex),
oldOperatorStates.get(operatorIndex).getParallelism(),
@@ -555,7 +553,10 @@ public class StateAssignmentOperation {
Set<OperatorID> allOperatorIDs = new HashSet<>();
for (ExecutionJobVertex executionJobVertex : tasks) {
- allOperatorIDs.addAll(executionJobVertex.getOperatorIDs());
+ for (OperatorIDPair operatorIDPair : executionJobVertex.getOperatorIDs()) {
+ allOperatorIDs.add(operatorIDPair.getGeneratedOperatorID());
+ operatorIDPair.getUserDefinedOperatorID().ifPresent(allOperatorIDs::add);
+ }
}
for (Map.Entry<OperatorID, OperatorState> operatorGroupStateEntry : operatorStates.entrySet()) {
OperatorState operatorState = operatorGroupStateEntry.getValue();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index 9cca608..ee2cd2a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -32,6 +32,7 @@ import org.apache.flink.core.io.InputSplit;
import org.apache.flink.core.io.InputSplitAssigner;
import org.apache.flink.core.io.InputSplitSource;
import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.OperatorIDPair;
import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
import org.apache.flink.runtime.blob.BlobWriter;
import org.apache.flink.runtime.blob.PermanentBlobKey;
@@ -91,24 +92,6 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
private final JobVertex jobVertex;
- /**
- * The IDs of all operators contained in this execution job vertex.
- *
- * <p>The ID's are stored depth-first post-order; for the forking chain below the ID's would be stored as [D, E, B, C, A].
- * A - B - D
- * \ \
- * C E
- * This is the same order that operators are stored in the {@code StreamTask}.
- */
- private final List<OperatorID> operatorIDs;
-
- /**
- * The alternative IDs of all operators contained in this execution job vertex.
- *
- * <p>The ID's are in the same order as {@link ExecutionJobVertex#operatorIDs}.
- */
- private final List<OperatorID> userDefinedOperatorIds;
-
private final ExecutionVertex[] taskVertices;
private final IntermediateResult[] producedDataSets;
@@ -200,8 +183,6 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
this.resourceProfile = ResourceProfile.fromResourceSpec(jobVertex.getMinResources(), MemorySize.ZERO);
this.taskVertices = new ExecutionVertex[numTaskVertices];
- this.operatorIDs = Collections.unmodifiableList(jobVertex.getOperatorIDs());
- this.userDefinedOperatorIds = Collections.unmodifiableList(jobVertex.getUserDefinedOperatorIDs());
this.inputs = new ArrayList<>(jobVertex.getInputs().size());
@@ -289,21 +270,12 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
}
/**
- * Returns a list containing the IDs of all operators contained in this execution job vertex.
+ * Returns a list containing the ID pairs of all operators contained in this execution job vertex.
*
- * @return list containing the IDs of all contained operators
+ * @return list containing the ID pairs of all contained operators
*/
- public List<OperatorID> getOperatorIDs() {
- return operatorIDs;
- }
-
- /**
- * Returns a list containing the alternative IDs of all operators contained in this execution job vertex.
- *
- * @return list containing alternative the IDs of all contained operators
- */
- public List<OperatorID> getUserDefinedOperatorIDs() {
- return userDefinedOperatorIds;
+ public List<OperatorIDPair> getOperatorIDs() {
+ return jobVertex.getOperatorIDs();
}
public void setMaxParallelism(int maxParallelismDerived) {
@@ -626,53 +598,4 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
return ExecutionState.CREATED;
}
}
-
- public static Map<JobVertexID, ExecutionJobVertex> includeLegacyJobVertexIDs(
- Map<JobVertexID, ExecutionJobVertex> tasks) {
-
- Map<JobVertexID, ExecutionJobVertex> expanded = new HashMap<>(2 * tasks.size());
- // first include all new ids
- expanded.putAll(tasks);
-
- // now expand and add legacy ids
- for (ExecutionJobVertex executionJobVertex : tasks.values()) {
- if (null != executionJobVertex) {
- JobVertex jobVertex = executionJobVertex.getJobVertex();
- if (null != jobVertex) {
- List<JobVertexID> alternativeIds = jobVertex.getIdAlternatives();
- for (JobVertexID jobVertexID : alternativeIds) {
- ExecutionJobVertex old = expanded.put(jobVertexID, executionJobVertex);
- Preconditions.checkState(null == old || old.equals(executionJobVertex),
- "Ambiguous jobvertex id detected during expansion to legacy ids.");
- }
- }
- }
- }
-
- return expanded;
- }
-
- public static Map<OperatorID, ExecutionJobVertex> includeAlternativeOperatorIDs(
- Map<OperatorID, ExecutionJobVertex> operatorMapping) {
-
- Map<OperatorID, ExecutionJobVertex> expanded = new HashMap<>(2 * operatorMapping.size());
- // first include all existing ids
- expanded.putAll(operatorMapping);
-
- // now expand and add user-defined ids
- for (ExecutionJobVertex executionJobVertex : operatorMapping.values()) {
- if (executionJobVertex != null) {
- JobVertex jobVertex = executionJobVertex.getJobVertex();
- if (jobVertex != null) {
- for (OperatorID operatorID : jobVertex.getUserDefinedOperatorIDs()) {
- if (operatorID != null) {
- expanded.put(operatorID, executionJobVertex);
- }
- }
- }
- }
- }
-
- return expanded;
- }
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/InputOutputFormatVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/InputOutputFormatVertex.java
index 741df52..f1e7a2b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/InputOutputFormatVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/InputOutputFormatVertex.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.io.InitializeOnMaster;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.common.operators.util.UserCodeWrapper;
+import org.apache.flink.runtime.OperatorIDPair;
import org.apache.flink.runtime.operators.util.TaskConfig;
import java.util.HashMap;
@@ -48,11 +49,9 @@ public class InputOutputFormatVertex extends JobVertex {
public InputOutputFormatVertex(
String name,
JobVertexID id,
- List<JobVertexID> alternativeIds,
- List<OperatorID> operatorIds,
- List<OperatorID> alternativeOperatorIds) {
+ List<OperatorIDPair> operatorIDPairs) {
- super(name, id, alternativeIds, operatorIds, alternativeOperatorIds);
+ super(name, id, operatorIDPairs);
}
@Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
index e9616a7..c77103d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.InputDependencyConstraint;
import org.apache.flink.api.common.operators.ResourceSpec;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.InputSplitSource;
+import org.apache.flink.runtime.OperatorIDPair;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
@@ -55,14 +56,15 @@ public class JobVertex implements java.io.Serializable {
/** The ID of the vertex. */
private final JobVertexID id;
- /** The alternative IDs of the vertex. */
- private final ArrayList<JobVertexID> idAlternatives = new ArrayList<>();
-
- /** The IDs of all operators contained in this vertex. */
- private final ArrayList<OperatorID> operatorIDs = new ArrayList<>();
-
- /** The alternative IDs of all operators contained in this vertex. */
- private final ArrayList<OperatorID> operatorIdsAlternatives = new ArrayList<>();
+ /** The ID pairs (generated ID plus optional user-defined ID) of all operators contained in this vertex.
+ *
+ * <p>The ID pairs are stored depth-first post-order; for the forking chain below the ID pairs would be stored as [D, E, B, C, A].
+ * A - B - D
+ * \ \
+ * C E
+ * This is the same order that operators are stored in the {@code StreamTask}.
+ */
+ private final List<OperatorIDPair> operatorIDs;
/** List of produced data sets, one per writer. */
private final ArrayList<IntermediateDataSet> results = new ArrayList<>();
@@ -143,9 +145,8 @@ public class JobVertex implements java.io.Serializable {
public JobVertex(String name, JobVertexID id) {
this.name = name == null ? DEFAULT_NAME : name;
this.id = id == null ? new JobVertexID() : id;
- // the id lists must have the same size
- this.operatorIDs.add(OperatorID.fromJobVertexID(this.id));
- this.operatorIdsAlternatives.add(null);
+ OperatorIDPair operatorIDPair = OperatorIDPair.generatedIDOnly(OperatorID.fromJobVertexID(this.id));
+ this.operatorIDs = Collections.singletonList(operatorIDPair);
}
/**
@@ -153,17 +154,12 @@ public class JobVertex implements java.io.Serializable {
*
* @param name The name of the new job vertex.
* @param primaryId The id of the job vertex.
- * @param alternativeIds The alternative ids of the job vertex.
- * @param operatorIds The ids of all operators contained in this job vertex.
- * @param alternativeOperatorIds The alternative ids of all operators contained in this job vertex-
+ * @param operatorIDPairs The operator ID pairs of the job vertex.
*/
- public JobVertex(String name, JobVertexID primaryId, List<JobVertexID> alternativeIds, List<OperatorID> operatorIds, List<OperatorID> alternativeOperatorIds) {
- Preconditions.checkArgument(operatorIds.size() == alternativeOperatorIds.size());
+ public JobVertex(String name, JobVertexID primaryId, List<OperatorIDPair> operatorIDPairs) {
this.name = name == null ? DEFAULT_NAME : name;
this.id = primaryId == null ? new JobVertexID() : primaryId;
- this.idAlternatives.addAll(alternativeIds);
- this.operatorIDs.addAll(operatorIds);
- this.operatorIdsAlternatives.addAll(alternativeOperatorIds);
+ this.operatorIDs = Collections.unmodifiableList(operatorIDPairs);
}
// --------------------------------------------------------------------------------------------
@@ -178,15 +174,6 @@ public class JobVertex implements java.io.Serializable {
}
/**
- * Returns a list of all alternative IDs of this job vertex.
- *
- * @return List of all alternative IDs for this job vertex
- */
- public List<JobVertexID> getIdAlternatives() {
- return idAlternatives;
- }
-
- /**
* Returns the name of the vertex.
*
* @return The name of the vertex.
@@ -222,14 +209,10 @@ public class JobVertex implements java.io.Serializable {
return this.inputs.size();
}
- public List<OperatorID> getOperatorIDs() {
+ public List<OperatorIDPair> getOperatorIDs() {
return operatorIDs;
}
- public List<OperatorID> getUserDefinedOperatorIDs() {
- return operatorIdsAlternatives;
- }
-
/**
* Returns the vertex's configuration object which can be used to pass custom settings to the task at runtime.
*
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ChannelStateNoRescalingPartitionerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ChannelStateNoRescalingPartitionerTest.java
index a7763cb..91ae0ff 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ChannelStateNoRescalingPartitionerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ChannelStateNoRescalingPartitionerTest.java
@@ -17,6 +17,7 @@
package org.apache.flink.runtime.checkpoint;
+import org.apache.flink.runtime.OperatorIDPair;
import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
import org.apache.flink.runtime.checkpoint.channel.ResultSubpartitionInfo;
import org.apache.flink.runtime.jobgraph.OperatorID;
@@ -101,7 +102,7 @@ public class ChannelStateNoRescalingPartitionerTest {
StateAssignmentOperation.reDistributePartitionableStates(
singletonList(state),
newParallelism,
- singletonList(OPERATOR_ID),
+ singletonList(OperatorIDPair.generatedIDOnly(OPERATOR_ID)),
(Function<OperatorSubtaskState, StateObjectCollection<T>>) this.extractState,
channelStateNonRescalingRepartitioner("test"));
} catch (IllegalArgumentException e) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java
index d5acd88..0f03875 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.checkpoint;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.OperatorIDPair;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils.CheckpointCoordinatorConfigurationBuilder;
import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
@@ -506,7 +507,7 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
List<List<Collection<OperatorStateHandle>>> actualOpStatesRaw = new ArrayList<>(newJobVertex2.getParallelism());
for (int i = 0; i < newJobVertex2.getParallelism(); i++) {
- List<OperatorID> operatorIDs = newJobVertex2.getOperatorIDs();
+ List<OperatorIDPair> operatorIDs = newJobVertex2.getOperatorIDs();
KeyGroupsStateHandle originalKeyedStateBackend = generateKeyGroupState(jobVertexID2, newKeyGroupPartitions2.get(i), false);
KeyGroupsStateHandle originalKeyedStateRaw = generateKeyGroupState(jobVertexID2, newKeyGroupPartitions2.get(i), true);
@@ -520,7 +521,7 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
List<Collection<OperatorStateHandle>> allParallelRawOpStates = new ArrayList<>(operatorIDs.size());
for (int idx = 0; idx < operatorIDs.size(); ++idx) {
- OperatorID operatorID = operatorIDs.get(idx);
+ OperatorID operatorID = operatorIDs.get(idx).getGeneratedOperatorID();
OperatorSubtaskState opState = taskStateHandles.getSubtaskStateByOperatorID(operatorID);
Collection<OperatorStateHandle> opStateBackend = opState.getManagedOperatorState();
Collection<OperatorStateHandle> opStateRaw = opState.getRawOperatorState();
@@ -827,13 +828,13 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
for (int i = 0; i < newJobVertex1.getParallelism(); i++) {
- final List<OperatorID> operatorIds = newJobVertex1.getOperatorIDs();
+ final List<OperatorIDPair> operatorIDs = newJobVertex1.getOperatorIDs();
JobManagerTaskRestore taskRestore = newJobVertex1.getTaskVertices()[i].getCurrentExecutionAttempt().getTaskRestore();
Assert.assertEquals(2L, taskRestore.getRestoreCheckpointId());
TaskStateSnapshot stateSnapshot = taskRestore.getTaskStateSnapshot();
- OperatorSubtaskState headOpState = stateSnapshot.getSubtaskStateByOperatorID(operatorIds.get(operatorIds.size() - 1));
+ OperatorSubtaskState headOpState = stateSnapshot.getSubtaskStateByOperatorID(operatorIDs.get(operatorIDs.size() - 1).getGeneratedOperatorID());
assertTrue(headOpState.getManagedKeyedState().isEmpty());
assertTrue(headOpState.getRawKeyedState().isEmpty());
@@ -841,7 +842,7 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
{
int operatorIndexInChain = 2;
OperatorSubtaskState opState =
- stateSnapshot.getSubtaskStateByOperatorID(operatorIds.get(operatorIndexInChain));
+ stateSnapshot.getSubtaskStateByOperatorID(operatorIDs.get(operatorIndexInChain).getGeneratedOperatorID());
assertTrue(opState.getManagedOperatorState().isEmpty());
assertTrue(opState.getRawOperatorState().isEmpty());
@@ -850,7 +851,7 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
{
int operatorIndexInChain = 1;
OperatorSubtaskState opState =
- stateSnapshot.getSubtaskStateByOperatorID(operatorIds.get(operatorIndexInChain));
+ stateSnapshot.getSubtaskStateByOperatorID(operatorIDs.get(operatorIndexInChain).getGeneratedOperatorID());
OperatorStateHandle expectedManagedOpState = generatePartitionableStateHandle(
id1.f0, i, 2, 8, false);
@@ -871,7 +872,7 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
{
int operatorIndexInChain = 0;
OperatorSubtaskState opState =
- stateSnapshot.getSubtaskStateByOperatorID(operatorIds.get(operatorIndexInChain));
+ stateSnapshot.getSubtaskStateByOperatorID(operatorIDs.get(operatorIndexInChain).getGeneratedOperatorID());
OperatorStateHandle expectedManagedOpState = generatePartitionableStateHandle(
id2.f0, i, 2, 8, false);
@@ -895,7 +896,7 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
for (int i = 0; i < newJobVertex2.getParallelism(); i++) {
- final List<OperatorID> operatorIds = newJobVertex2.getOperatorIDs();
+ final List<OperatorIDPair> operatorIDs = newJobVertex2.getOperatorIDs();
JobManagerTaskRestore taskRestore = newJobVertex2.getTaskVertices()[i].getCurrentExecutionAttempt().getTaskRestore();
Assert.assertEquals(2L, taskRestore.getRestoreCheckpointId());
@@ -905,7 +906,7 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
{
int operatorIndexInChain = 1;
OperatorSubtaskState opState =
- stateSnapshot.getSubtaskStateByOperatorID(operatorIds.get(operatorIndexInChain));
+ stateSnapshot.getSubtaskStateByOperatorID(operatorIDs.get(operatorIndexInChain).getGeneratedOperatorID());
List<Collection<OperatorStateHandle>> actualSubManagedOperatorState = new ArrayList<>(1);
actualSubManagedOperatorState.add(opState.getManagedOperatorState());
@@ -921,7 +922,7 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
{
int operatorIndexInChain = 0;
OperatorSubtaskState opState =
- stateSnapshot.getSubtaskStateByOperatorID(operatorIds.get(operatorIndexInChain));
+ stateSnapshot.getSubtaskStateByOperatorID(operatorIDs.get(operatorIndexInChain).getGeneratedOperatorID());
assertTrue(opState.getManagedOperatorState().isEmpty());
assertTrue(opState.getRawOperatorState().isEmpty());
@@ -931,7 +932,7 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
KeyGroupsStateHandle originalKeyedStateRaw = generateKeyGroupState(id3.f0, newKeyGroupPartitions2.get(i), true);
OperatorSubtaskState headOpState =
- stateSnapshot.getSubtaskStateByOperatorID(operatorIds.get(operatorIds.size() - 1));
+ stateSnapshot.getSubtaskStateByOperatorID(operatorIDs.get(operatorIDs.size() - 1).getGeneratedOperatorID());
Collection<KeyedStateHandle> keyedStateBackend = headOpState.getManagedKeyedState();
Collection<KeyedStateHandle> keyGroupStateRaw = headOpState.getRawKeyedState();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index ce662f6..d0df7e3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -2363,7 +2363,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
Map<OperatorID, OperatorSubtaskState> opStates = new HashMap<>();
- opStates.put(jobVertex1.getOperatorIDs().get(0), operatorSubtaskState);
+ opStates.put(jobVertex1.getOperatorIDs().get(0).getGeneratedOperatorID(), operatorSubtaskState);
TaskStateSnapshot taskStateSnapshot = new TaskStateSnapshot(opStates);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java
index 95f7b58..7b1ff45 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.mock.Whitebox;
+import org.apache.flink.runtime.OperatorIDPair;
import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
@@ -65,7 +66,6 @@ import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -357,8 +357,11 @@ public class CheckpointCoordinatorTestingUtils {
when(executionJobVertex.getParallelism()).thenReturn(parallelism);
when(executionJobVertex.getMaxParallelism()).thenReturn(maxParallelism);
when(executionJobVertex.isMaxParallelismConfigured()).thenReturn(true);
- when(executionJobVertex.getOperatorIDs()).thenReturn(jobVertexIDs);
- when(executionJobVertex.getUserDefinedOperatorIDs()).thenReturn(Arrays.asList(new OperatorID[jobVertexIDs.size()]));
+ List<OperatorIDPair> operatorIDPairs = new ArrayList<>();
+ for (OperatorID operatorID : jobVertexIDs) {
+ operatorIDPairs.add(OperatorIDPair.generatedIDOnly(operatorID));
+ }
+ when(executionJobVertex.getOperatorIDs()).thenReturn(operatorIDPairs);
return executionJobVertex;
}
@@ -455,7 +458,11 @@ public class CheckpointCoordinatorTestingUtils {
when(vertex.getMaxParallelism()).thenReturn(maxParallelism);
ExecutionJobVertex jobVertex = mock(ExecutionJobVertex.class);
- when(jobVertex.getOperatorIDs()).thenReturn(jobVertexIDs);
+ List<OperatorIDPair> operatorIDPairs = new ArrayList<>();
+ for (OperatorID operatorID : jobVertexIDs) {
+ operatorIDPairs.add(OperatorIDPair.generatedIDOnly(operatorID));
+ }
+ when(jobVertex.getOperatorIDs()).thenReturn(operatorIDPairs);
when(vertex.getJobVertex()).thenReturn(jobVertex);
@@ -560,8 +567,7 @@ public class CheckpointCoordinatorTestingUtils {
when(vertex.getMaxParallelism()).thenReturn(vertices.length);
when(vertex.getJobVertexId()).thenReturn(id);
when(vertex.getTaskVertices()).thenReturn(vertices);
- when(vertex.getOperatorIDs()).thenReturn(Collections.singletonList(OperatorID.fromJobVertexID(id)));
- when(vertex.getUserDefinedOperatorIDs()).thenReturn(Collections.<OperatorID>singletonList(null));
+ when(vertex.getOperatorIDs()).thenReturn(Collections.singletonList(OperatorIDPair.generatedIDOnly(OperatorID.fromJobVertexID(id))));
for (ExecutionVertex v : vertices) {
when(v.getJobVertex()).thenReturn(vertex);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointMetadataLoadingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointMetadataLoadingTest.java
index 6d12b14..fc93b2c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointMetadataLoadingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointMetadataLoadingTest.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.checkpoint;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.OperatorIDPair;
import org.apache.flink.runtime.checkpoint.metadata.CheckpointMetadata;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -104,7 +105,7 @@ public class CheckpointMetadataLoadingTest {
ExecutionJobVertex vertex = mock(ExecutionJobVertex.class);
when(vertex.getParallelism()).thenReturn(parallelism);
when(vertex.getMaxParallelism()).thenReturn(parallelism);
- when(vertex.getOperatorIDs()).thenReturn(Collections.singletonList(operatorID));
+ when(vertex.getOperatorIDs()).thenReturn(Collections.singletonList(OperatorIDPair.generatedIDOnly(operatorID)));
Map<JobVertexID, ExecutionJobVertex> tasks = new HashMap<>();
tasks.put(jobVertexID, vertex);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
index 958f3f1..bd7f2ee 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.checkpoint;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.OperatorIDPair;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder;
import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
import org.apache.flink.runtime.execution.ExecutionState;
@@ -312,8 +313,7 @@ public class CheckpointStateRestoreTest {
when(vertex.getMaxParallelism()).thenReturn(vertices.length);
when(vertex.getJobVertexId()).thenReturn(id);
when(vertex.getTaskVertices()).thenReturn(vertices);
- when(vertex.getOperatorIDs()).thenReturn(Collections.singletonList(OperatorID.fromJobVertexID(id)));
- when(vertex.getUserDefinedOperatorIDs()).thenReturn(Collections.<OperatorID>singletonList(null));
+ when(vertex.getOperatorIDs()).thenReturn(Collections.singletonList(OperatorIDPair.generatedIDOnly(OperatorID.fromJobVertexID(id))));
for (ExecutionVertex v : vertices) {
when(v.getJobVertex()).thenReturn(vertex);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
index f172e51..5afcc73 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.local.LocalFileSystem;
import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.runtime.OperatorIDPair;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils.StringSerializer;
import org.apache.flink.runtime.checkpoint.PendingCheckpoint.TaskAcknowledgeResult;
import org.apache.flink.runtime.checkpoint.hooks.MasterHooks;
@@ -87,7 +88,7 @@ public class PendingCheckpointTest {
static {
ExecutionJobVertex jobVertex = mock(ExecutionJobVertex.class);
- when(jobVertex.getOperatorIDs()).thenReturn(Collections.singletonList(new OperatorID()));
+ when(jobVertex.getOperatorIDs()).thenReturn(Collections.singletonList(OperatorIDPair.generatedIDOnly(new OperatorID())));
ExecutionVertex vertex = mock(ExecutionVertex.class);
when(vertex.getMaxParallelism()).thenReturn(128);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperationTest.java
index 5d12738..7fed3cd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperationTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.checkpoint;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.OperatorIDPair;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
@@ -49,7 +50,6 @@ import java.util.function.Function;
import java.util.stream.Collectors;
import static java.util.Arrays.asList;
-import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
import static org.apache.flink.runtime.checkpoint.StateHandleDummyUtil.createNewInputChannelStateHandle;
import static org.apache.flink.runtime.checkpoint.StateHandleDummyUtil.createNewKeyedStateHandle;
@@ -173,7 +173,7 @@ public class StateAssignmentOperationTest extends TestLogger {
StateAssignmentOperation.reDistributePartitionableStates(
Collections.singletonList(operatorState),
newParallelism,
- Collections.singletonList(operatorID),
+ Collections.singletonList(OperatorIDPair.generatedIDOnly(operatorID)),
OperatorSubtaskState::getManagedOperatorState,
RoundRobinOperatorStateRepartitioner.INSTANCE
);
@@ -324,6 +324,21 @@ public class StateAssignmentOperationTest extends TestLogger {
}
}
+	@Test
+	public void assigningStatesShouldWorkWithUserDefinedOperatorIdsAsWell() throws JobException, JobExecutionException {
+		int numSubTasks = 1;
+		OperatorID operatorId = new OperatorID();
+		OperatorID userDefinedOperatorId = new OperatorID();
+		Set<OperatorID> operatorIds = Collections.singleton(userDefinedOperatorId);
+		// State is stored under the user-defined ID and must be assigned to the vertex's generated operator ID.
+		ExecutionJobVertex executionJobVertex = buildExecutionJobVertex(operatorId, userDefinedOperatorId, numSubTasks);
+		Map<OperatorID, OperatorState> states = buildOperatorStates(operatorIds, numSubTasks);
+
+		new StateAssignmentOperation(0, Collections.singleton(executionJobVertex), states, false).assignStates();
+
+		Assert.assertEquals(states.get(userDefinedOperatorId).getState(0), getAssignedState(executionJobVertex, operatorId, 0));
+	}
+
private Set<OperatorID> buildOperatorIds(int operators) {
Set<OperatorID> set = new HashSet<>();
for (int j = 0; j < operators; j++) {
@@ -349,21 +364,28 @@ public class StateAssignmentOperationTest extends TestLogger {
}));
}
- private Map<OperatorID, ExecutionJobVertex> buildVertices(Set<OperatorID> operators, int parallelism) throws JobException, JobExecutionException {
+	private Map<OperatorID, ExecutionJobVertex> buildVertices(Set<OperatorID> operators, int parallelism) {
+		return operators.stream()
+			.collect(Collectors.toMap(Function.identity(), operatorID -> {
+				try {
+					return buildExecutionJobVertex(operatorID, parallelism);
+				} catch (JobException | JobExecutionException e) {
+					throw new RuntimeException(e); // lambdas passed to toMap cannot throw checked exceptions
+				}
+			}));
+	}
+
+	private ExecutionJobVertex buildExecutionJobVertex(OperatorID operatorID, int parallelism) throws JobException, JobExecutionException {
+		return buildExecutionJobVertex(operatorID, operatorID, parallelism); // same ID used as generated and user-defined ID
+	}
+
+ private ExecutionJobVertex buildExecutionJobVertex(OperatorID operatorID, OperatorID userDefinedOperatorId, int parallelism) throws JobException, JobExecutionException {
ExecutionGraph graph = TestingExecutionGraphBuilder.newBuilder().build();
- return operators.stream().collect(Collectors.toMap(Function.identity(), operatorID -> {
- JobVertex jobVertex = new JobVertex(
- operatorID.toHexString(),
- new JobVertexID(),
- emptyList(),
- singletonList(operatorID),
- singletonList(operatorID));
- try {
- return new ExecutionJobVertex(graph, jobVertex, parallelism, 1, Time.seconds(1), 1L, 1L);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }));
+ JobVertex jobVertex = new JobVertex(
+ operatorID.toHexString(),
+ new JobVertexID(),
+ singletonList(OperatorIDPair.of(operatorID, userDefinedOperatorId)));
+ return new ExecutionJobVertex(graph, jobVertex, parallelism, 1, Time.seconds(1), 1L, 1L);
}
private OperatorSubtaskState getAssignedState(ExecutionJobVertex executionJobVertex, OperatorID operatorId, int subtaskIdx) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LegacyJobVertexIdTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LegacyJobVertexIdTest.java
deleted file mode 100644
index d70b524..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LegacyJobVertexIdTest.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.executiongraph;
-
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
-
-public class LegacyJobVertexIdTest {
-
- @Test
- public void testIntroduceLegacyJobVertexIds() throws Exception {
- JobVertexID defaultId = new JobVertexID();
- JobVertexID legacyId1 = new JobVertexID();
- JobVertexID legacyId2 = new JobVertexID();
-
- JobVertex jobVertex = new JobVertex("test", defaultId, Arrays.asList(legacyId1, legacyId2), new ArrayList<>(), new ArrayList<>());
- jobVertex.setInvokableClass(AbstractInvokable.class);
-
- ExecutionGraph executionGraph = TestingExecutionGraphBuilder.newBuilder().build();
-
- ExecutionJobVertex executionJobVertex =
- new ExecutionJobVertex(executionGraph, jobVertex, 1, Time.seconds(1));
-
- Map<JobVertexID, ExecutionJobVertex> idToVertex = new HashMap<>();
- idToVertex.put(executionJobVertex.getJobVertexId(), executionJobVertex);
-
- Assert.assertEquals(executionJobVertex, idToVertex.get(defaultId));
- Assert.assertNull(idToVertex.get(legacyId1));
- Assert.assertNull(idToVertex.get(legacyId2));
-
- idToVertex = ExecutionJobVertex.includeLegacyJobVertexIDs(idToVertex);
-
- Assert.assertEquals(3, idToVertex.size());
- Assert.assertEquals(executionJobVertex, idToVertex.get(defaultId));
- Assert.assertEquals(executionJobVertex, idToVertex.get(legacyId1));
- Assert.assertEquals(executionJobVertex, idToVertex.get(legacyId2));
- }
-}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 1065568..52d67f3 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.common.operators.ResourceSpec;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.runtime.OperatorIDPair;
import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
@@ -408,21 +409,12 @@ public class StreamingJobGraphGenerator {
JobVertexID jobVertexId = new JobVertexID(hash);
- List<JobVertexID> legacyJobVertexIds = new ArrayList<>(legacyHashes.size());
- for (Map<Integer, byte[]> legacyHash : legacyHashes) {
- hash = legacyHash.get(streamNodeId);
- if (null != hash) {
- legacyJobVertexIds.add(new JobVertexID(hash));
- }
- }
-
List<Tuple2<byte[], byte[]>> chainedOperators = chainedOperatorHashes.get(streamNodeId);
- List<OperatorID> chainedOperatorVertexIds = new ArrayList<>();
- List<OperatorID> userDefinedChainedOperatorVertexIds = new ArrayList<>();
+ List<OperatorIDPair> operatorIDPairs = new ArrayList<>();
if (chainedOperators != null) {
for (Tuple2<byte[], byte[]> chainedOperator : chainedOperators) {
- chainedOperatorVertexIds.add(new OperatorID(chainedOperator.f0));
- userDefinedChainedOperatorVertexIds.add(chainedOperator.f1 != null ? new OperatorID(chainedOperator.f1) : null);
+ OperatorID userDefinedOperatorID = chainedOperator.f1 == null ? null : new OperatorID(chainedOperator.f1);
+ operatorIDPairs.add(OperatorIDPair.of(new OperatorID(chainedOperator.f0), userDefinedOperatorID));
}
}
@@ -430,9 +422,7 @@ public class StreamingJobGraphGenerator {
jobVertex = new InputOutputFormatVertex(
chainedNames.get(streamNodeId),
jobVertexId,
- legacyJobVertexIds,
- chainedOperatorVertexIds,
- userDefinedChainedOperatorVertexIds);
+ operatorIDPairs);
chainedInputOutputFormats
.get(streamNodeId)
@@ -441,9 +431,7 @@ public class StreamingJobGraphGenerator {
jobVertex = new JobVertex(
chainedNames.get(streamNodeId),
jobVertexId,
- legacyJobVertexIds,
- chainedOperatorVertexIds,
- userDefinedChainedOperatorVertexIds);
+ operatorIDPairs);
}
jobVertex.setResources(chainedMinResources.get(streamNodeId), chainedPreferredResources.get(streamNodeId));
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/StreamingJobGraphGeneratorNodeHashTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/StreamingJobGraphGeneratorNodeHashTest.java
index d00d0af..7cf7526 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/StreamingJobGraphGeneratorNodeHashTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/StreamingJobGraphGeneratorNodeHashTest.java
@@ -33,10 +33,8 @@ import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.graph.StreamNode;
import org.apache.flink.util.TestLogger;
-import org.junit.Assert;
import org.junit.Test;
-import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -423,27 +421,6 @@ public class StreamingJobGraphGeneratorNodeHashTest extends TestLogger {
}
@Test
- public void testUserProvidedHashing() {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
-
- List<String> userHashes = Arrays.asList("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb");
-
- env.addSource(new NoOpSourceFunction(), "src").setUidHash(userHashes.get(0))
- .map(new NoOpMapFunction())
- .filter(new NoOpFilterFunction())
- .keyBy(new NoOpKeySelector())
- .reduce(new NoOpReduceFunction()).name("reduce").setUidHash(userHashes.get(1));
-
- StreamGraph streamGraph = env.getStreamGraph();
- int idx = 1;
- for (JobVertex jobVertex : streamGraph.getJobGraph().getVertices()) {
- List<JobVertexID> idAlternatives = jobVertex.getIdAlternatives();
- Assert.assertEquals(idAlternatives.get(idAlternatives.size() - 1).toString(), userHashes.get(idx));
- --idx;
- }
- }
-
- @Test
public void testUserProvidedHashingOnChainSupported() {
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();