You are viewing a plain text version of this content. Its canonical link was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@flink.apache.org by zh...@apache.org on 2022/07/01 13:31:40 UTC

[flink] 01/04: [FLINK-28134][runtime] Introduce SpeculativeExecutionJobVertex and SpeculativeExecutionVertex

This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b96d476d4cd05de1e6c0b001f2477aaed98bd473
Author: Zhu Zhu <re...@gmail.com>
AuthorDate: Thu Jun 23 20:38:38 2022 +0800

    [FLINK-28134][runtime] Introduce SpeculativeExecutionJobVertex and SpeculativeExecutionVertex
    
    This closes #20082.
---
 .../ArchivedSpeculativeExecutionVertex.java        |  52 ++++
 .../executiongraph/DefaultExecutionGraph.java      |  11 +-
 .../DefaultExecutionGraphBuilder.java              |   6 +-
 .../runtime/executiongraph/ExecutionJobVertex.java |  31 ++-
 .../runtime/executiongraph/ExecutionVertex.java    | 135 +++++-----
 .../SpeculativeExecutionJobVertex.java             |  67 +++++
 .../executiongraph/SpeculativeExecutionVertex.java | 276 +++++++++++++++++++++
 .../scheduler/DefaultExecutionGraphFactory.java    |  14 +-
 .../AdaptiveBatchSchedulerFactory.java             |   4 +-
 .../SpeculativeExecutionVertexTest.java            | 226 +++++++++++++++++
 .../TestingDefaultExecutionGraphBuilder.java       |  10 +-
 .../scheduler/DefaultExecutionDeployerTest.java    |  13 +-
 .../scheduler/TestingInternalFailuresListener.java |  47 ++++
 .../AdaptiveBatchSchedulerTestUtils.java           |   4 +-
 14 files changed, 811 insertions(+), 85 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedSpeculativeExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedSpeculativeExecutionVertex.java
new file mode 100644
index 00000000000..263049414c5
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedSpeculativeExecutionVertex.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+/**
+ * {@link ArchivedSpeculativeExecutionVertex} is a read-only snapshot of a {@link
+ * SpeculativeExecutionVertex}, which may have several execution attempts running concurrently.
+ */
+public class ArchivedSpeculativeExecutionVertex extends ArchivedExecutionVertex {
+
+    private static final long serialVersionUID = 1L;
+
+    public ArchivedSpeculativeExecutionVertex(SpeculativeExecutionVertex vertex) {
+        super(
+                vertex.getParallelSubtaskIndex(),
+                vertex.getTaskNameWithSubtaskIndex(),
+                vertex.getCurrentExecutionAttempt().archive(), // attempt most likely to FINISH
+                getCopyOfExecutionHistory(vertex)); // history plus the other concurrent attempts
+    }
+
+    private static ExecutionHistory getCopyOfExecutionHistory(SpeculativeExecutionVertex vertex) {
+        final ExecutionHistory executionHistory = // presumably a copy of the vertex's history
+                ArchivedExecutionVertex.getCopyOfExecutionHistory(vertex);
+
+        // also archive every concurrent execution except the one returned by
+        // getCurrentExecutionAttempt(), which the constructor archives separately
+        final Execution currentAttempt = vertex.getCurrentExecutionAttempt();
+        for (Execution execution : vertex.getCurrentExecutions()) {
+            if (execution.getAttemptNumber() != currentAttempt.getAttemptNumber()) { // unique per vertex
+                executionHistory.add(execution.archive());
+            }
+        }
+
+        return executionHistory;
+    }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java
index 40082ae1258..4e144050e7e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java
@@ -286,6 +286,8 @@ public class DefaultExecutionGraph implements ExecutionGraph, InternalExecutionG
 
     private final boolean isDynamic;
 
+    private final ExecutionJobVertex.Factory executionJobVertexFactory;
+
     // --------------------------------------------------------------------------------------------
     //   Constructors
     // --------------------------------------------------------------------------------------------
@@ -307,7 +309,8 @@ public class DefaultExecutionGraph implements ExecutionGraph, InternalExecutionG
             long initializationTimestamp,
             VertexAttemptNumberStore initialAttemptCounts,
             VertexParallelismStore vertexParallelismStore,
-            boolean isDynamic)
+            boolean isDynamic,
+            ExecutionJobVertex.Factory executionJobVertexFactory)
             throws IOException {
 
         this.executionGraphId = new ExecutionGraphID();
@@ -375,6 +378,8 @@ public class DefaultExecutionGraph implements ExecutionGraph, InternalExecutionG
 
         this.isDynamic = isDynamic;
 
+        this.executionJobVertexFactory = checkNotNull(executionJobVertexFactory);
+
         LOG.info(
                 "Created execution graph {} for job {}.",
                 executionGraphId,
@@ -840,7 +845,9 @@ public class DefaultExecutionGraph implements ExecutionGraph, InternalExecutionG
                     parallelismStore.getParallelismInfo(jobVertex.getID());
 
             // create the execution job vertex and attach it to the graph
-            ExecutionJobVertex ejv = new ExecutionJobVertex(this, jobVertex, parallelismInfo);
+            ExecutionJobVertex ejv =
+                    executionJobVertexFactory.createExecutionJobVertex(
+                            this, jobVertex, parallelismInfo);
 
             ExecutionJobVertex previousTask = this.tasks.putIfAbsent(jobVertex.getID(), ejv);
             if (previousTask != null) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphBuilder.java
index 0d47f5164e2..3b521327bbc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphBuilder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphBuilder.java
@@ -91,7 +91,8 @@ public class DefaultExecutionGraphBuilder {
             VertexAttemptNumberStore vertexAttemptNumberStore,
             VertexParallelismStore vertexParallelismStore,
             Supplier<CheckpointStatsTracker> checkpointStatsTrackerFactory,
-            boolean isDynamicGraph)
+            boolean isDynamicGraph,
+            ExecutionJobVertex.Factory executionJobVertexFactory)
             throws JobExecutionException, JobException {
 
         checkNotNull(jobGraph, "job graph cannot be null");
@@ -136,7 +137,8 @@ public class DefaultExecutionGraphBuilder {
                             initializationTimestamp,
                             vertexAttemptNumberStore,
                             vertexParallelismStore,
-                            isDynamicGraph);
+                            isDynamicGraph,
+                            executionJobVertexFactory);
         } catch (IOException e) {
             throw new JobException("Could not create the ExecutionGraph.", e);
         }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index bc9dfd94bb5..7ac2156b5bc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -188,7 +188,7 @@ public class ExecutionJobVertex
         // create all task vertices
         for (int i = 0; i < this.parallelismInfo.getParallelism(); i++) {
             ExecutionVertex vertex =
-                    new ExecutionVertex(
+                    createExecutionVertex(
                             this,
                             i,
                             producedDataSets,
@@ -260,6 +260,24 @@ public class ExecutionJobVertex
         }
     }
 
+    protected ExecutionVertex createExecutionVertex( // hook; SpeculativeExecutionJobVertex overrides
+            ExecutionJobVertex jobVertex,
+            int subTaskIndex,
+            IntermediateResult[] producedDataSets,
+            Time timeout,
+            long createTimestamp,
+            int executionHistorySizeLimit,
+            int initialAttemptCount) {
+        return new ExecutionVertex(
+                jobVertex,
+                subTaskIndex,
+                producedDataSets,
+                timeout,
+                createTimestamp,
+                executionHistorySizeLimit,
+                initialAttemptCount);
+    }
+
     public boolean isInitialized() {
         return taskVertices != null;
     }
@@ -596,4 +614,15 @@ public class ExecutionJobVertex
             return ExecutionState.CREATED;
         }
     }
