You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by zh...@apache.org on 2022/07/01 13:31:40 UTC
[flink] 01/04: [FLINK-28134][runtime] Introduce SpeculativeExecutionJobVertex and SpeculativeExecutionVertex
This is an automated email from the ASF dual-hosted git repository.
zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit b96d476d4cd05de1e6c0b001f2477aaed98bd473
Author: Zhu Zhu <re...@gmail.com>
AuthorDate: Thu Jun 23 20:38:38 2022 +0800
[FLINK-28134][runtime] Introduce SpeculativeExecutionJobVertex and SpeculativeExecutionVertex
This closes #20082.
---
.../ArchivedSpeculativeExecutionVertex.java | 52 ++++
.../executiongraph/DefaultExecutionGraph.java | 11 +-
.../DefaultExecutionGraphBuilder.java | 6 +-
.../runtime/executiongraph/ExecutionJobVertex.java | 31 ++-
.../runtime/executiongraph/ExecutionVertex.java | 135 +++++-----
.../SpeculativeExecutionJobVertex.java | 67 +++++
.../executiongraph/SpeculativeExecutionVertex.java | 276 +++++++++++++++++++++
.../scheduler/DefaultExecutionGraphFactory.java | 14 +-
.../AdaptiveBatchSchedulerFactory.java | 4 +-
.../SpeculativeExecutionVertexTest.java | 226 +++++++++++++++++
.../TestingDefaultExecutionGraphBuilder.java | 10 +-
.../scheduler/DefaultExecutionDeployerTest.java | 13 +-
.../scheduler/TestingInternalFailuresListener.java | 47 ++++
.../AdaptiveBatchSchedulerTestUtils.java | 4 +-
14 files changed, 811 insertions(+), 85 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedSpeculativeExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedSpeculativeExecutionVertex.java
new file mode 100644
index 00000000000..263049414c5
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedSpeculativeExecutionVertex.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+/**
+ * {@link ArchivedSpeculativeExecutionVertex} is a readonly representation of {@link
+ * SpeculativeExecutionVertex}.
+ */
+public class ArchivedSpeculativeExecutionVertex extends ArchivedExecutionVertex {
+
+ private static final long serialVersionUID = 1L;
+
+ public ArchivedSpeculativeExecutionVertex(SpeculativeExecutionVertex vertex) {
+ super(
+ vertex.getParallelSubtaskIndex(),
+ vertex.getTaskNameWithSubtaskIndex(),
+ vertex.getCurrentExecutionAttempt().archive(),
+ getCopyOfExecutionHistory(vertex));
+ }
+
+ private static ExecutionHistory getCopyOfExecutionHistory(SpeculativeExecutionVertex vertex) {
+ final ExecutionHistory executionHistory =
+ ArchivedExecutionVertex.getCopyOfExecutionHistory(vertex);
+
+ // add all the executions to the execution history except for the only admitted current
+ // execution
+ final Execution currentAttempt = vertex.getCurrentExecutionAttempt();
+ for (Execution execution : vertex.getCurrentExecutions()) {
+ if (execution.getAttemptNumber() != currentAttempt.getAttemptNumber()) {
+ executionHistory.add(execution.archive());
+ }
+ }
+
+ return executionHistory;
+ }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java
index 40082ae1258..4e144050e7e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java
@@ -286,6 +286,8 @@ public class DefaultExecutionGraph implements ExecutionGraph, InternalExecutionG
private final boolean isDynamic;
+ private final ExecutionJobVertex.Factory executionJobVertexFactory;
+
// --------------------------------------------------------------------------------------------
// Constructors
// --------------------------------------------------------------------------------------------
@@ -307,7 +309,8 @@ public class DefaultExecutionGraph implements ExecutionGraph, InternalExecutionG
long initializationTimestamp,
VertexAttemptNumberStore initialAttemptCounts,
VertexParallelismStore vertexParallelismStore,
- boolean isDynamic)
+ boolean isDynamic,
+ ExecutionJobVertex.Factory executionJobVertexFactory)
throws IOException {
this.executionGraphId = new ExecutionGraphID();
@@ -375,6 +378,8 @@ public class DefaultExecutionGraph implements ExecutionGraph, InternalExecutionG
this.isDynamic = isDynamic;
+ this.executionJobVertexFactory = checkNotNull(executionJobVertexFactory);
+
LOG.info(
"Created execution graph {} for job {}.",
executionGraphId,
@@ -840,7 +845,9 @@ public class DefaultExecutionGraph implements ExecutionGraph, InternalExecutionG
parallelismStore.getParallelismInfo(jobVertex.getID());
// create the execution job vertex and attach it to the graph
- ExecutionJobVertex ejv = new ExecutionJobVertex(this, jobVertex, parallelismInfo);
+ ExecutionJobVertex ejv =
+ executionJobVertexFactory.createExecutionJobVertex(
+ this, jobVertex, parallelismInfo);
ExecutionJobVertex previousTask = this.tasks.putIfAbsent(jobVertex.getID(), ejv);
if (previousTask != null) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphBuilder.java
index 0d47f5164e2..3b521327bbc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphBuilder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphBuilder.java
@@ -91,7 +91,8 @@ public class DefaultExecutionGraphBuilder {
VertexAttemptNumberStore vertexAttemptNumberStore,
VertexParallelismStore vertexParallelismStore,
Supplier<CheckpointStatsTracker> checkpointStatsTrackerFactory,
- boolean isDynamicGraph)
+ boolean isDynamicGraph,
+ ExecutionJobVertex.Factory executionJobVertexFactory)
throws JobExecutionException, JobException {
checkNotNull(jobGraph, "job graph cannot be null");
@@ -136,7 +137,8 @@ public class DefaultExecutionGraphBuilder {
initializationTimestamp,
vertexAttemptNumberStore,
vertexParallelismStore,
- isDynamicGraph);
+ isDynamicGraph,
+ executionJobVertexFactory);
} catch (IOException e) {
throw new JobException("Could not create the ExecutionGraph.", e);
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index bc9dfd94bb5..7ac2156b5bc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -188,7 +188,7 @@ public class ExecutionJobVertex
// create all task vertices
for (int i = 0; i < this.parallelismInfo.getParallelism(); i++) {
ExecutionVertex vertex =
- new ExecutionVertex(
+ createExecutionVertex(
this,
i,
producedDataSets,
@@ -260,6 +260,24 @@ public class ExecutionJobVertex
}
}
+ protected ExecutionVertex createExecutionVertex(
+ ExecutionJobVertex jobVertex,
+ int subTaskIndex,
+ IntermediateResult[] producedDataSets,
+ Time timeout,
+ long createTimestamp,
+ int executionHistorySizeLimit,
+ int initialAttemptCount) {
+ return new ExecutionVertex(
+ jobVertex,
+ subTaskIndex,
+ producedDataSets,
+ timeout,
+ createTimestamp,
+ executionHistorySizeLimit,
+ initialAttemptCount);
+ }
+
public boolean isInitialized() {
return taskVertices != null;
}
@@ -596,4 +614,15 @@ public class ExecutionJobVertex
return ExecutionState.CREATED;
}
}
+
+ /** Factory to create {@link ExecutionJobVertex}. */
+ public static class Factory {
+ ExecutionJobVertex createExecutionJobVertex(
+ InternalExecutionGraphAccessor graph,
+ JobVertex jobVertex,
+ VertexParallelismInformation parallelismInfo)
+ throws JobException {
+ return new ExecutionJobVertex(graph, jobVertex, parallelismInfo);
+ }
+ }
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index 5da5bf7c8b6..abda5427f4a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -49,6 +49,7 @@ import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import static org.apache.flink.runtime.execution.ExecutionState.FINISHED;
+import static org.apache.flink.util.Preconditions.checkState;
/**
* The ExecutionVertex is a parallel subtask of the execution. It may be executed once, or several
@@ -61,7 +62,7 @@ public class ExecutionVertex
// --------------------------------------------------------------------------------------------
- private final ExecutionJobVertex jobVertex;
+ final ExecutionJobVertex jobVertex;
private final Map<IntermediateResultPartitionID, IntermediateResultPartition> resultPartitions;
@@ -69,7 +70,7 @@ public class ExecutionVertex
private final ExecutionVertexID executionVertexId;
- private final ExecutionHistory executionHistory;
+ final ExecutionHistory executionHistory;
private final Time timeout;
@@ -77,9 +78,11 @@ public class ExecutionVertex
private final String taskNameWithSubtask;
/** The current or latest execution attempt of this vertex's task. */
- private Execution currentExecution; // this field must never be null
+ Execution currentExecution; // this field must never be null
- private final ArrayList<InputSplit> inputSplits;
+ final ArrayList<InputSplit> inputSplits;
+
+ private int nextAttemptNumber;
/** This field holds the allocation id of the last successful assignment. */
@Nullable private TaskManagerLocation lastAssignedLocation;
@@ -134,24 +137,33 @@ public class ExecutionVertex
this.executionHistory = new ExecutionHistory(executionHistorySizeLimit);
- this.currentExecution =
- new Execution(
- getExecutionGraphAccessor().getFutureExecutor(),
- this,
- initialAttemptCount,
- createTimestamp,
- timeout);
-
- getExecutionGraphAccessor().registerExecution(currentExecution);
+ this.nextAttemptNumber = initialAttemptCount;
this.timeout = timeout;
this.inputSplits = new ArrayList<>();
+
+ this.currentExecution = createNewExecution(createTimestamp);
+
+ getExecutionGraphAccessor().registerExecution(currentExecution);
}
// --------------------------------------------------------------------------------------------
// Properties
// --------------------------------------------------------------------------------------------
+ Execution createNewExecution(final long timestamp) {
+ return new Execution(
+ getExecutionGraphAccessor().getFutureExecutor(),
+ this,
+ nextAttemptNumber++,
+ timestamp,
+ timeout);
+ }
+
+ public Execution getPartitionProducer() {
+ return currentExecution;
+ }
+
public JobID getJobId() {
return this.jobVertex.getJobId();
}
@@ -248,30 +260,30 @@ public class ExecutionVertex
@Override
public ExecutionState getExecutionState() {
- return currentExecution.getState();
+ return getCurrentExecutionAttempt().getState();
}
@Override
public long getStateTimestamp(ExecutionState state) {
- return currentExecution.getStateTimestamp(state);
+ return getCurrentExecutionAttempt().getStateTimestamp(state);
}
@Override
public Optional<ErrorInfo> getFailureInfo() {
- return currentExecution.getFailureInfo();
+ return getCurrentExecutionAttempt().getFailureInfo();
}
public CompletableFuture<TaskManagerLocation> getCurrentTaskManagerLocationFuture() {
- return currentExecution.getTaskManagerLocationFuture();
+ return getCurrentExecutionAttempt().getTaskManagerLocationFuture();
}
public LogicalSlot getCurrentAssignedResource() {
- return currentExecution.getAssignedResource();
+ return getCurrentExecutionAttempt().getAssignedResource();
}
@Override
public TaskManagerLocation getCurrentAssignedResourceLocation() {
- return currentExecution.getAssignedResourceLocation();
+ return getCurrentExecutionAttempt().getAssignedResourceLocation();
}
@Override
@@ -341,57 +353,56 @@ public class ExecutionVertex
}
private void resetForNewExecutionInternal(final long timestamp) {
- final Execution oldExecution = currentExecution;
- final ExecutionState oldState = oldExecution.getState();
-
- if (oldState.isTerminal()) {
- if (oldState == FINISHED) {
- // pipelined partitions are released in Execution#cancel(), covering both job
- // failures and vertex resets
- // do not release pipelined partitions here to save RPC calls
- oldExecution.handlePartitionCleanup(false, true);
- getExecutionGraphAccessor()
- .getPartitionGroupReleaseStrategy()
- .vertexUnfinished(executionVertexId);
- }
+ final boolean isFinished = (getExecutionState() == FINISHED);
- executionHistory.add(oldExecution.archive());
+ resetExecutionsInternal();
- final Execution newExecution =
- new Execution(
- getExecutionGraphAccessor().getFutureExecutor(),
- this,
- oldExecution.getAttemptNumber() + 1,
- timestamp,
- timeout);
+ InputSplitAssigner assigner = jobVertex.getSplitAssigner();
+ if (assigner != null) {
+ assigner.returnInputSplit(inputSplits, getParallelSubtaskIndex());
+ inputSplits.clear();
+ }
- currentExecution = newExecution;
+ // if the execution was 'FINISHED' before, tell the ExecutionGraph that
+ // we take one step back on the road to reaching global FINISHED
+ if (isFinished) {
+ getJobVertex().executionVertexUnFinished();
+ }
- synchronized (inputSplits) {
- InputSplitAssigner assigner = jobVertex.getSplitAssigner();
- if (assigner != null) {
- assigner.returnInputSplit(inputSplits, getParallelSubtaskIndex());
- inputSplits.clear();
- }
- }
+ // reset the intermediate results
+ for (IntermediateResultPartition resultPartition : resultPartitions.values()) {
+ resultPartition.resetForNewExecution();
+ }
- // register this execution at the execution graph, to receive call backs
- getExecutionGraphAccessor().registerExecution(newExecution);
+ final Execution newExecution = createNewExecution(timestamp);
+ currentExecution = newExecution;
- // if the execution was 'FINISHED' before, tell the ExecutionGraph that
- // we take one step back on the road to reaching global FINISHED
- if (oldState == FINISHED) {
- getJobVertex().executionVertexUnFinished();
- }
+ // register this execution to the execution graph, to receive call backs
+ getExecutionGraphAccessor().registerExecution(newExecution);
+ }
- // reset the intermediate results
- for (IntermediateResultPartition resultPartition : resultPartitions.values()) {
- resultPartition.resetForNewExecution();
- }
- } else {
- throw new IllegalStateException(
- "Cannot reset a vertex that is in non-terminal state " + oldState);
+ void resetExecutionsInternal() {
+ resetExecution(currentExecution);
+ }
+
+ void resetExecution(final Execution execution) {
+ final ExecutionState oldState = execution.getState();
+
+ checkState(
+ oldState.isTerminal(),
+ "Cannot reset an execution that is in non-terminal state " + oldState);
+
+ if (oldState == FINISHED) {
+ // pipelined partitions are released in Execution#cancel(), covering both job
+ // failures and vertex resets
+ // do not release pipelined partitions here to save RPC calls
+ execution.handlePartitionCleanup(false, true);
+ getExecutionGraphAccessor()
+ .getPartitionGroupReleaseStrategy()
+ .vertexUnfinished(executionVertexId);
}
+
+ executionHistory.add(execution.archive());
}
public void tryAssignResource(LogicalSlot slot) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/SpeculativeExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/SpeculativeExecutionJobVertex.java
new file mode 100644
index 00000000000..76118ef5544
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/SpeculativeExecutionJobVertex.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.scheduler.VertexParallelismInformation;
+
+/** The ExecutionJobVertex which supports speculative execution. */
+public class SpeculativeExecutionJobVertex extends ExecutionJobVertex {
+
+ public SpeculativeExecutionJobVertex(
+ InternalExecutionGraphAccessor graph,
+ JobVertex jobVertex,
+ VertexParallelismInformation parallelismInfo)
+ throws JobException {
+ super(graph, jobVertex, parallelismInfo);
+ }
+
+ @Override
+ protected ExecutionVertex createExecutionVertex(
+ ExecutionJobVertex jobVertex,
+ int subTaskIndex,
+ IntermediateResult[] producedDataSets,
+ Time timeout,
+ long createTimestamp,
+ int executionHistorySizeLimit,
+ int initialAttemptCount) {
+ return new SpeculativeExecutionVertex(
+ jobVertex,
+ subTaskIndex,
+ producedDataSets,
+ timeout,
+ createTimestamp,
+ executionHistorySizeLimit,
+ initialAttemptCount);
+ }
+
+ /** Factory to create {@link SpeculativeExecutionJobVertex}. */
+ public static class Factory extends ExecutionJobVertex.Factory {
+ @Override
+ ExecutionJobVertex createExecutionJobVertex(
+ InternalExecutionGraphAccessor graph,
+ JobVertex jobVertex,
+ VertexParallelismInformation parallelismInfo)
+ throws JobException {
+ return new SpeculativeExecutionJobVertex(graph, jobVertex, parallelismInfo);
+ }
+ }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/SpeculativeExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/SpeculativeExecutionVertex.java
new file mode 100644
index 00000000000..f819c4539b7
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/SpeculativeExecutionVertex.java
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.concurrent.FutureUtils;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.runtime.execution.ExecutionState.FAILED;
+import static org.apache.flink.runtime.execution.ExecutionState.FINISHED;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** The ExecutionVertex which supports speculative execution. */
+public class SpeculativeExecutionVertex extends ExecutionVertex {
+
+ private final Map<ExecutionAttemptID, Execution> currentExecutions;
+
+ public SpeculativeExecutionVertex(
+ ExecutionJobVertex jobVertex,
+ int subTaskIndex,
+ IntermediateResult[] producedDataSets,
+ Time timeout,
+ long createTimestamp,
+ int executionHistorySizeLimit,
+ int initialAttemptCount) {
+ super(
+ jobVertex,
+ subTaskIndex,
+ producedDataSets,
+ timeout,
+ createTimestamp,
+ executionHistorySizeLimit,
+ initialAttemptCount);
+
+ this.currentExecutions = new LinkedHashMap<>();
+ this.currentExecutions.put(currentExecution.getAttemptId(), currentExecution);
+ }
+
+ public boolean containsSources() {
+ return getJobVertex().getJobVertex().containsSources();
+ }
+
+ public boolean containsSinks() {
+ return getJobVertex().getJobVertex().containsSinks();
+ }
+
+ public Execution createNewSpeculativeExecution(final long timestamp) {
+ final Execution newExecution = createNewExecution(timestamp);
+ getExecutionGraphAccessor().registerExecution(newExecution);
+ currentExecutions.put(newExecution.getAttemptId(), newExecution);
+ return newExecution;
+ }
+
+ @Override
+ public Collection<Execution> getCurrentExecutions() {
+ return Collections.unmodifiableCollection(currentExecutions.values());
+ }
+
+ @Override
+ public Execution getPartitionProducer() {
+ final Execution finishedExecution = getCurrentExecutionAttempt();
+ checkState(
+ finishedExecution.getState() == FINISHED,
+ "It's not allowed to get the partition producer of an un-finished SpeculativeExecutionVertex");
+ return finishedExecution;
+ }
+
+ @Override
+ public CompletableFuture<?> cancel() {
+ final List<CompletableFuture<?>> cancelResultFutures =
+ new ArrayList<>(currentExecutions.size());
+ for (Execution execution : currentExecutions.values()) {
+ execution.cancel();
+ cancelResultFutures.add(execution.getReleaseFuture());
+ }
+ return FutureUtils.combineAll(cancelResultFutures);
+ }
+
+ @Override
+ public CompletableFuture<?> suspend() {
+ return FutureUtils.combineAll(
+ currentExecutions.values().stream()
+ .map(Execution::suspend)
+ .collect(Collectors.toList()));
+ }
+
+ @Override
+ public void fail(Throwable t) {
+ currentExecutions.values().forEach(e -> e.fail(t));
+ }
+
+ @Override
+ public void markFailed(Throwable t) {
+ currentExecutions.values().forEach(e -> e.markFailed(t));
+ }
+
+ @Override
+ public void resetForNewExecution() {
+ super.resetForNewExecution();
+
+ currentExecutions.clear();
+ currentExecutions.put(currentExecution.getAttemptId(), currentExecution);
+ }
+
+ @Override
+ void resetExecutionsInternal() {
+ for (Execution execution : currentExecutions.values()) {
+ resetExecution(execution);
+ }
+ }
+
+ /**
+ * Remove a FAILED execution from currentExecutions and archive it to the execution history,
+ * unless it is the last one left. This makes room for future speculative executions.
+ *
+ * @param executionAttemptId attemptID of the execution to be removed
+ */
+ public void archiveFailedExecution(ExecutionAttemptID executionAttemptId) {
+ if (this.currentExecutions.size() <= 1) {
+ // Leave the last execution because currentExecutions should never be empty. This should
+ // happen only if all current executions have FAILED. A vertex reset will happen soon
+ // and will archive the remaining execution.
+ return;
+ }
+
+ final Execution removedExecution = this.currentExecutions.remove(executionAttemptId);
+ checkNotNull(
+ removedExecution,
+ "Cannot remove execution %s which does not exist.",
+ executionAttemptId);
+ checkState(
+ removedExecution.getState() == FAILED,
+ "Cannot remove execution %s which is not FAILED.",
+ executionAttemptId);
+
+ executionHistory.add(removedExecution.archive());
+ if (removedExecution == this.currentExecution) {
+ this.currentExecution = this.currentExecutions.values().iterator().next();
+ }
+ }
+
+ @Override
+ public Execution getCurrentExecutionAttempt() {
+ // returns the execution which is most likely to reach FINISHED state
+ Execution currentExecution = this.currentExecution;
+ for (Execution execution : currentExecutions.values()) {
+ if (getStatePriority(execution.getState())
+ < getStatePriority(currentExecution.getState())) {
+ currentExecution = execution;
+ }
+ }
+ return currentExecution;
+ }
+
+ private int getStatePriority(ExecutionState state) {
+ // the more likely to reach FINISHED state, the higher priority, the smaller value
+ switch (state) {
+ // CREATED/SCHEDULED/DEPLOYING/INITIALIZING/RUNNING/FINISHED are healthy states
+ // with an increasing priority
+ case FINISHED:
+ return 0;
+ case RUNNING:
+ return 1;
+ case INITIALIZING:
+ return 2;
+ case DEPLOYING:
+ return 3;
+ case SCHEDULED:
+ return 4;
+ case CREATED:
+ return 5;
+ // if no execution is in a healthy state, prefer a CANCELING one over FAILED, and
+ // FAILED over CANCELED
+ case CANCELING:
+ return 6;
+ case FAILED:
+ return 7;
+ case CANCELED:
+ return 8;
+ default:
+ throw new IllegalStateException("Execution state " + state + " is not supported.");
+ }
+ }
+
+ @Override
+ void notifyPendingDeployment(Execution execution) {
+ getExecutionGraphAccessor()
+ .getExecutionDeploymentListener()
+ .onStartedDeployment(
+ execution.getAttemptId(),
+ execution.getAssignedResourceLocation().getResourceID());
+ }
+
+ @Override
+ void notifyCompletedDeployment(Execution execution) {
+ getExecutionGraphAccessor()
+ .getExecutionDeploymentListener()
+ .onCompletedDeployment(execution.getAttemptId());
+ }
+
+ @Override
+ void notifyStateTransition(
+ Execution execution, ExecutionState previousState, ExecutionState newState) {
+ getExecutionGraphAccessor().notifyExecutionChange(execution, previousState, newState);
+ }
+
+ @Override
+ public ArchivedSpeculativeExecutionVertex archive() {
+ return new ArchivedSpeculativeExecutionVertex(this);
+ }
+
+ @Override
+ void cachePartitionInfo(PartitionInfo partitionInfo) {
+ throw new UnsupportedOperationException(
+ "Method is not supported in SpeculativeExecutionVertex.");
+ }
+
+ @Override
+ public void tryAssignResource(LogicalSlot slot) {
+ throw new UnsupportedOperationException(
+ "Method is not supported in SpeculativeExecutionVertex.");
+ }
+
+ @Override
+ public void deploy() {
+ throw new UnsupportedOperationException(
+ "Method is not supported in SpeculativeExecutionVertex.");
+ }
+
+ @Override
+ public void deployToSlot(LogicalSlot slot) {
+ throw new UnsupportedOperationException(
+ "Method is not supported in SpeculativeExecutionVertex.");
+ }
+
+ @Override
+ public Optional<TaskManagerLocation> getPreferredLocationBasedOnState() {
+ throw new UnsupportedOperationException(
+ "Method is not supported in SpeculativeExecutionVertex.");
+ }
+
+ @Override
+ public CompletableFuture<TaskManagerLocation> getCurrentTaskManagerLocationFuture() {
+ throw new UnsupportedOperationException(
+ "Method is not supported in SpeculativeExecutionVertex.");
+ }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactory.java
index 192242fe952..6d3bcb54953 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactory.java
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory;
import org.apache.flink.runtime.executiongraph.DefaultExecutionGraphBuilder;
import org.apache.flink.runtime.executiongraph.ExecutionDeploymentListener;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionStateUpdateListener;
import org.apache.flink.runtime.executiongraph.VertexAttemptNumberStore;
import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker;
@@ -49,6 +50,8 @@ import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Supplier;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
/** Default {@link ExecutionGraphFactory} implementation. */
public class DefaultExecutionGraphFactory implements ExecutionGraphFactory {
@@ -64,6 +67,7 @@ public class DefaultExecutionGraphFactory implements ExecutionGraphFactory {
private final JobMasterPartitionTracker jobMasterPartitionTracker;
private final Supplier<CheckpointStatsTracker> checkpointStatsTrackerFactory;
private final boolean isDynamicGraph;
+ private final ExecutionJobVertex.Factory executionJobVertexFactory;
public DefaultExecutionGraphFactory(
Configuration configuration,
@@ -87,7 +91,8 @@ public class DefaultExecutionGraphFactory implements ExecutionGraphFactory {
blobWriter,
shuffleMaster,
jobMasterPartitionTracker,
- false);
+ false,
+ new ExecutionJobVertex.Factory());
}
public DefaultExecutionGraphFactory(
@@ -101,7 +106,8 @@ public class DefaultExecutionGraphFactory implements ExecutionGraphFactory {
BlobWriter blobWriter,
ShuffleMaster<?> shuffleMaster,
JobMasterPartitionTracker jobMasterPartitionTracker,
- boolean isDynamicGraph) {
+ boolean isDynamicGraph,
+ ExecutionJobVertex.Factory executionJobVertexFactory) {
this.configuration = configuration;
this.userCodeClassLoader = userCodeClassLoader;
this.executionDeploymentTracker = executionDeploymentTracker;
@@ -120,6 +126,7 @@ public class DefaultExecutionGraphFactory implements ExecutionGraphFactory {
WebOptions.CHECKPOINTS_HISTORY_SIZE),
jobManagerJobMetricGroup));
this.isDynamicGraph = isDynamicGraph;
+ this.executionJobVertexFactory = checkNotNull(executionJobVertexFactory);
}
@Override
@@ -167,7 +174,8 @@ public class DefaultExecutionGraphFactory implements ExecutionGraphFactory {
vertexAttemptNumberStore,
vertexParallelismStore,
checkpointStatsTrackerFactory,
- isDynamicGraph);
+ isDynamicGraph,
+ executionJobVertexFactory);
final CheckpointCoordinator checkpointCoordinator =
newExecutionGraph.getCheckpointCoordinator();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerFactory.java
index 0cc501203b8..3aaad1dbd0d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerFactory.java
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.blob.BlobWriter;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.JobStatusListener;
import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverStrategyFactoryLoader;
import org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategy;
@@ -145,7 +146,8 @@ public class AdaptiveBatchSchedulerFactory implements SchedulerNGFactory {
blobWriter,
shuffleMaster,
partitionTracker,
- true);
+ true,
+ new ExecutionJobVertex.Factory());
return new AdaptiveBatchScheduler(
log,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/SpeculativeExecutionVertexTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/SpeculativeExecutionVertexTest.java
new file mode 100644
index 00000000000..dff88244fee
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/SpeculativeExecutionVertexTest.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.scheduler.TestingInternalFailuresListener;
+import org.apache.flink.testutils.TestingUtils;
+import org.apache.flink.testutils.executor.TestExecutorExtension;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ScheduledExecutorService;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for the attempt lifecycle handling of {@link SpeculativeExecutionVertex}. */
+class SpeculativeExecutionVertexTest {
+
+ @RegisterExtension
+ private static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE =
+ TestingUtils.defaultExecutorExtension();
+ // Re-created before every test and registered on each graph built by createExecutionGraph.
+ private TestingInternalFailuresListener internalFailuresListener;
+
+ @BeforeEach
+ void setUp() {
+ internalFailuresListener = new TestingInternalFailuresListener();
+ }
+
+ @Test
+ void testCreateSpeculativeExecution() throws Exception {
+ final SpeculativeExecutionVertex ev = createSpeculativeExecutionVertex();
+ assertThat(ev.getCurrentExecutions()).hasSize(1);
+ // Creating a speculative execution is expected to add one more current execution.
+ ev.createNewSpeculativeExecution(System.currentTimeMillis());
+ assertThat(ev.getCurrentExecutions()).hasSize(2);
+ }
+
+ @Test
+ void testResetExecutionVertex() throws Exception {
+ final SpeculativeExecutionVertex ev = createSpeculativeExecutionVertex();
+ final Execution e1 = ev.getCurrentExecutionAttempt();
+ final Execution e2 = ev.createNewSpeculativeExecution(System.currentTimeMillis());
+ // Mark the original attempt finished and cancel the speculative one before resetting.
+ e1.transitionState(ExecutionState.RUNNING);
+ e1.markFinished();
+ e2.cancel();
+ ev.resetForNewExecution();
+ // Both old attempts are expected in the history; the new sole attempt gets number 2.
+ assertThat(ev.getExecutionHistory().getHistoricalExecution(0).get().getAttemptId())
+ .isEqualTo(e1.getAttemptId());
+ assertThat(ev.getExecutionHistory().getHistoricalExecution(1).get().getAttemptId())
+ .isEqualTo(e2.getAttemptId());
+ assertThat(ev.getCurrentExecutions()).hasSize(1);
+ assertThat(ev.getCurrentExecutionAttempt().getAttemptNumber()).isEqualTo(2);
+ }
+
+ @Test
+ void testCancel() throws Exception {
+ final SpeculativeExecutionVertex ev = createSpeculativeExecutionVertex();
+ final Execution e1 = ev.getCurrentExecutionAttempt();
+ final Execution e2 = ev.createNewSpeculativeExecution(System.currentTimeMillis());
+ // cancel() on the vertex is expected to leave both attempts in CANCELED.
+ ev.cancel();
+ assertThat(e1.getState()).isSameAs(ExecutionState.CANCELED);
+ assertThat(e2.getState()).isSameAs(ExecutionState.CANCELED);
+ }
+
+ @Test
+ void testSuspend() throws Exception {
+ final SpeculativeExecutionVertex ev = createSpeculativeExecutionVertex();
+ final Execution e1 = ev.getCurrentExecutionAttempt();
+ final Execution e2 = ev.createNewSpeculativeExecution(System.currentTimeMillis());
+ // suspend() on the vertex is expected to leave both attempts in CANCELED as well.
+ ev.suspend();
+ assertThat(e1.getState()).isSameAs(ExecutionState.CANCELED);
+ assertThat(e2.getState()).isSameAs(ExecutionState.CANCELED);
+ }
+
+ @Test
+ void testFail() throws Exception {
+ final SpeculativeExecutionVertex ev = createSpeculativeExecutionVertex();
+ final Execution e1 = ev.getCurrentExecutionAttempt();
+ final Execution e2 = ev.createNewSpeculativeExecution(System.currentTimeMillis());
+ // fail() is expected to report both attempts to the listener, e1 first and then e2.
+ ev.fail(new Exception("Forced test failure."));
+ assertThat(internalFailuresListener.getFailedTasks())
+ .containsExactly(e1.getAttemptId(), e2.getAttemptId());
+ }
+
+ @Test
+ void testMarkFailed() throws Exception {
+ final SpeculativeExecutionVertex ev = createSpeculativeExecutionVertex();
+ final Execution e1 = ev.getCurrentExecutionAttempt();
+ final Execution e2 = ev.createNewSpeculativeExecution(System.currentTimeMillis());
+ // markFailed() is expected to report both attempts to the listener, e1 first and then e2.
+ ev.markFailed(new Exception("Forced test failure."));
+ assertThat(internalFailuresListener.getFailedTasks())
+ .containsExactly(e1.getAttemptId(), e2.getAttemptId());
+ }
+
+ @Test
+ void testArchiveFailedExecutions() throws Exception {
+ final SpeculativeExecutionVertex ev = createSpeculativeExecutionVertex();
+
+ final Execution e1 = ev.getCurrentExecutionAttempt();
+ e1.transitionState(ExecutionState.RUNNING);
+ // Add a speculative attempt and move it straight to FAILED while e1 is RUNNING.
+ final Execution e2 = ev.createNewSpeculativeExecution(0);
+ e2.transitionState(ExecutionState.FAILED);
+ // Archiving the failed e2 is expected to leave e1 as the only current execution.
+ ev.archiveFailedExecution(e2.getAttemptId());
+ assertThat(ev.getCurrentExecutions()).hasSize(1);
+ assertThat(ev.currentExecution).isSameAs(e1);
+ // Now the original attempt fails while a newer speculative attempt e3 is RUNNING.
+ final Execution e3 = ev.createNewSpeculativeExecution(0);
+ e3.transitionState(ExecutionState.RUNNING);
+ e1.transitionState(ExecutionState.FAILED);
+ // Archiving e1 is expected to make e3 the vertex's currentExecution.
+ ev.archiveFailedExecution(e1.getAttemptId());
+ assertThat(ev.getCurrentExecutions()).hasSize(1);
+ assertThat(ev.currentExecution).isSameAs(e3);
+ }
+
+ @Test
+ void testArchiveTheOnlyCurrentExecution() throws Exception {
+ final SpeculativeExecutionVertex ev = createSpeculativeExecutionVertex();
+
+ final Execution e1 = ev.getCurrentExecutionAttempt();
+ e1.transitionState(ExecutionState.FAILED);
+
+ ev.archiveFailedExecution(e1.getAttemptId());
+ // The only attempt is expected to remain current even though it is FAILED.
+ assertThat(ev.getCurrentExecutions()).hasSize(1);
+ assertThat(ev.currentExecution).isSameAs(e1);
+ }
+
+ @Test
+ void testArchiveNonFailedExecutionWithArchiveFailedExecutionMethod() {
+ Assertions.assertThrows(
+ IllegalStateException.class,
+ () -> {
+ final SpeculativeExecutionVertex ev = createSpeculativeExecutionVertex();
+
+ final Execution e1 = ev.getCurrentExecutionAttempt();
+ e1.transitionState(ExecutionState.FAILED);
+
+ final Execution e2 = ev.createNewSpeculativeExecution(0);
+ e2.transitionState(ExecutionState.RUNNING);
+ // e2 is RUNNING rather than FAILED, so this call is expected to throw.
+ ev.archiveFailedExecution(e2.getAttemptId());
+ });
+ }
+
+ @Test
+ void testGetExecutionState() throws Exception {
+ final SpeculativeExecutionVertex ev = createSpeculativeExecutionVertex();
+ // With a single attempt the vertex is expected to report that attempt's state.
+ final Execution e1 = ev.getCurrentExecutionAttempt();
+ e1.transitionState(ExecutionState.CANCELED);
+ assertThat(ev.getExecutionState()).isSameAs(ExecutionState.CANCELED);
+
+ // Ordered by priority: a state added later is more likely to reach the FINISHED state.
+ final List<ExecutionState> statesSortedByPriority = new ArrayList<>();
+ statesSortedByPriority.add(ExecutionState.FAILED);
+ statesSortedByPriority.add(ExecutionState.CANCELING);
+ statesSortedByPriority.add(ExecutionState.CREATED);
+ statesSortedByPriority.add(ExecutionState.SCHEDULED);
+ statesSortedByPriority.add(ExecutionState.DEPLOYING);
+ statesSortedByPriority.add(ExecutionState.INITIALIZING);
+ statesSortedByPriority.add(ExecutionState.RUNNING);
+ statesSortedByPriority.add(ExecutionState.FINISHED);
+ // Earlier attempts stay in place; each new attempt's state is expected to win over them.
+ for (ExecutionState state : statesSortedByPriority) {
+ final Execution execution = ev.createNewSpeculativeExecution(0);
+ execution.transitionState(state);
+ assertThat(ev.getExecutionState()).isSameAs(state);
+ }
+ }
+
+ private SpeculativeExecutionVertex createSpeculativeExecutionVertex() throws Exception {
+ final JobVertex jobVertex = ExecutionGraphTestUtils.createNoOpVertex(1);
+ final JobGraph jobGraph = JobGraphTestUtils.batchJobGraph(jobVertex);
+ final ExecutionGraph executionGraph = createExecutionGraph(jobGraph);
+ return (SpeculativeExecutionVertex)
+ executionGraph.getJobVertex(jobVertex.getID()).getTaskVertices()[0];
+ }
+
+ private ExecutionGraph createExecutionGraph(final JobGraph jobGraph) throws Exception {
+ final ExecutionGraph executionGraph =
+ TestingDefaultExecutionGraphBuilder.newBuilder()
+ .setJobGraph(jobGraph)
+ .setExecutionJobVertexFactory(new SpeculativeExecutionJobVertex.Factory())
+ .build(EXECUTOR_RESOURCE.getExecutor());
+ // Register the per-test listener so tests can assert on the reported task failures.
+ executionGraph.setInternalTaskFailuresListener(internalFailuresListener);
+ executionGraph.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
+
+ return executionGraph;
+ }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingDefaultExecutionGraphBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingDefaultExecutionGraphBuilder.java
index 8e5e57ddea0..62933dcee34 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingDefaultExecutionGraphBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingDefaultExecutionGraphBuilder.java
@@ -73,6 +73,7 @@ public class TestingDefaultExecutionGraphBuilder {
private ExecutionStateUpdateListener executionStateUpdateListener =
(execution, previousState, newState) -> {};
private VertexParallelismStore vertexParallelismStore;
+ private ExecutionJobVertex.Factory executionJobVertexFactory = new ExecutionJobVertex.Factory();
private TestingDefaultExecutionGraphBuilder() {}
@@ -142,6 +143,12 @@ public class TestingDefaultExecutionGraphBuilder {
return this;
}
+ public TestingDefaultExecutionGraphBuilder setExecutionJobVertexFactory(
+ ExecutionJobVertex.Factory executionJobVertexFactory) {
+ this.executionJobVertexFactory = executionJobVertexFactory; // replaces the default ExecutionJobVertex.Factory
+ return this;
+ }
+
private DefaultExecutionGraph build(
boolean isDynamicGraph, ScheduledExecutorService executorService)
throws JobException, JobExecutionException {
@@ -168,7 +175,8 @@ public class TestingDefaultExecutionGraphBuilder {
Optional.ofNullable(vertexParallelismStore)
.orElseGet(() -> SchedulerBase.computeVertexParallelismStore(jobGraph)),
() -> new CheckpointStatsTracker(0, new UnregisteredMetricsGroup()),
- isDynamicGraph);
+ isDynamicGraph,
+ executionJobVertexFactory);
}
public DefaultExecutionGraph build(ScheduledExecutorService executorService)
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultExecutionDeployerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultExecutionDeployerTest.java
index 96c89093b92..3525fea6eae 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultExecutionDeployerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultExecutionDeployerTest.java
@@ -333,18 +333,7 @@ class DefaultExecutionDeployerTest {
.setPartitionTracker(partitionTracker)
.build(executor);
- executionGraph.setInternalTaskFailuresListener(
- new InternalFailuresListener() {
- @Override
- public void notifyTaskFailure(
- ExecutionAttemptID attemptId,
- Throwable t,
- boolean cancelTask,
- boolean releasePartitions) {}
-
- @Override
- public void notifyGlobalFailure(Throwable t) {}
- });
+ executionGraph.setInternalTaskFailuresListener(new TestingInternalFailuresListener());
executionGraph.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
return executionGraph;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingInternalFailuresListener.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingInternalFailuresListener.java
new file mode 100644
index 00000000000..56a257243ee
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingInternalFailuresListener.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/** Test {@link InternalFailuresListener} recording the attempt IDs of reported task failures. */
+public class TestingInternalFailuresListener implements InternalFailuresListener {
+
+ private final List<ExecutionAttemptID> failedTasks = new ArrayList<>();
+ // Only the attempt ID is recorded; the cause and both boolean flags are ignored.
+ @Override
+ public void notifyTaskFailure(
+ ExecutionAttemptID attemptId,
+ Throwable t,
+ boolean cancelTask,
+ boolean releasePartitions) {
+ failedTasks.add(attemptId);
+ }
+ // Global failures are not recorded.
+ @Override
+ public void notifyGlobalFailure(Throwable t) {}
+ // Returns a read-only view of the recorded attempt IDs, in notification order.
+ public List<ExecutionAttemptID> getFailedTasks() {
+ return Collections.unmodifiableList(failedTasks);
+ }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTestUtils.java
index 12740adb7cf..d462c726bc4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTestUtils.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.scheduler.adaptivebatch;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmaster.DefaultExecutionDeploymentTracker;
import org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory;
@@ -72,7 +73,8 @@ public class AdaptiveBatchSchedulerTestUtils {
blobWriter,
shuffleMaster,
partitionTracker,
- true);
+ true,
+ new ExecutionJobVertex.Factory());
return new AdaptiveBatchScheduler(
log,