+
+    /** Factory to create {@link ExecutionJobVertex}; subclasses may create specialized vertices. */
+    public static class Factory {
+        ExecutionJobVertex createExecutionJobVertex( // called by DefaultExecutionGraph on attach
+                InternalExecutionGraphAccessor graph,
+                JobVertex jobVertex,
+                VertexParallelismInformation parallelismInfo)
+                throws JobException {
+            return new ExecutionJobVertex(graph, jobVertex, parallelismInfo);
+        }
+    }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index 5da5bf7c8b6..abda5427f4a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -49,6 +49,7 @@ import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 
 import static org.apache.flink.runtime.execution.ExecutionState.FINISHED;
+import static org.apache.flink.util.Preconditions.checkState;
 
 /**
  * The ExecutionVertex is a parallel subtask of the execution. It may be executed once, or several
@@ -61,7 +62,7 @@ public class ExecutionVertex
 
     // --------------------------------------------------------------------------------------------
 
-    private final ExecutionJobVertex jobVertex;
+    final ExecutionJobVertex jobVertex;
 
     private final Map<IntermediateResultPartitionID, IntermediateResultPartition> resultPartitions;
 
@@ -69,7 +70,7 @@ public class ExecutionVertex
 
     private final ExecutionVertexID executionVertexId;
 
-    private final ExecutionHistory executionHistory;
+    final ExecutionHistory executionHistory;
 
     private final Time timeout;
 
@@ -77,9 +78,11 @@ public class ExecutionVertex
     private final String taskNameWithSubtask;
 
     /** The current or latest execution attempt of this vertex's task. */
-    private Execution currentExecution; // this field must never be null
+    Execution currentExecution; // this field must never be null
 
-    private final ArrayList<InputSplit> inputSplits;
+    final ArrayList<InputSplit> inputSplits;
+
+    private int nextAttemptNumber;
 
     /** This field holds the allocation id of the last successful assignment. */
     @Nullable private TaskManagerLocation lastAssignedLocation;
@@ -134,24 +137,33 @@ public class ExecutionVertex
 
         this.executionHistory = new ExecutionHistory(executionHistorySizeLimit);
 
-        this.currentExecution =
-                new Execution(
-                        getExecutionGraphAccessor().getFutureExecutor(),
-                        this,
-                        initialAttemptCount,
-                        createTimestamp,
-                        timeout);
-
-        getExecutionGraphAccessor().registerExecution(currentExecution);
+        this.nextAttemptNumber = initialAttemptCount;
 
         this.timeout = timeout;
         this.inputSplits = new ArrayList<>();
+
+        this.currentExecution = createNewExecution(createTimestamp);
+
+        getExecutionGraphAccessor().registerExecution(currentExecution);
     }
 
     // --------------------------------------------------------------------------------------------
     //  Properties
     // --------------------------------------------------------------------------------------------
 
+    Execution createNewExecution(final long timestamp) { // consumes the next attempt number
+        return new Execution( // not registered here; callers invoke registerExecution()
+                getExecutionGraphAccessor().getFutureExecutor(),
+                this,
+                nextAttemptNumber++,
+                timestamp,
+                timeout); // the constructor assigns timeout before calling this method
+    }
+
+    public Execution getPartitionProducer() { // overridden by SpeculativeExecutionVertex
+        return currentExecution;
+    }
+
     public JobID getJobId() {
         return this.jobVertex.getJobId();
     }
@@ -248,30 +260,30 @@ public class ExecutionVertex
 
     @Override
     public ExecutionState getExecutionState() {
-        return currentExecution.getState();
+        return getCurrentExecutionAttempt().getState(); // overridable accessor, not the field
     }
 
     @Override
     public long getStateTimestamp(ExecutionState state) {
-        return currentExecution.getStateTimestamp(state);
+        return getCurrentExecutionAttempt().getStateTimestamp(state);
     }
 
     @Override
     public Optional<ErrorInfo> getFailureInfo() {
-        return currentExecution.getFailureInfo();
+        return getCurrentExecutionAttempt().getFailureInfo();
     }
 
     public CompletableFuture<TaskManagerLocation> getCurrentTaskManagerLocationFuture() {
-        return currentExecution.getTaskManagerLocationFuture();
+        return getCurrentExecutionAttempt().getTaskManagerLocationFuture();
     }
 
     public LogicalSlot getCurrentAssignedResource() {
-        return currentExecution.getAssignedResource();
+        return getCurrentExecutionAttempt().getAssignedResource();
     }
 
     @Override
     public TaskManagerLocation getCurrentAssignedResourceLocation() {
-        return currentExecution.getAssignedResourceLocation();
+        return getCurrentExecutionAttempt().getAssignedResourceLocation();
     }
 
     @Override
@@ -341,57 +353,56 @@ public class ExecutionVertex
     }
 
     private void resetForNewExecutionInternal(final long timestamp) {
-        final Execution oldExecution = currentExecution;
-        final ExecutionState oldState = oldExecution.getState();
-
-        if (oldState.isTerminal()) {
-            if (oldState == FINISHED) {
-                // pipelined partitions are released in Execution#cancel(), covering both job
-                // failures and vertex resets
-                // do not release pipelined partitions here to save RPC calls
-                oldExecution.handlePartitionCleanup(false, true);
-                getExecutionGraphAccessor()
-                        .getPartitionGroupReleaseStrategy()
-                        .vertexUnfinished(executionVertexId);
-            }
+        final boolean isFinished = (getExecutionState() == FINISHED); // read before the reset below
 
-            executionHistory.add(oldExecution.archive());
+        resetExecutionsInternal(); // throws IllegalStateException on a non-terminal execution
 
-            final Execution newExecution =
-                    new Execution(
-                            getExecutionGraphAccessor().getFutureExecutor(),
-                            this,
-                            oldExecution.getAttemptNumber() + 1,
-                            timestamp,
-                            timeout);
+        InputSplitAssigner assigner = jobVertex.getSplitAssigner(); // NOTE(review): now unsynchronized
+        if (assigner != null) {
+            assigner.returnInputSplit(inputSplits, getParallelSubtaskIndex());
+            inputSplits.clear();
+        }
 
-            currentExecution = newExecution;
+        // if the execution was 'FINISHED' before, tell the ExecutionGraph that
+        // we take one step back on the road to reaching global FINISHED
+        if (isFinished) {
+            getJobVertex().executionVertexUnFinished();
+        }
 
-            synchronized (inputSplits) {
-                InputSplitAssigner assigner = jobVertex.getSplitAssigner();
-                if (assigner != null) {
-                    assigner.returnInputSplit(inputSplits, getParallelSubtaskIndex());
-                    inputSplits.clear();
-                }
-            }
+        // reset the intermediate results
+        for (IntermediateResultPartition resultPartition : resultPartitions.values()) {
+            resultPartition.resetForNewExecution();
+        }
 
-            // register this execution at the execution graph, to receive call backs
-            getExecutionGraphAccessor().registerExecution(newExecution);
+        final Execution newExecution = createNewExecution(timestamp);
+        currentExecution = newExecution;
 
-            // if the execution was 'FINISHED' before, tell the ExecutionGraph that
-            // we take one step back on the road to reaching global FINISHED
-            if (oldState == FINISHED) {
-                getJobVertex().executionVertexUnFinished();
-            }
+        // register this execution to the execution graph, to receive call backs
+        getExecutionGraphAccessor().registerExecution(newExecution);
+    }
 
-            // reset the intermediate results
-            for (IntermediateResultPartition resultPartition : resultPartitions.values()) {
-                resultPartition.resetForNewExecution();
-            }
-        } else {
-            throw new IllegalStateException(
-                    "Cannot reset a vertex that is in non-terminal state " + oldState);
+    void resetExecutionsInternal() { // SpeculativeExecutionVertex overrides to reset all attempts
+        resetExecution(currentExecution);
+    }
+
+    void resetExecution(final Execution execution) { // archives one terminal execution
+        final ExecutionState oldState = execution.getState();
+
+        checkState(
+                oldState.isTerminal(),
+                "Cannot reset an execution that is in non-terminal state " + oldState);
+
+        if (oldState == FINISHED) {
+            // pipelined partitions are released in Execution#cancel(), covering both job
+            // failures and vertex resets; do not release pipelined partitions here to save RPCs.
+            // NOTE(review): runs once per FINISHED execution -- confirm repeats are harmless.
+            execution.handlePartitionCleanup(false, true);
+            getExecutionGraphAccessor()
+                    .getPartitionGroupReleaseStrategy()
+                    .vertexUnfinished(executionVertexId);
         }
+
+        executionHistory.add(execution.archive());
     }
 
     public void tryAssignResource(LogicalSlot slot) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/SpeculativeExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/SpeculativeExecutionJobVertex.java
new file mode 100644
index 00000000000..76118ef5544
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/SpeculativeExecutionJobVertex.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.scheduler.VertexParallelismInformation;
+
+/** ExecutionJobVertex for speculative execution; its subtasks are {@link SpeculativeExecutionVertex}. */
+public class SpeculativeExecutionJobVertex extends ExecutionJobVertex {
+
+    public SpeculativeExecutionJobVertex(
+            InternalExecutionGraphAccessor graph,
+            JobVertex jobVertex,
+            VertexParallelismInformation parallelismInfo)
+            throws JobException {
+        super(graph, jobVertex, parallelismInfo);
+    }
+
+    @Override
+    protected ExecutionVertex createExecutionVertex( // same arguments, speculative vertex type
+            ExecutionJobVertex jobVertex,
+            int subTaskIndex,
+            IntermediateResult[] producedDataSets,
+            Time timeout,
+            long createTimestamp,
+            int executionHistorySizeLimit,
+            int initialAttemptCount) {
+        return new SpeculativeExecutionVertex(
+                jobVertex,
+                subTaskIndex,
+                producedDataSets,
+                timeout,
+                createTimestamp,
+                executionHistorySizeLimit,
+                initialAttemptCount);
+    }
+
+    /** Factory to create {@link SpeculativeExecutionJobVertex} instead of plain job vertices. */
+    public static class Factory extends ExecutionJobVertex.Factory {
+        @Override
+        ExecutionJobVertex createExecutionJobVertex(
+                InternalExecutionGraphAccessor graph,
+                JobVertex jobVertex,
+                VertexParallelismInformation parallelismInfo)
+                throws JobException {
+            return new SpeculativeExecutionJobVertex(graph, jobVertex, parallelismInfo);
+        }
+    }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/SpeculativeExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/SpeculativeExecutionVertex.java
new file mode 100644
index 00000000000..f819c4539b7
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/SpeculativeExecutionVertex.java
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.concurrent.FutureUtils;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.runtime.execution.ExecutionState.FAILED;
+import static org.apache.flink.runtime.execution.ExecutionState.FINISHED;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** The ExecutionVertex which can run several execution attempts (speculative executions) at once. */
+public class SpeculativeExecutionVertex extends ExecutionVertex {
+
+    private final Map<ExecutionAttemptID, Execution> currentExecutions; // insertion-ordered
+
+    public SpeculativeExecutionVertex(
+            ExecutionJobVertex jobVertex,
+            int subTaskIndex,
+            IntermediateResult[] producedDataSets,
+            Time timeout,
+            long createTimestamp,
+            int executionHistorySizeLimit,
+            int initialAttemptCount) {
+        super(
+                jobVertex,
+                subTaskIndex,
+                producedDataSets,
+                timeout,
+                createTimestamp,
+                executionHistorySizeLimit,
+                initialAttemptCount);
+
+        this.currentExecutions = new LinkedHashMap<>(); // NOTE: still null while super() runs
+        this.currentExecutions.put(currentExecution.getAttemptId(), currentExecution);
+    }
+
+    public boolean containsSources() {
+        return getJobVertex().getJobVertex().containsSources();
+    }
+
+    public boolean containsSinks() {
+        return getJobVertex().getJobVertex().containsSinks();
+    }
+
+    public Execution createNewSpeculativeExecution(final long timestamp) { // adds a concurrent attempt
+        final Execution newExecution = createNewExecution(timestamp);
+        getExecutionGraphAccessor().registerExecution(newExecution);
+        currentExecutions.put(newExecution.getAttemptId(), newExecution);
+        return newExecution;
+    }
+
+    @Override
+    public Collection<Execution> getCurrentExecutions() {
+        return Collections.unmodifiableCollection(currentExecutions.values()); // read-only live view
+    }
+
+    @Override
+    public Execution getPartitionProducer() { // only valid once the chosen attempt has FINISHED
+        final Execution finishedExecution = getCurrentExecutionAttempt();
+        checkState(
+                finishedExecution.getState() == FINISHED,
+                "It's not allowed to get the partition producer of an un-finished SpeculativeExecutionVertex");
+        return finishedExecution;
+    }
+
+    @Override
+    public CompletableFuture<?> cancel() { // cancels every attempt; combines their release futures
+        final List<CompletableFuture<?>> cancelResultFutures =
+                new ArrayList<>(currentExecutions.size());
+        for (Execution execution : currentExecutions.values()) {
+            execution.cancel();
+            cancelResultFutures.add(execution.getReleaseFuture());
+        }
+        return FutureUtils.combineAll(cancelResultFutures);
+    }
+
+    @Override
+    public CompletableFuture<?> suspend() { // suspends every current attempt
+        return FutureUtils.combineAll(
+                currentExecutions.values().stream()
+                        .map(Execution::suspend)
+                        .collect(Collectors.toList()));
+    }
+
+    @Override
+    public void fail(Throwable t) { // fails every current attempt
+        currentExecutions.values().forEach(e -> e.fail(t));
+    }
+
+    @Override
+    public void markFailed(Throwable t) { // marks every current attempt as failed
+        currentExecutions.values().forEach(e -> e.markFailed(t));
+    }
+
+    @Override
+    public void resetForNewExecution() {
+        super.resetForNewExecution();
+
+        currentExecutions.clear(); // presumably archived already via resetExecutionsInternal()
+        currentExecutions.put(currentExecution.getAttemptId(), currentExecution); // only the new one
+    }
+
+    @Override
+    void resetExecutionsInternal() { // resets and archives every current attempt
+        for (Execution execution : currentExecutions.values()) {
+            resetExecution(execution);
+        }
+    }
+
+    /**
+     * Archives the given FAILED execution and removes it from currentExecutions, making room for
+     * possible future speculative executions. No-op while at most one execution is current.
+     *
+     * @param executionAttemptId attemptID of the execution to remove; must be current and FAILED
+     */
+    public void archiveFailedExecution(ExecutionAttemptID executionAttemptId) {
+        if (this.currentExecutions.size() <= 1) {
+            // Leave the last execution because currentExecutions should never be empty. This should
+            // happen only if all current executions have FAILED. A vertex reset will happen soon
+            // and will archive the remaining execution.
+            return;
+        }
+
+        final Execution removedExecution = this.currentExecutions.remove(executionAttemptId);
+        checkNotNull( // NOTE(review): the entry is already removed if a check below fails
+                removedExecution,
+                "Cannot remove execution %s which does not exist.",
+                executionAttemptId);
+        checkState(
+                removedExecution.getState() == FAILED,
+                "Cannot remove execution %s which is not FAILED.",
+                executionAttemptId);
+
+        executionHistory.add(removedExecution.archive());
+        if (removedExecution == this.currentExecution) {
+            this.currentExecution = this.currentExecutions.values().iterator().next(); // earliest left
+        }
+    }
+
+    @Override
+    public Execution getCurrentExecutionAttempt() {
+        // returns the execution which is most likely to reach FINISHED state
+        Execution currentExecution = this.currentExecution;
+        for (Execution execution : currentExecutions.values()) {
+            if (getStatePriority(execution.getState())
+                    < getStatePriority(currentExecution.getState())) {
+                currentExecution = execution; // strictly better only; ties keep the earlier pick
+            }
+        }
+        return currentExecution;
+    }
+
+    private int getStatePriority(ExecutionState state) {
+        // the more likely to reach FINISHED state, the higher priority, the smaller value
+        switch (state) {
+                // CREATED/SCHEDULED/DEPLOYING/INITIALIZING/RUNNING/FINISHED are healthy states with
+                // an increasing priority
+            case FINISHED:
+                return 0;
+            case RUNNING:
+                return 1;
+            case INITIALIZING:
+                return 2;
+            case DEPLOYING:
+                return 3;
+            case SCHEDULED:
+                return 4;
+            case CREATED:
+                return 5;
+                // unhealthy states rank below every healthy one; among them CANCELING is preferred
+                // over FAILED, which is preferred over CANCELED
+            case CANCELING:
+                return 6;
+            case FAILED:
+                return 7;
+            case CANCELED:
+                return 8;
+            default:
+                throw new IllegalStateException("Execution state " + state + " is not supported.");
+        }
+    }
+
+    @Override
+    void notifyPendingDeployment(Execution execution) {
+        getExecutionGraphAccessor()
+                .getExecutionDeploymentListener()
+                .onStartedDeployment(
+                        execution.getAttemptId(),
+                        execution.getAssignedResourceLocation().getResourceID());
+    }
+
+    @Override
+    void notifyCompletedDeployment(Execution execution) {
+        getExecutionGraphAccessor()
+                .getExecutionDeploymentListener()
+                .onCompletedDeployment(execution.getAttemptId());
+    }
+
+    @Override
+    void notifyStateTransition(
+            Execution execution, ExecutionState previousState, ExecutionState newState) {
+        getExecutionGraphAccessor().notifyExecutionChange(execution, previousState, newState);
+    }
+
+    @Override
+    public ArchivedSpeculativeExecutionVertex archive() {
+        return new ArchivedSpeculativeExecutionVertex(this);
+    }
+
+    @Override
+    void cachePartitionInfo(PartitionInfo partitionInfo) { // this and the overrides below always throw
+        throw new UnsupportedOperationException(
+                "Method is not supported in SpeculativeExecutionVertex.");
+    }
+
+    @Override
+    public void tryAssignResource(LogicalSlot slot) {
+        throw new UnsupportedOperationException(
+                "Method is not supported in SpeculativeExecutionVertex.");
+    }
+
+    @Override
+    public void deploy() {
+        throw new UnsupportedOperationException(
+                "Method is not supported in SpeculativeExecutionVertex.");
+    }
+
+    @Override
+    public void deployToSlot(LogicalSlot slot) {
+        throw new UnsupportedOperationException(
+                "Method is not supported in SpeculativeExecutionVertex.");
+    }
+
+    @Override
+    public Optional<TaskManagerLocation> getPreferredLocationBasedOnState() {
+        throw new UnsupportedOperationException(
+                "Method is not supported in SpeculativeExecutionVertex.");
+    }
+
+    @Override
+    public CompletableFuture<TaskManagerLocation> getCurrentTaskManagerLocationFuture() {
+        throw new UnsupportedOperationException(
+                "Method is not supported in SpeculativeExecutionVertex.");
+    }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactory.java
index 192242fe952..6d3bcb54953 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactory.java
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory;
 import org.apache.flink.runtime.executiongraph.DefaultExecutionGraphBuilder;
 import org.apache.flink.runtime.executiongraph.ExecutionDeploymentListener;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionStateUpdateListener;
 import org.apache.flink.runtime.executiongraph.VertexAttemptNumberStore;
 import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker;
@@ -49,6 +50,8 @@ import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.function.Supplier;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /** Default {@link ExecutionGraphFactory} implementation. */
 public class DefaultExecutionGraphFactory implements ExecutionGraphFactory {
 
@@ -64,6 +67,7 @@ public class DefaultExecutionGraphFactory implements ExecutionGraphFactory {
     private final JobMasterPartitionTracker jobMasterPartitionTracker;
     private final Supplier<CheckpointStatsTracker> checkpointStatsTrackerFactory;
     private final boolean isDynamicGraph;
+    private final ExecutionJobVertex.Factory executionJobVertexFactory;
 
     public DefaultExecutionGraphFactory(
             Configuration configuration,
@@ -87,7 +91,8 @@ public class DefaultExecutionGraphFactory implements ExecutionGraphFactory {
                 blobWriter,
                 shuffleMaster,
                 jobMasterPartitionTracker,
-                false);
+                false,
+                new ExecutionJobVertex.Factory());
     }
 
     public DefaultExecutionGraphFactory(
@@ -101,7 +106,8 @@ public class DefaultExecutionGraphFactory implements ExecutionGraphFactory {
             BlobWriter blobWriter,
             ShuffleMaster<?> shuffleMaster,
             JobMasterPartitionTracker jobMasterPartitionTracker,
-            boolean isDynamicGraph) {
+            boolean isDynamicGraph,
+            ExecutionJobVertex.Factory executionJobVertexFactory) {
         this.configuration = configuration;
         this.userCodeClassLoader = userCodeClassLoader;
         this.executionDeploymentTracker = executionDeploymentTracker;
@@ -120,6 +126,7 @@ public class DefaultExecutionGraphFactory implements ExecutionGraphFactory {
                                                 WebOptions.CHECKPOINTS_HISTORY_SIZE),
                                         jobManagerJobMetricGroup));
         this.isDynamicGraph = isDynamicGraph;
+        this.executionJobVertexFactory = checkNotNull(executionJobVertexFactory);
     }
 
     @Override
@@ -167,7 +174,8 @@ public class DefaultExecutionGraphFactory implements ExecutionGraphFactory {
                         vertexAttemptNumberStore,
                         vertexParallelismStore,
                         checkpointStatsTrackerFactory,
-                        isDynamicGraph);
+                        isDynamicGraph,
+                        executionJobVertexFactory);
 
         final CheckpointCoordinator checkpointCoordinator =
                 newExecutionGraph.getCheckpointCoordinator();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerFactory.java
index 0cc501203b8..3aaad1dbd0d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerFactory.java
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.blob.BlobWriter;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
 import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
 import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.JobStatusListener;
 import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverStrategyFactoryLoader;
 import org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategy;
@@ -145,7 +146,8 @@ public class AdaptiveBatchSchedulerFactory implements SchedulerNGFactory {
                         blobWriter,
                         shuffleMaster,
                         partitionTracker,
-                        true);
+                        true,
+                        new ExecutionJobVertex.Factory());
 
         return new AdaptiveBatchScheduler(
                 log,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/SpeculativeExecutionVertexTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/SpeculativeExecutionVertexTest.java
new file mode 100644
index 00000000000..dff88244fee
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/SpeculativeExecutionVertexTest.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.scheduler.TestingInternalFailuresListener;
+import org.apache.flink.testutils.TestingUtils;
+import org.apache.flink.testutils.executor.TestExecutorExtension;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ScheduledExecutorService;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for the {@link SpeculativeExecutionVertex}. */
+class SpeculativeExecutionVertexTest {
+
+    @RegisterExtension
+    private static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE =
+            TestingUtils.defaultExecutorExtension();
+
+    private TestingInternalFailuresListener internalFailuresListener;
+
+    @BeforeEach
+    void setUp() {
+        internalFailuresListener = new TestingInternalFailuresListener();
+    }
+
+    @Test
+    void testCreateSpeculativeExecution() throws Exception {
+        final SpeculativeExecutionVertex ev = createSpeculativeExecutionVertex();
+        assertThat(ev.getCurrentExecutions()).hasSize(1);
+
+        ev.createNewSpeculativeExecution(System.currentTimeMillis());
+        assertThat(ev.getCurrentExecutions()).hasSize(2);
+    }
+
+    @Test
+    void testResetExecutionVertex() throws Exception {
+        final SpeculativeExecutionVertex ev = createSpeculativeExecutionVertex();
+        final Execution e1 = ev.getCurrentExecutionAttempt();
+        final Execution e2 = ev.createNewSpeculativeExecution(System.currentTimeMillis());
+
+        e1.transitionState(ExecutionState.RUNNING);
+        e1.markFinished();
+        e2.cancel();
+        ev.resetForNewExecution();
+
+        assertThat(ev.getExecutionHistory().getHistoricalExecution(0).get().getAttemptId())
+                .isEqualTo(e1.getAttemptId());
+        assertThat(ev.getExecutionHistory().getHistoricalExecution(1).get().getAttemptId())
+                .isEqualTo(e2.getAttemptId());
+        assertThat(ev.getCurrentExecutions()).hasSize(1);
+        assertThat(ev.getCurrentExecutionAttempt().getAttemptNumber()).isEqualTo(2);
+    }
+
+    @Test
+    void testCancel() throws Exception {
+        final SpeculativeExecutionVertex ev = createSpeculativeExecutionVertex();
+        final Execution e1 = ev.getCurrentExecutionAttempt();
+        final Execution e2 = ev.createNewSpeculativeExecution(System.currentTimeMillis());
+
+        ev.cancel();
+        assertThat(e1.getState()).isSameAs(ExecutionState.CANCELED);
+        assertThat(e2.getState()).isSameAs(ExecutionState.CANCELED);
+    }
+
+    @Test
+    void testSuspend() throws Exception {
+        final SpeculativeExecutionVertex ev = createSpeculativeExecutionVertex();
+        final Execution e1 = ev.getCurrentExecutionAttempt();
+        final Execution e2 = ev.createNewSpeculativeExecution(System.currentTimeMillis());
+
+        ev.suspend();
+        assertThat(e1.getState()).isSameAs(ExecutionState.CANCELED);
+        assertThat(e2.getState()).isSameAs(ExecutionState.CANCELED);
+    }
+
+    @Test
+    void testFail() throws Exception {
+        final SpeculativeExecutionVertex ev = createSpeculativeExecutionVertex();
+        final Execution e1 = ev.getCurrentExecutionAttempt();
+        final Execution e2 = ev.createNewSpeculativeExecution(System.currentTimeMillis());
+
+        ev.fail(new Exception("Forced test failure."));
+        assertThat(internalFailuresListener.getFailedTasks())
+                .containsExactly(e1.getAttemptId(), e2.getAttemptId());
+    }
+
+    @Test
+    void testMarkFailed() throws Exception {
+        final SpeculativeExecutionVertex ev = createSpeculativeExecutionVertex();
+        final Execution e1 = ev.getCurrentExecutionAttempt();
+        final Execution e2 = ev.createNewSpeculativeExecution(System.currentTimeMillis());
+
+        ev.markFailed(new Exception("Forced test failure."));
+        assertThat(internalFailuresListener.getFailedTasks())
+                .containsExactly(e1.getAttemptId(), e2.getAttemptId());
+    }
+
+    @Test
+    void testArchiveFailedExecutions() throws Exception {
+        final SpeculativeExecutionVertex ev = createSpeculativeExecutionVertex();
+
+        final Execution e1 = ev.getCurrentExecutionAttempt();
+        e1.transitionState(ExecutionState.RUNNING);
+
+        final Execution e2 = ev.createNewSpeculativeExecution(0);
+        e2.transitionState(ExecutionState.FAILED);
+
+        ev.archiveFailedExecution(e2.getAttemptId());
+        assertThat(ev.getCurrentExecutions()).hasSize(1);
+        assertThat(ev.currentExecution).isSameAs(e1);
+
+        final Execution e3 = ev.createNewSpeculativeExecution(0);
+        e3.transitionState(ExecutionState.RUNNING);
+        e1.transitionState(ExecutionState.FAILED);
+
+        ev.archiveFailedExecution(e1.getAttemptId());
+        assertThat(ev.getCurrentExecutions()).hasSize(1);
+        assertThat(ev.currentExecution).isSameAs(e3);
+    }
+
+    @Test
+    void testArchiveTheOnlyCurrentExecution() throws Exception {
+        final SpeculativeExecutionVertex ev = createSpeculativeExecutionVertex();
+
+        final Execution e1 = ev.getCurrentExecutionAttempt();
+        e1.transitionState(ExecutionState.FAILED);
+
+        ev.archiveFailedExecution(e1.getAttemptId());
+
+        assertThat(ev.getCurrentExecutions()).hasSize(1);
+        assertThat(ev.currentExecution).isSameAs(e1);
+    }
+
+    @Test
+    void testArchiveNonFailedExecutionWithArchiveFailedExecutionMethod() throws Exception {
+        final SpeculativeExecutionVertex ev = createSpeculativeExecutionVertex();
+
+        final Execution e1 = ev.getCurrentExecutionAttempt();
+        e1.transitionState(ExecutionState.FAILED);
+
+        final Execution e2 = ev.createNewSpeculativeExecution(0);
+        e2.transitionState(ExecutionState.RUNNING);
+
+        // Keep the setup outside the lambda so that an IllegalStateException thrown while
+        // preparing the executions cannot satisfy the assertion; only archiving a non-failed
+        // execution is expected to throw.
+        Assertions.assertThrows(
+                IllegalStateException.class, () -> ev.archiveFailedExecution(e2.getAttemptId()));
+    }
+
+    @Test
+    void testGetExecutionState() throws Exception {
+        final SpeculativeExecutionVertex ev = createSpeculativeExecutionVertex();
+
+        final Execution e1 = ev.getCurrentExecutionAttempt();
+        e1.transitionState(ExecutionState.CANCELED);
+        assertThat(ev.getExecutionState()).isSameAs(ExecutionState.CANCELED);
+
+        // ascending priority (closer to FINISHED): each newly added state must become the vertex's
+        final List<ExecutionState> statesSortedByPriority = new ArrayList<>();
+        statesSortedByPriority.add(ExecutionState.FAILED);
+        statesSortedByPriority.add(ExecutionState.CANCELING);
+        statesSortedByPriority.add(ExecutionState.CREATED);
+        statesSortedByPriority.add(ExecutionState.SCHEDULED);
+        statesSortedByPriority.add(ExecutionState.DEPLOYING);
+        statesSortedByPriority.add(ExecutionState.INITIALIZING);
+        statesSortedByPriority.add(ExecutionState.RUNNING);
+        statesSortedByPriority.add(ExecutionState.FINISHED);
+
+        for (ExecutionState state : statesSortedByPriority) {
+            final Execution execution = ev.createNewSpeculativeExecution(0);
+            execution.transitionState(state);
+            assertThat(ev.getExecutionState()).isSameAs(state);
+        }
+    }
+
+    private SpeculativeExecutionVertex createSpeculativeExecutionVertex() throws Exception {
+        final JobVertex jobVertex = ExecutionGraphTestUtils.createNoOpVertex(1);
+        final JobGraph jobGraph = JobGraphTestUtils.batchJobGraph(jobVertex);
+        final ExecutionGraph executionGraph = createExecutionGraph(jobGraph);
+        return (SpeculativeExecutionVertex)
+                executionGraph.getJobVertex(jobVertex.getID()).getTaskVertices()[0];
+    }
+
+    private ExecutionGraph createExecutionGraph(final JobGraph jobGraph) throws Exception {
+        final ExecutionGraph executionGraph =
+                TestingDefaultExecutionGraphBuilder.newBuilder()
+                        .setJobGraph(jobGraph)
+                        .setExecutionJobVertexFactory(new SpeculativeExecutionJobVertex.Factory())
+                        .build(EXECUTOR_RESOURCE.getExecutor());
+
+        executionGraph.setInternalTaskFailuresListener(internalFailuresListener);
+        executionGraph.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
+
+        return executionGraph;
+    }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingDefaultExecutionGraphBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingDefaultExecutionGraphBuilder.java
index 8e5e57ddea0..62933dcee34 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingDefaultExecutionGraphBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingDefaultExecutionGraphBuilder.java
@@ -73,6 +73,7 @@ public class TestingDefaultExecutionGraphBuilder {
     private ExecutionStateUpdateListener executionStateUpdateListener =
             (execution, previousState, newState) -> {};
     private VertexParallelismStore vertexParallelismStore;
+    private ExecutionJobVertex.Factory executionJobVertexFactory = new ExecutionJobVertex.Factory();
 
     private TestingDefaultExecutionGraphBuilder() {}
 
@@ -142,6 +143,12 @@ public class TestingDefaultExecutionGraphBuilder {
         return this;
     }
 
+    /**
+     * Sets the factory used to create the graph's {@link ExecutionJobVertex} instances. Defaults
+     * to {@code new ExecutionJobVertex.Factory()}; pass a {@link
+     * SpeculativeExecutionJobVertex.Factory} to build a graph of speculative vertices.
+     *
+     * @param executionJobVertexFactory the factory to use, must not be null
+     * @return this builder
+     */
+    public TestingDefaultExecutionGraphBuilder setExecutionJobVertexFactory(
+            ExecutionJobVertex.Factory executionJobVertexFactory) {
+        // fail fast here (as DefaultExecutionGraphFactory does) instead of when the graph is built
+        this.executionJobVertexFactory =
+                java.util.Objects.requireNonNull(executionJobVertexFactory);
+        return this;
+    }
+
     private DefaultExecutionGraph build(
             boolean isDynamicGraph, ScheduledExecutorService executorService)
             throws JobException, JobExecutionException {
@@ -168,7 +175,8 @@ public class TestingDefaultExecutionGraphBuilder {
                 Optional.ofNullable(vertexParallelismStore)
                         .orElseGet(() -> SchedulerBase.computeVertexParallelismStore(jobGraph)),
                 () -> new CheckpointStatsTracker(0, new UnregisteredMetricsGroup()),
-                isDynamicGraph);
+                isDynamicGraph,
+                executionJobVertexFactory);
     }
 
     public DefaultExecutionGraph build(ScheduledExecutorService executorService)
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultExecutionDeployerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultExecutionDeployerTest.java
index 96c89093b92..3525fea6eae 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultExecutionDeployerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultExecutionDeployerTest.java
@@ -333,18 +333,7 @@ class DefaultExecutionDeployerTest {
                         .setPartitionTracker(partitionTracker)
                         .build(executor);
 
-        executionGraph.setInternalTaskFailuresListener(
-                new InternalFailuresListener() {
-                    @Override
-                    public void notifyTaskFailure(
-                            ExecutionAttemptID attemptId,
-                            Throwable t,
-                            boolean cancelTask,
-                            boolean releasePartitions) {}
-
-                    @Override
-                    public void notifyGlobalFailure(Throwable t) {}
-                });
+        executionGraph.setInternalTaskFailuresListener(new TestingInternalFailuresListener());
         executionGraph.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
 
         return executionGraph;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingInternalFailuresListener.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingInternalFailuresListener.java
new file mode 100644
index 00000000000..56a257243ee
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingInternalFailuresListener.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/** An {@link InternalFailuresListener} implementation for testing purpose. */
+public class TestingInternalFailuresListener implements InternalFailuresListener {
+
+    /** Attempt IDs passed to {@link #notifyTaskFailure}, in the order they were reported. */
+    private final List<ExecutionAttemptID> failedAttemptIds = new ArrayList<>();
+
+    /** Records the failed attempt; the cause and the flags are not inspected. */
+    @Override
+    public void notifyTaskFailure(
+            ExecutionAttemptID attemptId,
+            Throwable t,
+            boolean cancelTask,
+            boolean releasePartitions) {
+        failedAttemptIds.add(attemptId);
+    }
+
+    /** Global failures are intentionally not recorded by this listener. */
+    @Override
+    public void notifyGlobalFailure(Throwable t) {}
+
+    /**
+     * Returns the attempt IDs reported through {@link #notifyTaskFailure}.
+     *
+     * @return an unmodifiable view of the recorded attempt IDs, in notification order
+     */
+    public List<ExecutionAttemptID> getFailedTasks() {
+        final List<ExecutionAttemptID> readOnlyView =
+                Collections.unmodifiableList(failedAttemptIds);
+        return readOnlyView;
+    }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTestUtils.java
index 12740adb7cf..d462c726bc4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTestUtils.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.scheduler.adaptivebatch;
 
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmaster.DefaultExecutionDeploymentTracker;
 import org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory;
@@ -72,7 +73,8 @@ public class AdaptiveBatchSchedulerTestUtils {
                             blobWriter,
                             shuffleMaster,
                             partitionTracker,
-                            true);
+                            true,
+                            new ExecutionJobVertex.Factory());
 
             return new AdaptiveBatchScheduler(
                     log,