You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by zh...@apache.org on 2022/07/13 15:14:54 UTC

[flink] branch master updated (85a5e1d199e -> 265612c2cf9)

This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


    from 85a5e1d199e [FLINK-27884][connectors] Move OutputFormatBase to flink-core
     new 331264ccf65 [hotfix] Remove the initialCapacity param from the constructor of DualKeyLinkedMap
     new 51010a100ce [hotfix][tests] Merge DefaultSchedulerBuilder and AdaptiveBatchSchedulerBuilder
     new 81c739ae462 [FLINK-28137][runtime] Introduce SpeculativeScheduler
     new c75a242e94c [hotfix] Migrate SlotSharingExecutionSlotAllocatorTest to JUnit5
     new 265612c2cf9 [FLINK-28137][runtime] Enable SimpleExecutionSlotAllocator to do batch slot request timeout check

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 ...klistListener.java => BlocklistOperations.java} |  13 +-
 .../RestartPipelinedRegionFailoverStrategy.java    |  12 -
 .../apache/flink/runtime/jobgraph/JobVertex.java   |   7 +-
 .../DefaultSlotPoolServiceSchedulerFactory.java    |   7 +-
 .../apache/flink/runtime/jobmaster/JobMaster.java  |   3 +-
 .../jobmaster/SlotPoolServiceSchedulerFactory.java |   4 +-
 .../slotpool/DeclarativeSlotPoolBridge.java        |   5 +-
 .../jobmaster/slotpool/PhysicalSlotProvider.java   |   6 +
 .../slotpool/PhysicalSlotProviderImpl.java         |   4 +
 .../flink/runtime/scheduler/DefaultScheduler.java  | 137 ++++---
 .../runtime/scheduler/DefaultSchedulerFactory.java |   4 +-
 .../flink/runtime/scheduler/ExecutionDeployer.java |   2 +-
 .../scheduler/ExecutionVertexVersioner.java        |   9 +
 .../flink/runtime/scheduler/SchedulerBase.java     |  50 +--
 .../runtime/scheduler/SchedulerNGFactory.java      |   4 +-
 .../apache/flink/runtime/scheduler/SharedSlot.java |   3 +-
 .../scheduler/SimpleExecutionSlotAllocator.java    | 189 +++++++++
 .../SlotSharingExecutionSlotAllocator.java         |   2 +
 .../adaptive/AdaptiveSchedulerFactory.java         |   4 +-
 .../adaptivebatch/AdaptiveBatchScheduler.java      |  16 +-
 .../AdaptiveBatchSchedulerFactory.java             | 139 ++++---
 .../adaptivebatch/SpeculativeScheduler.java        | 314 +++++++++++++++
 .../flink/runtime/util/DualKeyLinkedMap.java       |   6 +-
 .../DefaultSchedulerCheckpointCoordinatorTest.java |   4 +-
 .../DefaultExecutionGraphDeploymentTest.java       |   7 +-
 .../ExecutionGraphCoLocationRestartTest.java       |   3 +-
 .../executiongraph/ExecutionGraphFinishTest.java   |   4 +-
 .../ExecutionGraphPartitionReleaseTest.java        |   3 +-
 .../executiongraph/ExecutionGraphRestartTest.java  |  15 +-
 .../executiongraph/ExecutionGraphSuspendTest.java  |   5 +-
 .../executiongraph/ExecutionGraphTestUtils.java    |   4 +-
 .../ExecutionPartitionLifecycleTest.java           |   3 +-
 .../runtime/executiongraph/ExecutionTest.java      |   9 +-
 .../executiongraph/ExecutionVertexTest.java        |   5 +-
 .../IntermediateResultPartitionTest.java           |   4 +-
 .../RemoveCachedShuffleDescriptorTest.java         |   4 +-
 .../scheduler/UpdatePartitionConsumersTest.java    |   4 +-
 .../runtime/jobmaster/JobMasterSchedulerTest.java  |   4 +-
 ...erImplWithDefaultSlotSelectionStrategyTest.java |   6 +-
 ...lSlotProviderImplWithSpreadOutStrategyTest.java |   6 +-
 .../slotpool/SlotPoolBatchSlotRequestTest.java     |  28 --
 .../OperatorCoordinatorSchedulerTest.java          |  69 +++-
 .../DefaultSchedulerBatchSchedulingTest.java       |   2 +-
 .../runtime/scheduler/DefaultSchedulerBuilder.java | 362 +++++++++++++++++
 .../runtime/scheduler/DefaultSchedulerTest.java    |  13 +-
 .../runtime/scheduler/SchedulerTestingUtils.java   | 316 ---------------
 .../SimpleExecutionSlotAllocatorTest.java          | 275 +++++++++++++
 .../SlotSharingExecutionSlotAllocatorTest.java     | 202 +++++-----
 .../scheduler/TestingPhysicalSlotProvider.java     |  11 +
 .../scheduler/TestingSchedulerNGFactory.java       |   4 +-
 .../adaptivebatch/AdaptiveBatchSchedulerTest.java  |  16 +-
 .../AdaptiveBatchSchedulerTestUtils.java           | 106 -----
 .../adaptivebatch/SpeculativeSchedulerTest.java    | 426 +++++++++++++++++++++
 .../benchmark/SchedulerBenchmarkUtils.java         |   5 +-
 .../e2e/SchedulerEndToEndBenchmarkBase.java        |   4 +-
 .../ExecutionTimeBasedSlowTaskDetectorTest.java    |   6 +-
 .../flink/runtime/util/DualKeyLinkedMapTest.java   |  16 +-
 57 files changed, 2074 insertions(+), 817 deletions(-)
 copy flink-runtime/src/main/java/org/apache/flink/runtime/blocklist/{BlocklistListener.java => BlocklistOperations.java} (69%)
 create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SimpleExecutionSlotAllocator.java
 create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeScheduler.java
 create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerBuilder.java
 create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SimpleExecutionSlotAllocatorTest.java
 delete mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTestUtils.java
 create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeSchedulerTest.java


[flink] 02/05: [hotfix][tests] Merge DefaultSchedulerBuilder and AdaptiveBatchSchedulerBuilder

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 51010a100ce7c1403c99bf639ea8fc1aac7ba135
Author: Zhu Zhu <re...@gmail.com>
AuthorDate: Fri Jul 8 20:52:04 2022 +0800

    [hotfix][tests] Merge DefaultSchedulerBuilder and AdaptiveBatchSchedulerBuilder
    
    The extending AdaptiveBatchSchedulerBuilder is not in a good shape, which requires hack logics to create AdaptiveBatchScheduler. Merging them can make it simpler to create different schedulers.
---
 .../adaptivebatch/AdaptiveBatchScheduler.java      |   2 +-
 .../DefaultSchedulerCheckpointCoordinatorTest.java |   4 +-
 .../DefaultExecutionGraphDeploymentTest.java       |   7 +-
 .../ExecutionGraphCoLocationRestartTest.java       |   3 +-
 .../executiongraph/ExecutionGraphFinishTest.java   |   4 +-
 .../ExecutionGraphPartitionReleaseTest.java        |   3 +-
 .../executiongraph/ExecutionGraphRestartTest.java  |  15 +-
 .../executiongraph/ExecutionGraphSuspendTest.java  |   5 +-
 .../executiongraph/ExecutionGraphTestUtils.java    |   4 +-
 .../ExecutionPartitionLifecycleTest.java           |   3 +-
 .../runtime/executiongraph/ExecutionTest.java      |   9 +-
 .../executiongraph/ExecutionVertexTest.java        |   5 +-
 .../IntermediateResultPartitionTest.java           |   4 +-
 .../RemoveCachedShuffleDescriptorTest.java         |   4 +-
 .../scheduler/UpdatePartitionConsumersTest.java    |   4 +-
 .../OperatorCoordinatorSchedulerTest.java          |  69 ++++-
 .../DefaultSchedulerBatchSchedulingTest.java       |   2 +-
 .../runtime/scheduler/DefaultSchedulerBuilder.java | 319 +++++++++++++++++++++
 .../runtime/scheduler/DefaultSchedulerTest.java    |   9 +-
 .../runtime/scheduler/SchedulerTestingUtils.java   | 316 --------------------
 .../adaptivebatch/AdaptiveBatchSchedulerTest.java  |  16 +-
 .../AdaptiveBatchSchedulerTestUtils.java           | 106 -------
 .../benchmark/SchedulerBenchmarkUtils.java         |   5 +-
 .../e2e/SchedulerEndToEndBenchmarkBase.java        |   4 +-
 .../ExecutionTimeBasedSlowTaskDetectorTest.java    |   6 +-
 25 files changed, 445 insertions(+), 483 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java
index 6507774b3e6..f744d68f28b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java
@@ -80,7 +80,7 @@ public class AdaptiveBatchScheduler extends DefaultScheduler implements Schedule
 
     private final Map<JobVertexID, ForwardGroup> forwardGroupsByJobVertexId;
 
-    AdaptiveBatchScheduler(
+    public AdaptiveBatchScheduler(
             final Logger log,
             final JobGraph jobGraph,
             final Executor ioExecutor,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultSchedulerCheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultSchedulerCheckpointCoordinatorTest.java
index 1fe91122849..5b746502ca7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultSchedulerCheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultSchedulerCheckpointCoordinatorTest.java
@@ -32,8 +32,8 @@ import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
 import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
 import org.apache.flink.runtime.scheduler.DefaultScheduler;
+import org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder;
 import org.apache.flink.runtime.scheduler.SchedulerBase;
-import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.testutils.TestingUtils;
 import org.apache.flink.testutils.executor.TestExecutorResource;
@@ -208,7 +208,7 @@ public class DefaultSchedulerCheckpointCoordinatorTest extends TestLogger {
                         .setJobCheckpointingSettings(checkpointingSettings)
                         .build();
 
-        return new SchedulerTestingUtils.DefaultSchedulerBuilder(
+        return new DefaultSchedulerBuilder(
                         jobGraph,
                         ComponentMainThreadExecutorServiceAdapter.forMainThread(),
                         EXECUTOR_RESOURCE.getExecutor())
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentTest.java
index a2abb0b5901..05237422a71 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentTest.java
@@ -50,6 +50,7 @@ import org.apache.flink.runtime.jobmaster.RpcTaskManagerGateway;
 import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.operators.BatchTask;
+import org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder;
 import org.apache.flink.runtime.scheduler.SchedulerBase;
 import org.apache.flink.runtime.scheduler.SchedulerNG;
 import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
@@ -477,7 +478,7 @@ public class DefaultExecutionGraphDeploymentTest extends TestLogger {
 
         // execution graph that executes actions synchronously
         final SchedulerBase scheduler =
-                new SchedulerTestingUtils.DefaultSchedulerBuilder(
+                new DefaultSchedulerBuilder(
                                 graph,
                                 ComponentMainThreadExecutorServiceAdapter.forMainThread(),
                                 EXECUTOR_RESOURCE.getExecutor())
@@ -538,7 +539,7 @@ public class DefaultExecutionGraphDeploymentTest extends TestLogger {
 
         // execution graph that executes actions synchronously
         final SchedulerBase scheduler =
-                new SchedulerTestingUtils.DefaultSchedulerBuilder(
+                new DefaultSchedulerBuilder(
                                 JobGraphTestUtils.streamingJobGraph(v1, v2),
                                 ComponentMainThreadExecutorServiceAdapter.forMainThread(),
                                 EXECUTOR_RESOURCE.getExecutor())
@@ -624,7 +625,7 @@ public class DefaultExecutionGraphDeploymentTest extends TestLogger {
         final TestingPhysicalSlotProvider physicalSlotProvider =
                 TestingPhysicalSlotProvider.createWithoutImmediatePhysicalSlotCreation();
         final SchedulerBase scheduler =
-                new SchedulerTestingUtils.DefaultSchedulerBuilder(
+                new DefaultSchedulerBuilder(
                                 jobGraph,
                                 ComponentMainThreadExecutorServiceAdapter.forMainThread(),
                                 EXECUTOR_RESOURCE.getExecutor())
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphCoLocationRestartTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphCoLocationRestartTest.java
index d7a9d89557b..96b23b8dba5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphCoLocationRestartTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphCoLocationRestartTest.java
@@ -29,6 +29,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder;
 import org.apache.flink.runtime.scheduler.SchedulerBase;
 import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
 import org.apache.flink.runtime.scheduler.TestingPhysicalSlot;
@@ -80,7 +81,7 @@ public class ExecutionGraphCoLocationRestartTest {
         final ManuallyTriggeredScheduledExecutorService delayExecutor =
                 new ManuallyTriggeredScheduledExecutorService();
         final SchedulerBase scheduler =
-                new SchedulerTestingUtils.DefaultSchedulerBuilder(
+                new DefaultSchedulerBuilder(
                                 jobGraph,
                                 ComponentMainThreadExecutorServiceAdapter.forMainThread(),
                                 EXECUTOR_RESOURCE.getExecutor())
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphFinishTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphFinishTest.java
index d23b13cc59f..f8959481fd3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphFinishTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphFinishTest.java
@@ -22,8 +22,8 @@ import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
+import org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder;
 import org.apache.flink.runtime.scheduler.SchedulerBase;
-import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.testutils.TestingUtils;
 import org.apache.flink.testutils.executor.TestExecutorResource;
@@ -55,7 +55,7 @@ public class ExecutionGraphFinishTest extends TestLogger {
                         ExecutionGraphTestUtils.createJobVertex("Task2", 2, NoOpInvokable.class));
 
         SchedulerBase scheduler =
-                new SchedulerTestingUtils.DefaultSchedulerBuilder(
+                new DefaultSchedulerBuilder(
                                 jobGraph,
                                 ComponentMainThreadExecutorServiceAdapter.forMainThread(),
                                 EXECUTOR_RESOURCE.getExecutor())
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphPartitionReleaseTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphPartitionReleaseTest.java
index 0ca4f106f95..29711bc9956 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphPartitionReleaseTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphPartitionReleaseTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
 import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder;
 import org.apache.flink.runtime.scheduler.SchedulerBase;
 import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
@@ -251,7 +252,7 @@ public class ExecutionGraphPartitionReleaseTest extends TestLogger {
 
         final JobGraph jobGraph = JobGraphTestUtils.batchJobGraph(vertices);
         final SchedulerBase scheduler =
-                new SchedulerTestingUtils.DefaultSchedulerBuilder(
+                new DefaultSchedulerBuilder(
                                 jobGraph,
                                 mainThreadExecutor.getMainThreadExecutor(),
                                 EXECUTOR_RESOURCE.getExecutor())
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
index facd4be2fbe..a08d5444083 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
@@ -40,6 +40,7 @@ import org.apache.flink.runtime.jobmaster.slotpool.SlotPool;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolUtils;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
+import org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder;
 import org.apache.flink.runtime.scheduler.ExecutionSlotAllocatorFactory;
 import org.apache.flink.runtime.scheduler.SchedulerBase;
 import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
@@ -107,7 +108,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
                             "Task", NUM_TASKS + numTasksExceedSlotPool, NoOpInvokable.class);
             JobGraph graph = JobGraphTestUtils.streamingJobGraph(sender);
             SchedulerBase scheduler =
-                    new SchedulerTestingUtils.DefaultSchedulerBuilder(
+                    new DefaultSchedulerBuilder(
                                     graph, mainThreadExecutor, EXECUTOR_RESOURCE.getExecutor())
                             .setExecutionSlotAllocatorFactory(
                                     createExecutionSlotAllocatorFactory(slotPool))
@@ -136,7 +137,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
                             "Task", NUM_TASKS + numTasksExceedSlotPool, NoOpInvokable.class);
             JobGraph graph = JobGraphTestUtils.streamingJobGraph(sender);
             SchedulerBase scheduler =
-                    new SchedulerTestingUtils.DefaultSchedulerBuilder(
+                    new DefaultSchedulerBuilder(
                                     graph, mainThreadExecutor, EXECUTOR_RESOURCE.getExecutor())
                             .setExecutionSlotAllocatorFactory(
                                     createExecutionSlotAllocatorFactory(slotPool))
@@ -159,7 +160,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
         // We want to manually control the restart and delay
         try (SlotPool slotPool = SlotPoolUtils.createDeclarativeSlotPoolBridge()) {
             SchedulerBase scheduler =
-                    new SchedulerTestingUtils.DefaultSchedulerBuilder(
+                    new DefaultSchedulerBuilder(
                                     createJobGraph(),
                                     mainThreadExecutor,
                                     EXECUTOR_RESOURCE.getExecutor())
@@ -202,7 +203,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
     public void testCancelWhileFailing() throws Exception {
         try (SlotPool slotPool = SlotPoolUtils.createDeclarativeSlotPoolBridge()) {
             SchedulerBase scheduler =
-                    new SchedulerTestingUtils.DefaultSchedulerBuilder(
+                    new DefaultSchedulerBuilder(
                                     createJobGraph(),
                                     mainThreadExecutor,
                                     EXECUTOR_RESOURCE.getExecutor())
@@ -240,7 +241,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
     public void testFailWhileCanceling() throws Exception {
         try (SlotPool slotPool = SlotPoolUtils.createDeclarativeSlotPoolBridge()) {
             SchedulerBase scheduler =
-                    new SchedulerTestingUtils.DefaultSchedulerBuilder(
+                    new DefaultSchedulerBuilder(
                                     createJobGraph(),
                                     mainThreadExecutor,
                                     EXECUTOR_RESOURCE.getExecutor())
@@ -291,7 +292,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
 
         try (SlotPool slotPool = SlotPoolUtils.createDeclarativeSlotPoolBridge()) {
             SchedulerBase scheduler =
-                    new SchedulerTestingUtils.DefaultSchedulerBuilder(
+                    new DefaultSchedulerBuilder(
                                     jobGraph, mainThreadExecutor, EXECUTOR_RESOURCE.getExecutor())
                             .setExecutionSlotAllocatorFactory(
                                     createExecutionSlotAllocatorFactory(slotPool))
@@ -351,7 +352,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
     public void testFailExecutionAfterCancel() throws Exception {
         try (SlotPool slotPool = SlotPoolUtils.createDeclarativeSlotPoolBridge()) {
             SchedulerBase scheduler =
-                    new SchedulerTestingUtils.DefaultSchedulerBuilder(
+                    new DefaultSchedulerBuilder(
                                     createJobGraphToCancel(),
                                     mainThreadExecutor,
                                     EXECUTOR_RESOURCE.getExecutor())
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java
index 5804f782ff9..3277cf3dbf5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.runtime.executiongraph.failover.flip1.TestRestartBackoff
 import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder;
 import org.apache.flink.runtime.scheduler.SchedulerBase;
 import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
 import org.apache.flink.runtime.scheduler.TestingPhysicalSlotProvider;
@@ -237,7 +238,7 @@ public class ExecutionGraphSuspendTest extends TestLogger {
         final ManuallyTriggeredScheduledExecutor taskRestartExecutor =
                 new ManuallyTriggeredScheduledExecutor();
         final SchedulerBase scheduler =
-                new SchedulerTestingUtils.DefaultSchedulerBuilder(
+                new DefaultSchedulerBuilder(
                                 JobGraphTestUtils.emptyJobGraph(),
                                 ComponentMainThreadExecutorServiceAdapter.forMainThread(),
                                 EXECUTOR_RESOURCE.getExecutor())
@@ -318,7 +319,7 @@ public class ExecutionGraphSuspendTest extends TestLogger {
         vertex.setParallelism(parallelism);
 
         final SchedulerBase scheduler =
-                new SchedulerTestingUtils.DefaultSchedulerBuilder(
+                new DefaultSchedulerBuilder(
                                 JobGraphTestUtils.streamingJobGraph(vertex),
                                 ComponentMainThreadExecutorServiceAdapter.forMainThread(),
                                 EXECUTOR_RESOURCE.getExecutor())
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
index 10ff1baa915..ff2baf807e1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
@@ -31,8 +31,8 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder;
 import org.apache.flink.runtime.scheduler.SchedulerBase;
-import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
 import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup;
 import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
@@ -391,7 +391,7 @@ public class ExecutionGraphTestUtils {
         JobGraph jobGraph = JobGraphTestUtils.batchJobGraph(jobVertex);
 
         SchedulerBase scheduler =
-                new SchedulerTestingUtils.DefaultSchedulerBuilder(
+                new DefaultSchedulerBuilder(
                                 jobGraph,
                                 ComponentMainThreadExecutorServiceAdapter.forMainThread(),
                                 executor)
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionPartitionLifecycleTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionPartitionLifecycleTest.java
index df765c69cd7..6ad9d321b96 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionPartitionLifecycleTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionPartitionLifecycleTest.java
@@ -36,6 +36,7 @@ import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder;
 import org.apache.flink.runtime.scheduler.SchedulerBase;
 import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
 import org.apache.flink.runtime.scheduler.TestingPhysicalSlot;
@@ -279,7 +280,7 @@ public class ExecutionPartitionLifecycleTest extends TestLogger {
 
         final JobGraph jobGraph = JobGraphTestUtils.batchJobGraph(producerVertex, consumerVertex);
         final SchedulerBase scheduler =
-                new SchedulerTestingUtils.DefaultSchedulerBuilder(
+                new DefaultSchedulerBuilder(
                                 jobGraph,
                                 ComponentMainThreadExecutorServiceAdapter.forMainThread(),
                                 EXECUTOR_RESOURCE.getExecutor())
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
index 9f11c727632..0c991f55ea3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
+import org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder;
 import org.apache.flink.runtime.scheduler.SchedulerBase;
 import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
 import org.apache.flink.runtime.scheduler.TestingPhysicalSlot;
@@ -79,7 +80,7 @@ public class ExecutionTest extends TestLogger {
         final TestingPhysicalSlotProvider physicalSlotProvider =
                 TestingPhysicalSlotProvider.createWithLimitedAmountOfPhysicalSlots(1);
         final SchedulerBase scheduler =
-                new SchedulerTestingUtils.DefaultSchedulerBuilder(
+                new DefaultSchedulerBuilder(
                                 JobGraphTestUtils.streamingJobGraph(jobVertex),
                                 ComponentMainThreadExecutorServiceAdapter.forMainThread(),
                                 EXECUTOR_RESOURCE.getExecutor())
@@ -123,7 +124,7 @@ public class ExecutionTest extends TestLogger {
         final JobVertexID jobVertexId = jobVertex.getID();
 
         final SchedulerBase scheduler =
-                new SchedulerTestingUtils.DefaultSchedulerBuilder(
+                new DefaultSchedulerBuilder(
                                 JobGraphTestUtils.streamingJobGraph(jobVertex),
                                 ComponentMainThreadExecutorServiceAdapter.forMainThread(),
                                 EXECUTOR_RESOURCE.getExecutor())
@@ -168,7 +169,7 @@ public class ExecutionTest extends TestLogger {
                                                 .withTaskManagerGateway(taskManagerGateway)
                                                 .build()));
         final SchedulerBase scheduler =
-                new SchedulerTestingUtils.DefaultSchedulerBuilder(
+                new DefaultSchedulerBuilder(
                                 JobGraphTestUtils.streamingJobGraph(jobVertex),
                                 testMainThreadUtil.getMainThreadExecutor(),
                                 EXECUTOR_RESOURCE.getExecutor())
@@ -208,7 +209,7 @@ public class ExecutionTest extends TestLogger {
         final TestingPhysicalSlotProvider physicalSlotProvider =
                 TestingPhysicalSlotProvider.createWithLimitedAmountOfPhysicalSlots(1);
         final SchedulerBase scheduler =
-                new SchedulerTestingUtils.DefaultSchedulerBuilder(
+                new DefaultSchedulerBuilder(
                                 JobGraphTestUtils.streamingJobGraph(jobVertex),
                                 testMainThreadUtil.getMainThreadExecutor(),
                                 EXECUTOR_RESOURCE.getExecutor())
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexTest.java
index dd87d04abcb..d5f6f4f13e9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexTest.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
 import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder;
 import org.apache.flink.runtime.scheduler.SchedulerBase;
 import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
 import org.apache.flink.runtime.scheduler.TestingPhysicalSlot;
@@ -74,7 +75,7 @@ public class ExecutionVertexTest extends TestLogger {
         final JobGraph jobGraph =
                 JobGraphTestUtils.streamingJobGraph(producerJobVertex, consumerJobVertex);
         final SchedulerBase scheduler =
-                new SchedulerTestingUtils.DefaultSchedulerBuilder(
+                new DefaultSchedulerBuilder(
                                 jobGraph,
                                 ComponentMainThreadExecutorServiceAdapter.forMainThread(),
                                 EXECUTOR_RESOURCE.getExecutor())
@@ -122,7 +123,7 @@ public class ExecutionVertexTest extends TestLogger {
         // make sure that retrieving the last (al)location is independent from the history size
         configuration.set(JobManagerOptions.MAX_ATTEMPTS_HISTORY_SIZE, 1);
         final SchedulerBase scheduler =
-                new SchedulerTestingUtils.DefaultSchedulerBuilder(
+                new DefaultSchedulerBuilder(
                                 jobGraph,
                                 ComponentMainThreadExecutorServiceAdapter.forMainThread(),
                                 EXECUTOR_RESOURCE.getExecutor())
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartitionTest.java
index e5470cb99cd..456de025668 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartitionTest.java
@@ -26,8 +26,8 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobGraphBuilder;
 import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
 import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder;
 import org.apache.flink.runtime.scheduler.SchedulerBase;
-import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
 import org.apache.flink.runtime.scheduler.VertexParallelismStore;
 import org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchScheduler;
 import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup;
@@ -304,7 +304,7 @@ public class IntermediateResultPartitionTest extends TestLogger {
         JobGraph jobGraph = JobGraphTestUtils.batchJobGraph(source, sink);
 
         SchedulerBase scheduler =
-                new SchedulerTestingUtils.DefaultSchedulerBuilder(
+                new DefaultSchedulerBuilder(
                                 jobGraph,
                                 ComponentMainThreadExecutorServiceAdapter.forMainThread(),
                                 executorService)
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/RemoveCachedShuffleDescriptorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/RemoveCachedShuffleDescriptorTest.java
index 2ba942dcecc..73dd9cdeb3a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/RemoveCachedShuffleDescriptorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/RemoveCachedShuffleDescriptorTest.java
@@ -39,7 +39,7 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmaster.LogicalSlot;
 import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder;
 import org.apache.flink.runtime.scheduler.DefaultScheduler;
-import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
+import org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder;
 import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.testutils.TestingUtils;
@@ -393,7 +393,7 @@ public class RemoveCachedShuffleDescriptorTest extends TestLogger {
                         .addJobVertices(jobVertices)
                         .build();
 
-        return new SchedulerTestingUtils.DefaultSchedulerBuilder(
+        return new DefaultSchedulerBuilder(
                         jobGraph, mainThreadExecutor, EXECUTOR_RESOURCE.getExecutor())
                 .setRestartBackoffTimeStrategy(new TestRestartBackoffTimeStrategy(true, 0))
                 .setBlobWriter(blobWriter)
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/UpdatePartitionConsumersTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/UpdatePartitionConsumersTest.java
index b1b46dee1a0..65e7975105d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/UpdatePartitionConsumersTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/UpdatePartitionConsumersTest.java
@@ -31,8 +31,8 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder;
 import org.apache.flink.runtime.scheduler.SchedulerBase;
-import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
 import org.apache.flink.runtime.scheduler.TestExecutionSlotAllocatorFactory;
 import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
 import org.apache.flink.runtime.shuffle.NettyShuffleDescriptor;
@@ -132,7 +132,7 @@ public class UpdatePartitionConsumersTest extends TestLogger {
                 new SimpleAckingTaskManagerGateway();
 
         final SchedulerBase scheduler =
-                new SchedulerTestingUtils.DefaultSchedulerBuilder(
+                new DefaultSchedulerBuilder(
                                 jobGraph,
                                 ComponentMainThreadExecutorServiceAdapter.forMainThread(),
                                 EXECUTOR_RESOURCE.getExecutor())
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java
index e8022c97ada..0bc4aba7187 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java
@@ -37,17 +37,23 @@ import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.failover.flip1.RestartAllFailoverStrategy;
+import org.apache.flink.runtime.executiongraph.failover.flip1.TestRestartBackoffTimeStrategy;
+import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobGraphBuilder;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.scheduler.DefaultScheduler;
+import org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder;
 import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
+import org.apache.flink.runtime.scheduler.TestExecutionSlotAllocatorFactory;
 import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy;
 import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyedStateHandle;
@@ -694,11 +700,11 @@ public class OperatorCoordinatorSchedulerTest extends TestLogger {
                 new ComponentMainThreadExecutorServiceAdapter(
                         (ScheduledExecutorService) executor, Thread.currentThread());
 
-        final SchedulerTestingUtils.DefaultSchedulerBuilder schedulerBuilder =
+        final DefaultSchedulerBuilder schedulerBuilder =
                 taskExecutorOperatorEventGateway == null
-                        ? SchedulerTestingUtils.createSchedulerBuilder(
+                        ? createSchedulerBuilder(
                                 jobGraph, mainThreadExecutor, EXECUTOR_RESOURCE.getExecutor())
-                        : SchedulerTestingUtils.createSchedulerBuilder(
+                        : createSchedulerBuilder(
                                 jobGraph,
                                 mainThreadExecutor,
                                 taskExecutorOperatorEventGateway,
@@ -714,6 +720,46 @@ public class OperatorCoordinatorSchedulerTest extends TestLogger {
         return scheduler;
     }
 
+    private static DefaultSchedulerBuilder createSchedulerBuilder(
+            JobGraph jobGraph,
+            ComponentMainThreadExecutor mainThreadExecutor,
+            ScheduledExecutorService scheduledExecutorService) {
+
+        return createSchedulerBuilder(
+                jobGraph,
+                mainThreadExecutor,
+                new SimpleAckingTaskManagerGateway(),
+                scheduledExecutorService);
+    }
+
+    private static DefaultSchedulerBuilder createSchedulerBuilder(
+            JobGraph jobGraph,
+            ComponentMainThreadExecutor mainThreadExecutor,
+            TaskExecutorOperatorEventGateway operatorEventGateway,
+            ScheduledExecutorService scheduledExecutorService) {
+
+        final TaskManagerGateway gateway =
+                operatorEventGateway instanceof TaskManagerGateway
+                        ? (TaskManagerGateway) operatorEventGateway
+                        : new TaskExecutorOperatorEventGatewayAdapter(operatorEventGateway);
+
+        return createSchedulerBuilder(
+                jobGraph, mainThreadExecutor, gateway, scheduledExecutorService);
+    }
+
+    private static DefaultSchedulerBuilder createSchedulerBuilder(
+            JobGraph jobGraph,
+            ComponentMainThreadExecutor mainThreadExecutor,
+            TaskManagerGateway taskManagerGateway,
+            ScheduledExecutorService executorService) {
+
+        return new DefaultSchedulerBuilder(jobGraph, mainThreadExecutor, executorService)
+                .setSchedulingStrategyFactory(new PipelinedRegionSchedulingStrategy.Factory())
+                .setRestartBackoffTimeStrategy(new TestRestartBackoffTimeStrategy(true, 0))
+                .setExecutionSlotAllocatorFactory(
+                        new TestExecutionSlotAllocatorFactory(taskManagerGateway));
+    }
+
     private void scheduleAllTasksToRunning(DefaultScheduler scheduler) {
         scheduler.startScheduling();
         executor.triggerAll();
@@ -1014,4 +1060,21 @@ public class OperatorCoordinatorSchedulerTest extends TestLogger {
             throw new UnsupportedOperationException();
         }
     }
+
+    private static final class TaskExecutorOperatorEventGatewayAdapter
+            extends SimpleAckingTaskManagerGateway {
+
+        private final TaskExecutorOperatorEventGateway operatorGateway;
+
+        private TaskExecutorOperatorEventGatewayAdapter(
+                TaskExecutorOperatorEventGateway operatorGateway) {
+            this.operatorGateway = operatorGateway;
+        }
+
+        @Override
+        public CompletableFuture<Acknowledge> sendOperatorEventToTask(
+                ExecutionAttemptID task, OperatorID operator, SerializedValue<OperatorEvent> evt) {
+            return operatorGateway.sendOperatorEventToTask(task, operator, evt);
+        }
+    }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerBatchSchedulingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerBatchSchedulingTest.java
index 2b72068abc7..442b0adb9b1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerBatchSchedulingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerBatchSchedulingTest.java
@@ -205,7 +205,7 @@ public class DefaultSchedulerBatchSchedulingTest extends TestLogger {
             Time slotRequestTimeout,
             JobStatusListener jobStatusListener)
             throws Exception {
-        return new SchedulerTestingUtils.DefaultSchedulerBuilder(
+        return new DefaultSchedulerBuilder(
                         jobGraph, mainThreadExecutor, EXECUTOR_RESOURCE.getExecutor())
                 .setExecutionSlotAllocatorFactory(
                         SchedulerTestingUtils.newSlotSharingExecutionSlotAllocatorFactory(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerBuilder.java
new file mode 100644
index 00000000000..4c99e8ce5ec
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerBuilder.java
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.blob.BlobWriter;
+import org.apache.flink.runtime.blob.VoidBlobWriter;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.JobStatusListener;
+import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverStrategy;
+import org.apache.flink.runtime.executiongraph.failover.flip1.NoRestartBackoffTimeStrategy;
+import org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategy;
+import org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy;
+import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker;
+import org.apache.flink.runtime.io.network.partition.NoOpJobMasterPartitionTracker;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmaster.DefaultExecutionDeploymentTracker;
+import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchScheduler;
+import org.apache.flink.runtime.scheduler.adaptivebatch.VertexParallelismDecider;
+import org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategyFactory;
+import org.apache.flink.runtime.scheduler.strategy.VertexwiseSchedulingStrategy;
+import org.apache.flink.runtime.shuffle.ShuffleMaster;
+import org.apache.flink.runtime.shuffle.ShuffleTestUtils;
+import org.apache.flink.util.concurrent.ScheduledExecutor;
+import org.apache.flink.util.concurrent.ScheduledExecutorServiceAdapter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
+
+import static org.apache.flink.runtime.scheduler.SchedulerBase.computeVertexParallelismStore;
+
+/** A test builder for {@link DefaultScheduler} and {@link AdaptiveBatchScheduler} instances. */
+public class DefaultSchedulerBuilder {
+    private static final Logger LOG = LoggerFactory.getLogger(DefaultSchedulerBuilder.class);
+
+    private final JobGraph jobGraph;
+    private final ComponentMainThreadExecutor mainThreadExecutor;
+
+    private Executor ioExecutor;
+    private ScheduledExecutorService futureExecutor;
+    private ScheduledExecutor delayExecutor;
+    private Logger log = LOG;
+    private Configuration jobMasterConfiguration = new Configuration();
+    private ClassLoader userCodeLoader = ClassLoader.getSystemClassLoader();
+    private CheckpointsCleaner checkpointCleaner = new CheckpointsCleaner();
+    private CheckpointRecoveryFactory checkpointRecoveryFactory =
+            new StandaloneCheckpointRecoveryFactory();
+    private Time rpcTimeout = Time.seconds(300);
+    private BlobWriter blobWriter = VoidBlobWriter.getInstance();
+    private JobManagerJobMetricGroup jobManagerJobMetricGroup =
+            UnregisteredMetricGroups.createUnregisteredJobManagerJobMetricGroup();
+    private ShuffleMaster<?> shuffleMaster = ShuffleTestUtils.DEFAULT_SHUFFLE_MASTER;
+    private JobMasterPartitionTracker partitionTracker = NoOpJobMasterPartitionTracker.INSTANCE;
+    private SchedulingStrategyFactory schedulingStrategyFactory =
+            new PipelinedRegionSchedulingStrategy.Factory();
+    private FailoverStrategy.Factory failoverStrategyFactory =
+            new RestartPipelinedRegionFailoverStrategy.Factory();
+    private RestartBackoffTimeStrategy restartBackoffTimeStrategy =
+            NoRestartBackoffTimeStrategy.INSTANCE;
+    private ExecutionOperations executionOperations = new DefaultExecutionOperations();
+    private ExecutionVertexVersioner executionVertexVersioner = new ExecutionVertexVersioner();
+    private ExecutionSlotAllocatorFactory executionSlotAllocatorFactory =
+            new TestExecutionSlotAllocatorFactory();
+    private JobStatusListener jobStatusListener = (ignoredA, ignoredB, ignoredC) -> {};
+    private ExecutionDeployer.Factory executionDeployerFactory =
+            new DefaultExecutionDeployer.Factory();
+    private VertexParallelismDecider vertexParallelismDecider = (ignored) -> 0;
+    private int defaultMaxParallelism =
+            JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_MAX_PARALLELISM.defaultValue();
+
+    public DefaultSchedulerBuilder(
+            JobGraph jobGraph,
+            ComponentMainThreadExecutor mainThreadExecutor,
+            ScheduledExecutorService generalExecutorService) {
+        this(
+                jobGraph,
+                mainThreadExecutor,
+                generalExecutorService,
+                generalExecutorService,
+                new ScheduledExecutorServiceAdapter(generalExecutorService));
+    }
+
+    public DefaultSchedulerBuilder(
+            JobGraph jobGraph,
+            ComponentMainThreadExecutor mainThreadExecutor,
+            Executor ioExecutor,
+            ScheduledExecutorService futureExecutor,
+            ScheduledExecutor delayExecutor) {
+        this.jobGraph = jobGraph;
+        this.mainThreadExecutor = mainThreadExecutor;
+        this.ioExecutor = ioExecutor;
+        this.futureExecutor = futureExecutor;
+        this.delayExecutor = delayExecutor;
+    }
+
+    public DefaultSchedulerBuilder setIoExecutor(Executor ioExecutor) {
+        this.ioExecutor = ioExecutor;
+        return this;
+    }
+
+    public DefaultSchedulerBuilder setFutureExecutor(ScheduledExecutorService futureExecutor) {
+        this.futureExecutor = futureExecutor;
+        return this;
+    }
+
+    public DefaultSchedulerBuilder setDelayExecutor(ScheduledExecutor delayExecutor) {
+        this.delayExecutor = delayExecutor;
+        return this;
+    }
+
+    public DefaultSchedulerBuilder setLogger(Logger log) {
+        this.log = log;
+        return this;
+    }
+
+    public DefaultSchedulerBuilder setJobMasterConfiguration(Configuration jobMasterConfiguration) {
+        this.jobMasterConfiguration = jobMasterConfiguration;
+        return this;
+    }
+
+    public DefaultSchedulerBuilder setUserCodeLoader(ClassLoader userCodeLoader) {
+        this.userCodeLoader = userCodeLoader;
+        return this;
+    }
+
+    public DefaultSchedulerBuilder setCheckpointCleaner(CheckpointsCleaner checkpointsCleaner) {
+        this.checkpointCleaner = checkpointsCleaner;
+        return this;
+    }
+
+    public DefaultSchedulerBuilder setCheckpointRecoveryFactory(
+            CheckpointRecoveryFactory checkpointRecoveryFactory) {
+        this.checkpointRecoveryFactory = checkpointRecoveryFactory;
+        return this;
+    }
+
+    public DefaultSchedulerBuilder setRpcTimeout(Time rpcTimeout) {
+        this.rpcTimeout = rpcTimeout;
+        return this;
+    }
+
+    public DefaultSchedulerBuilder setBlobWriter(BlobWriter blobWriter) {
+        this.blobWriter = blobWriter;
+        return this;
+    }
+
+    public DefaultSchedulerBuilder setJobManagerJobMetricGroup(
+            JobManagerJobMetricGroup jobManagerJobMetricGroup) {
+        this.jobManagerJobMetricGroup = jobManagerJobMetricGroup;
+        return this;
+    }
+
+    public DefaultSchedulerBuilder setShuffleMaster(ShuffleMaster<?> shuffleMaster) {
+        this.shuffleMaster = shuffleMaster;
+        return this;
+    }
+
+    public DefaultSchedulerBuilder setPartitionTracker(JobMasterPartitionTracker partitionTracker) {
+        this.partitionTracker = partitionTracker;
+        return this;
+    }
+
+    public DefaultSchedulerBuilder setSchedulingStrategyFactory(
+            SchedulingStrategyFactory schedulingStrategyFactory) {
+        this.schedulingStrategyFactory = schedulingStrategyFactory;
+        return this;
+    }
+
+    public DefaultSchedulerBuilder setFailoverStrategyFactory(
+            FailoverStrategy.Factory failoverStrategyFactory) {
+        this.failoverStrategyFactory = failoverStrategyFactory;
+        return this;
+    }
+
+    public DefaultSchedulerBuilder setRestartBackoffTimeStrategy(
+            RestartBackoffTimeStrategy restartBackoffTimeStrategy) {
+        this.restartBackoffTimeStrategy = restartBackoffTimeStrategy;
+        return this;
+    }
+
+    public DefaultSchedulerBuilder setExecutionOperations(ExecutionOperations executionOperations) {
+        this.executionOperations = executionOperations;
+        return this;
+    }
+
+    public DefaultSchedulerBuilder setExecutionVertexVersioner(
+            ExecutionVertexVersioner executionVertexVersioner) {
+        this.executionVertexVersioner = executionVertexVersioner;
+        return this;
+    }
+
+    public DefaultSchedulerBuilder setExecutionSlotAllocatorFactory(
+            ExecutionSlotAllocatorFactory executionSlotAllocatorFactory) {
+        this.executionSlotAllocatorFactory = executionSlotAllocatorFactory;
+        return this;
+    }
+
+    public DefaultSchedulerBuilder setJobStatusListener(JobStatusListener jobStatusListener) {
+        this.jobStatusListener = jobStatusListener;
+        return this;
+    }
+
+    public DefaultSchedulerBuilder setExecutionDeployerFactory(
+            ExecutionDeployer.Factory executionDeployerFactory) {
+        this.executionDeployerFactory = executionDeployerFactory;
+        return this;
+    }
+
+    public DefaultSchedulerBuilder setVertexParallelismDecider(
+            VertexParallelismDecider vertexParallelismDecider) {
+        this.vertexParallelismDecider = vertexParallelismDecider;
+        return this;
+    }
+
+    public DefaultSchedulerBuilder setDefaultMaxParallelism(int defaultMaxParallelism) {
+        this.defaultMaxParallelism = defaultMaxParallelism;
+        return this;
+    }
+
+    public DefaultScheduler build() throws Exception {
+        return new DefaultScheduler(
+                log,
+                jobGraph,
+                ioExecutor,
+                jobMasterConfiguration,
+                componentMainThreadExecutor -> {},
+                delayExecutor,
+                userCodeLoader,
+                checkpointCleaner,
+                checkpointRecoveryFactory,
+                jobManagerJobMetricGroup,
+                schedulingStrategyFactory,
+                failoverStrategyFactory,
+                restartBackoffTimeStrategy,
+                executionOperations,
+                executionVertexVersioner,
+                executionSlotAllocatorFactory,
+                System.currentTimeMillis(),
+                mainThreadExecutor,
+                jobStatusListener,
+                createExecutionGraphFactory(false),
+                shuffleMaster,
+                rpcTimeout,
+                computeVertexParallelismStore(jobGraph),
+                executionDeployerFactory);
+    }
+
+    public AdaptiveBatchScheduler buildAdaptiveBatchJobScheduler() throws Exception {
+        return new AdaptiveBatchScheduler(
+                log,
+                jobGraph,
+                ioExecutor,
+                jobMasterConfiguration,
+                componentMainThreadExecutor -> {},
+                delayExecutor,
+                userCodeLoader,
+                checkpointCleaner,
+                checkpointRecoveryFactory,
+                jobManagerJobMetricGroup,
+                new VertexwiseSchedulingStrategy.Factory(),
+                failoverStrategyFactory,
+                restartBackoffTimeStrategy,
+                executionOperations,
+                executionVertexVersioner,
+                executionSlotAllocatorFactory,
+                System.currentTimeMillis(),
+                mainThreadExecutor,
+                jobStatusListener,
+                createExecutionGraphFactory(true),
+                shuffleMaster,
+                rpcTimeout,
+                vertexParallelismDecider,
+                defaultMaxParallelism);
+    }
+
+    private ExecutionGraphFactory createExecutionGraphFactory(boolean isDynamicGraph) {
+        return new DefaultExecutionGraphFactory(
+                jobMasterConfiguration,
+                userCodeLoader,
+                new DefaultExecutionDeploymentTracker(),
+                futureExecutor,
+                ioExecutor,
+                rpcTimeout,
+                jobManagerJobMetricGroup,
+                blobWriter,
+                shuffleMaster,
+                partitionTracker,
+                isDynamicGraph,
+                new ExecutionJobVertex.Factory());
+    }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
index 733dbbc3d6e..d78610c25aa 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
@@ -1599,7 +1599,7 @@ public class DefaultSchedulerTest extends TestLogger {
                         final JobGraph jobGraph = singleJobVertexJobGraph(1);
                         enableCheckpointing(jobGraph);
                         try {
-                            return new SchedulerTestingUtils.DefaultSchedulerBuilder(
+                            return new DefaultSchedulerBuilder(
                                             jobGraph,
                                             ComponentMainThreadExecutorServiceAdapter
                                                     .forSingleThreadExecutor(executorService),
@@ -1844,10 +1844,9 @@ public class DefaultSchedulerTest extends TestLogger {
                 .build();
     }
 
-    private SchedulerTestingUtils.DefaultSchedulerBuilder createSchedulerBuilder(
-            final JobGraph jobGraph, final ComponentMainThreadExecutor mainThreadExecutor)
-            throws Exception {
-        return new SchedulerTestingUtils.DefaultSchedulerBuilder(
+    private DefaultSchedulerBuilder createSchedulerBuilder(
+            final JobGraph jobGraph, final ComponentMainThreadExecutor mainThreadExecutor) {
+        return new DefaultSchedulerBuilder(
                         jobGraph,
                         mainThreadExecutor,
                         executor,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
index ba6b7a0d880..eb09a7e139c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
@@ -20,72 +20,38 @@ package org.apache.flink.runtime.scheduler;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.blob.BlobWriter;
-import org.apache.flink.runtime.blob.VoidBlobWriter;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
 import org.apache.flink.runtime.checkpoint.CheckpointException;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
-import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
 import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
-import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
 import org.apache.flink.runtime.checkpoint.PendingCheckpoint;
-import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
 import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
-import org.apache.flink.runtime.executiongraph.JobStatusListener;
-import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverStrategy;
-import org.apache.flink.runtime.executiongraph.failover.flip1.NoRestartBackoffTimeStrategy;
-import org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategy;
-import org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy;
-import org.apache.flink.runtime.executiongraph.failover.flip1.TestRestartBackoffTimeStrategy;
-import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
-import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker;
-import org.apache.flink.runtime.io.network.partition.NoOpJobMasterPartitionTracker;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
 import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
-import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
-import org.apache.flink.runtime.jobmaster.DefaultExecutionDeploymentTracker;
 import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProvider;
-import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
-import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
-import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
-import org.apache.flink.runtime.operators.coordination.OperatorEvent;
 import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
-import org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy;
-import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategyFactory;
-import org.apache.flink.runtime.shuffle.ShuffleMaster;
-import org.apache.flink.runtime.shuffle.ShuffleTestUtils;
 import org.apache.flink.runtime.state.CheckpointStorage;
 import org.apache.flink.runtime.state.StateBackend;
-import org.apache.flink.runtime.taskexecutor.TaskExecutorOperatorEventGateway;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.TernaryBoolean;
-import org.apache.flink.util.concurrent.ScheduledExecutor;
-import org.apache.flink.util.concurrent.ScheduledExecutorServiceAdapter;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.util.Collection;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
 
-import static org.apache.flink.runtime.scheduler.SchedulerBase.computeVertexParallelismStore;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
@@ -94,8 +60,6 @@ import static org.junit.Assert.fail;
 /** A utility class to create {@link DefaultScheduler} instances for testing. */
 public class SchedulerTestingUtils {
 
-    private static final Logger LOG = LoggerFactory.getLogger(SchedulerTestingUtils.class);
-
     private static final long DEFAULT_CHECKPOINT_TIMEOUT_MS = 10 * 60 * 1000;
 
     private static final Time DEFAULT_TIMEOUT = Time.seconds(300);
@@ -110,47 +74,6 @@ public class SchedulerTestingUtils {
         return new DefaultSchedulerBuilder(jobGraph, mainThreadExecutor, executorService).build();
     }
 
-    public static DefaultSchedulerBuilder createSchedulerBuilder(
-            JobGraph jobGraph,
-            ComponentMainThreadExecutor mainThreadExecutor,
-            ScheduledExecutorService scheduledExecutorService) {
-
-        return createSchedulerBuilder(
-                jobGraph,
-                mainThreadExecutor,
-                new SimpleAckingTaskManagerGateway(),
-                scheduledExecutorService);
-    }
-
-    public static DefaultSchedulerBuilder createSchedulerBuilder(
-            JobGraph jobGraph,
-            ComponentMainThreadExecutor mainThreadExecutor,
-            TaskExecutorOperatorEventGateway operatorEventGateway,
-            ScheduledExecutorService scheduledExecutorService) {
-
-        final TaskManagerGateway gateway =
-                operatorEventGateway instanceof TaskManagerGateway
-                        ? (TaskManagerGateway) operatorEventGateway
-                        : new TaskExecutorOperatorEventGatewayAdapter(operatorEventGateway);
-
-        return createSchedulerBuilder(
-                jobGraph, mainThreadExecutor, gateway, scheduledExecutorService);
-    }
-
-    private static DefaultSchedulerBuilder createSchedulerBuilder(
-            JobGraph jobGraph,
-            ComponentMainThreadExecutor mainThreadExecutor,
-            TaskManagerGateway taskManagerGateway,
-            ScheduledExecutorService executorService) {
-
-        return new SchedulerTestingUtils.DefaultSchedulerBuilder(
-                        jobGraph, mainThreadExecutor, executorService)
-                .setSchedulingStrategyFactory(new PipelinedRegionSchedulingStrategy.Factory())
-                .setRestartBackoffTimeStrategy(new TestRestartBackoffTimeStrategy(true, 0))
-                .setExecutionSlotAllocatorFactory(
-                        new TestExecutionSlotAllocatorFactory(taskManagerGateway));
-    }
-
     public static void enableCheckpointing(final JobGraph jobGraph) {
         enableCheckpointing(jobGraph, null, null);
     }
@@ -353,22 +276,6 @@ public class SchedulerTestingUtils {
 
     // ------------------------------------------------------------------------
 
-    private static final class TaskExecutorOperatorEventGatewayAdapter
-            extends SimpleAckingTaskManagerGateway {
-
-        private final TaskExecutorOperatorEventGateway operatorGateway;
-
-        TaskExecutorOperatorEventGatewayAdapter(TaskExecutorOperatorEventGateway operatorGateway) {
-            this.operatorGateway = operatorGateway;
-        }
-
-        @Override
-        public CompletableFuture<Acknowledge> sendOperatorEventToTask(
-                ExecutionAttemptID task, OperatorID operator, SerializedValue<OperatorEvent> evt) {
-            return operatorGateway.sendOperatorEventToTask(task, operator, evt);
-        }
-    }
-
     public static SlotSharingExecutionSlotAllocatorFactory
             newSlotSharingExecutionSlotAllocatorFactory() {
         return newSlotSharingExecutionSlotAllocatorFactory(
@@ -390,227 +297,4 @@ public class SchedulerTestingUtils {
                 allocationTimeout,
                 new LocalInputPreferredSlotSharingStrategy.Factory());
     }
-
-    /** Builder for {@link DefaultScheduler}. */
-    public static class DefaultSchedulerBuilder {
-        protected final JobGraph jobGraph;
-
-        protected final ComponentMainThreadExecutor mainThreadExecutor;
-
-        protected SchedulingStrategyFactory schedulingStrategyFactory =
-                new PipelinedRegionSchedulingStrategy.Factory();
-
-        protected Logger log = LOG;
-        protected Executor ioExecutor;
-        protected Configuration jobMasterConfiguration = new Configuration();
-        protected ScheduledExecutorService futureExecutor;
-        protected ScheduledExecutor delayExecutor;
-        protected ClassLoader userCodeLoader = ClassLoader.getSystemClassLoader();
-        protected CheckpointsCleaner checkpointCleaner = new CheckpointsCleaner();
-        protected CheckpointRecoveryFactory checkpointRecoveryFactory =
-                new StandaloneCheckpointRecoveryFactory();
-        protected Time rpcTimeout = DEFAULT_TIMEOUT;
-        protected BlobWriter blobWriter = VoidBlobWriter.getInstance();
-        protected JobManagerJobMetricGroup jobManagerJobMetricGroup =
-                UnregisteredMetricGroups.createUnregisteredJobManagerJobMetricGroup();
-        protected ShuffleMaster<?> shuffleMaster = ShuffleTestUtils.DEFAULT_SHUFFLE_MASTER;
-        protected JobMasterPartitionTracker partitionTracker =
-                NoOpJobMasterPartitionTracker.INSTANCE;
-        protected FailoverStrategy.Factory failoverStrategyFactory =
-                new RestartPipelinedRegionFailoverStrategy.Factory();
-        protected RestartBackoffTimeStrategy restartBackoffTimeStrategy =
-                NoRestartBackoffTimeStrategy.INSTANCE;
-        protected ExecutionOperations executionOperations = new DefaultExecutionOperations();
-        protected ExecutionVertexVersioner executionVertexVersioner =
-                new ExecutionVertexVersioner();
-        protected ExecutionSlotAllocatorFactory executionSlotAllocatorFactory =
-                new TestExecutionSlotAllocatorFactory();
-        protected JobStatusListener jobStatusListener = (ignoredA, ignoredB, ignoredC) -> {};
-        protected ExecutionDeployer.Factory executionDeployerFactory =
-                new DefaultExecutionDeployer.Factory();
-
-        public DefaultSchedulerBuilder(
-                final JobGraph jobGraph,
-                ComponentMainThreadExecutor mainThreadExecutor,
-                ScheduledExecutorService generalExecutorService) {
-            this(
-                    jobGraph,
-                    mainThreadExecutor,
-                    generalExecutorService,
-                    generalExecutorService,
-                    new ScheduledExecutorServiceAdapter(generalExecutorService));
-        }
-
-        public DefaultSchedulerBuilder(
-                final JobGraph jobGraph,
-                ComponentMainThreadExecutor mainThreadExecutor,
-                Executor ioExecutor,
-                ScheduledExecutorService futureExecutor,
-                ScheduledExecutor delayExecuto) {
-            this.jobGraph = jobGraph;
-            this.mainThreadExecutor = mainThreadExecutor;
-            this.ioExecutor = ioExecutor;
-            this.futureExecutor = futureExecutor;
-            this.delayExecutor = delayExecuto;
-        }
-
-        public DefaultSchedulerBuilder setLogger(final Logger log) {
-            this.log = log;
-            return this;
-        }
-
-        public DefaultSchedulerBuilder setIoExecutor(final Executor ioExecutor) {
-            this.ioExecutor = ioExecutor;
-            return this;
-        }
-
-        public DefaultSchedulerBuilder setJobMasterConfiguration(
-                final Configuration jobMasterConfiguration) {
-            this.jobMasterConfiguration = jobMasterConfiguration;
-            return this;
-        }
-
-        public DefaultSchedulerBuilder setFutureExecutor(
-                final ScheduledExecutorService futureExecutor) {
-            this.futureExecutor = futureExecutor;
-            return this;
-        }
-
-        public DefaultSchedulerBuilder setDelayExecutor(final ScheduledExecutor delayExecutor) {
-            this.delayExecutor = delayExecutor;
-            return this;
-        }
-
-        public DefaultSchedulerBuilder setUserCodeLoader(final ClassLoader userCodeLoader) {
-            this.userCodeLoader = userCodeLoader;
-            return this;
-        }
-
-        public DefaultSchedulerBuilder setCheckpointCleaner(
-                final CheckpointsCleaner checkpointsCleaner) {
-            this.checkpointCleaner = checkpointsCleaner;
-            return this;
-        }
-
-        public DefaultSchedulerBuilder setCheckpointRecoveryFactory(
-                final CheckpointRecoveryFactory checkpointRecoveryFactory) {
-            this.checkpointRecoveryFactory = checkpointRecoveryFactory;
-            return this;
-        }
-
-        public DefaultSchedulerBuilder setRpcTimeout(final Time rpcTimeout) {
-            this.rpcTimeout = rpcTimeout;
-            return this;
-        }
-
-        public DefaultSchedulerBuilder setBlobWriter(final BlobWriter blobWriter) {
-            this.blobWriter = blobWriter;
-            return this;
-        }
-
-        public DefaultSchedulerBuilder setJobManagerJobMetricGroup(
-                final JobManagerJobMetricGroup jobManagerJobMetricGroup) {
-            this.jobManagerJobMetricGroup = jobManagerJobMetricGroup;
-            return this;
-        }
-
-        public DefaultSchedulerBuilder setShuffleMaster(final ShuffleMaster<?> shuffleMaster) {
-            this.shuffleMaster = shuffleMaster;
-            return this;
-        }
-
-        public DefaultSchedulerBuilder setPartitionTracker(
-                final JobMasterPartitionTracker partitionTracker) {
-            this.partitionTracker = partitionTracker;
-            return this;
-        }
-
-        public DefaultSchedulerBuilder setSchedulingStrategyFactory(
-                final SchedulingStrategyFactory schedulingStrategyFactory) {
-            this.schedulingStrategyFactory = schedulingStrategyFactory;
-            return this;
-        }
-
-        public DefaultSchedulerBuilder setFailoverStrategyFactory(
-                final FailoverStrategy.Factory failoverStrategyFactory) {
-            this.failoverStrategyFactory = failoverStrategyFactory;
-            return this;
-        }
-
-        public DefaultSchedulerBuilder setRestartBackoffTimeStrategy(
-                final RestartBackoffTimeStrategy restartBackoffTimeStrategy) {
-            this.restartBackoffTimeStrategy = restartBackoffTimeStrategy;
-            return this;
-        }
-
-        public DefaultSchedulerBuilder setExecutionOperations(
-                final ExecutionOperations executionOperations) {
-            this.executionOperations = executionOperations;
-            return this;
-        }
-
-        public DefaultSchedulerBuilder setExecutionVertexVersioner(
-                final ExecutionVertexVersioner executionVertexVersioner) {
-            this.executionVertexVersioner = executionVertexVersioner;
-            return this;
-        }
-
-        public DefaultSchedulerBuilder setExecutionSlotAllocatorFactory(
-                final ExecutionSlotAllocatorFactory executionSlotAllocatorFactory) {
-            this.executionSlotAllocatorFactory = executionSlotAllocatorFactory;
-            return this;
-        }
-
-        public DefaultSchedulerBuilder setJobStatusListener(JobStatusListener jobStatusListener) {
-            this.jobStatusListener = jobStatusListener;
-            return this;
-        }
-
-        public DefaultSchedulerBuilder setExecutionDeployerFactory(
-                ExecutionDeployer.Factory executionDeployerFactory) {
-            this.executionDeployerFactory = executionDeployerFactory;
-            return this;
-        }
-
-        public DefaultScheduler build() throws Exception {
-            final ExecutionGraphFactory executionGraphFactory =
-                    new DefaultExecutionGraphFactory(
-                            jobMasterConfiguration,
-                            userCodeLoader,
-                            new DefaultExecutionDeploymentTracker(),
-                            futureExecutor,
-                            ioExecutor,
-                            rpcTimeout,
-                            jobManagerJobMetricGroup,
-                            blobWriter,
-                            shuffleMaster,
-                            partitionTracker);
-
-            return new DefaultScheduler(
-                    log,
-                    jobGraph,
-                    ioExecutor,
-                    jobMasterConfiguration,
-                    componentMainThreadExecutor -> {},
-                    delayExecutor,
-                    userCodeLoader,
-                    checkpointCleaner,
-                    checkpointRecoveryFactory,
-                    jobManagerJobMetricGroup,
-                    schedulingStrategyFactory,
-                    failoverStrategyFactory,
-                    restartBackoffTimeStrategy,
-                    executionOperations,
-                    executionVertexVersioner,
-                    executionSlotAllocatorFactory,
-                    System.currentTimeMillis(),
-                    mainThreadExecutor,
-                    jobStatusListener,
-                    executionGraphFactory,
-                    shuffleMaster,
-                    rpcTimeout,
-                    computeVertexParallelismStore(jobGraph),
-                    executionDeployerFactory);
-        }
-    }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTest.java
index e8636ea8435..d6508d8b66a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTest.java
@@ -35,6 +35,7 @@ import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder;
 import org.apache.flink.runtime.scheduler.SchedulerBase;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
@@ -177,15 +178,10 @@ public class AdaptiveBatchSchedulerTest extends TestLogger {
         configuration.set(
                 JobManagerOptions.SCHEDULER, JobManagerOptions.SchedulerType.AdaptiveBatch);
 
-        final AdaptiveBatchSchedulerTestUtils.AdaptiveBatchSchedulerBuilder schedulerBuilder =
-                (AdaptiveBatchSchedulerTestUtils.AdaptiveBatchSchedulerBuilder)
-                        new AdaptiveBatchSchedulerTestUtils.AdaptiveBatchSchedulerBuilder(
-                                        jobGraph,
-                                        mainThreadExecutor,
-                                        EXECUTOR_RESOURCE.getExecutor())
-                                .setJobMasterConfiguration(configuration);
-        schedulerBuilder.setVertexParallelismDecider((ignored) -> 10);
-
-        return schedulerBuilder.build();
+        return new DefaultSchedulerBuilder(
+                        jobGraph, mainThreadExecutor, EXECUTOR_RESOURCE.getExecutor())
+                .setJobMasterConfiguration(configuration)
+                .setVertexParallelismDecider((ignored) -> 10)
+                .buildAdaptiveBatchJobScheduler();
     }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTestUtils.java
deleted file mode 100644
index d462c726bc4..00000000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTestUtils.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.flink.runtime.scheduler.adaptivebatch;
-
-import org.apache.flink.configuration.JobManagerOptions;
-import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
-import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobmaster.DefaultExecutionDeploymentTracker;
-import org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory;
-import org.apache.flink.runtime.scheduler.ExecutionGraphFactory;
-import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
-import org.apache.flink.runtime.scheduler.strategy.VertexwiseSchedulingStrategy;
-
-import java.util.concurrent.ScheduledExecutorService;
-
-/** A utility class to create {@link AdaptiveBatchScheduler} instances for testing. */
-public class AdaptiveBatchSchedulerTestUtils {
-
-    /** Builder for {@link AdaptiveBatchScheduler}. */
-    public static class AdaptiveBatchSchedulerBuilder
-            extends SchedulerTestingUtils.DefaultSchedulerBuilder {
-
-        private VertexParallelismDecider vertexParallelismDecider = (ignored) -> 0;
-
-        private int defaultMaxParallelism =
-                JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_MAX_PARALLELISM.defaultValue();
-
-        public AdaptiveBatchSchedulerBuilder(
-                JobGraph jobGraph,
-                ComponentMainThreadExecutor mainThreadExecutor,
-                ScheduledExecutorService executorService) {
-            super(jobGraph, mainThreadExecutor, executorService);
-            setSchedulingStrategyFactory(new VertexwiseSchedulingStrategy.Factory());
-        }
-
-        public void setVertexParallelismDecider(VertexParallelismDecider vertexParallelismDecider) {
-            this.vertexParallelismDecider = vertexParallelismDecider;
-        }
-
-        public void setDefaultMaxParallelism(int defaultMaxParallelism) {
-            this.defaultMaxParallelism = defaultMaxParallelism;
-        }
-
-        @Override
-        public AdaptiveBatchScheduler build() throws Exception {
-            final ExecutionGraphFactory executionGraphFactory =
-                    new DefaultExecutionGraphFactory(
-                            jobMasterConfiguration,
-                            userCodeLoader,
-                            new DefaultExecutionDeploymentTracker(),
-                            futureExecutor,
-                            ioExecutor,
-                            rpcTimeout,
-                            jobManagerJobMetricGroup,
-                            blobWriter,
-                            shuffleMaster,
-                            partitionTracker,
-                            true,
-                            new ExecutionJobVertex.Factory());
-
-            return new AdaptiveBatchScheduler(
-                    log,
-                    jobGraph,
-                    ioExecutor,
-                    jobMasterConfiguration,
-                    componentMainThreadExecutor -> {},
-                    delayExecutor,
-                    userCodeLoader,
-                    checkpointCleaner,
-                    checkpointRecoveryFactory,
-                    jobManagerJobMetricGroup,
-                    schedulingStrategyFactory,
-                    failoverStrategyFactory,
-                    restartBackoffTimeStrategy,
-                    executionOperations,
-                    executionVertexVersioner,
-                    executionSlotAllocatorFactory,
-                    System.currentTimeMillis(),
-                    mainThreadExecutor,
-                    jobStatusListener,
-                    executionGraphFactory,
-                    shuffleMaster,
-                    rpcTimeout,
-                    vertexParallelismDecider,
-                    defaultMaxParallelism);
-        }
-    }
-}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/benchmark/SchedulerBenchmarkUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/benchmark/SchedulerBenchmarkUtils.java
index ed5ef5f7eea..887634cd974 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/benchmark/SchedulerBenchmarkUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/benchmark/SchedulerBenchmarkUtils.java
@@ -36,7 +36,7 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmaster.LogicalSlot;
 import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder;
 import org.apache.flink.runtime.scheduler.DefaultScheduler;
-import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
+import org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.util.concurrent.ScheduledExecutorServiceAdapter;
@@ -104,8 +104,7 @@ public class SchedulerBenchmarkUtils {
                 ComponentMainThreadExecutorServiceAdapter.forMainThread();
 
         final DefaultScheduler scheduler =
-                SchedulerTestingUtils.createSchedulerBuilder(
-                                jobGraph, mainThreadExecutor, scheduledExecutorService)
+                new DefaultSchedulerBuilder(jobGraph, mainThreadExecutor, scheduledExecutorService)
                         .setIoExecutor(scheduledExecutorService)
                         .setFutureExecutor(scheduledExecutorService)
                         .setDelayExecutor(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/benchmark/e2e/SchedulerEndToEndBenchmarkBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/benchmark/e2e/SchedulerEndToEndBenchmarkBase.java
index 42fcf4cf317..318375d8bc0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/benchmark/e2e/SchedulerEndToEndBenchmarkBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/benchmark/e2e/SchedulerEndToEndBenchmarkBase.java
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProvider;
 import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProviderImpl;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotPool;
 import org.apache.flink.runtime.scheduler.DefaultScheduler;
+import org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder;
 import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
 import org.apache.flink.runtime.scheduler.benchmark.JobConfiguration;
 import org.apache.flink.runtime.scheduler.benchmark.SchedulerBenchmarkBase;
@@ -76,8 +77,7 @@ public class SchedulerEndToEndBenchmarkBase extends SchedulerBenchmarkBase {
             ComponentMainThreadExecutor mainThreadExecutor,
             ScheduledExecutorService executorService)
             throws Exception {
-        return new SchedulerTestingUtils.DefaultSchedulerBuilder(
-                        jobGraph, mainThreadExecutor, executorService)
+        return new DefaultSchedulerBuilder(jobGraph, mainThreadExecutor, executorService)
                 .setExecutionSlotAllocatorFactory(
                         SchedulerTestingUtils.newSlotSharingExecutionSlotAllocatorFactory(
                                 physicalSlotProvider))
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/slowtaskdetector/ExecutionTimeBasedSlowTaskDetectorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/slowtaskdetector/ExecutionTimeBasedSlowTaskDetectorTest.java
index af2624a55be..620fcbe67f7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/slowtaskdetector/ExecutionTimeBasedSlowTaskDetectorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/slowtaskdetector/ExecutionTimeBasedSlowTaskDetectorTest.java
@@ -31,9 +31,9 @@ import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
 import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder;
 import org.apache.flink.runtime.scheduler.SchedulerBase;
 import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
-import org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchSchedulerTestUtils;
 import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.testutils.TestingUtils;
@@ -221,11 +221,11 @@ class ExecutionTimeBasedSlowTaskDetectorTest {
         final JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(jobVertices);
 
         final SchedulerBase scheduler =
-                new AdaptiveBatchSchedulerTestUtils.AdaptiveBatchSchedulerBuilder(
+                new DefaultSchedulerBuilder(
                                 jobGraph,
                                 ComponentMainThreadExecutorServiceAdapter.forMainThread(),
                                 EXECUTOR_RESOURCE.getExecutor())
-                        .build();
+                        .buildAdaptiveBatchJobScheduler();
 
         final ExecutionGraph executionGraph = scheduler.getExecutionGraph();
 


[flink] 04/05: [hotfix] Migrate SlotSharingExecutionSlotAllocatorTest to JUnit5

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c75a242e94c8482a53214d523c7f204c18be1f4a
Author: Zhu Zhu <re...@gmail.com>
AuthorDate: Mon Jul 11 18:02:18 2022 +0800

    [hotfix] Migrate SlotSharingExecutionSlotAllocatorTest to JUnit5
---
 .../SlotSharingExecutionSlotAllocatorTest.java     | 196 +++++++++------------
 1 file changed, 86 insertions(+), 110 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocatorTest.java
index aeac6d7cc11..8fbbf3adc91 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocatorTest.java
@@ -34,10 +34,9 @@ import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkChecke
 import org.apache.flink.runtime.scheduler.SharedSlotProfileRetriever.SharedSlotProfileRetrieverFactory;
 import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
 import org.apache.flink.util.FlinkException;
-import org.apache.flink.util.TestLogger;
 import org.apache.flink.util.function.BiConsumerWithException;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -56,18 +55,11 @@ import java.util.stream.Collectors;
 
 import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createExecutionAttemptId;
 import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createRandomExecutionVertexId;
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.hamcrest.Matchers.empty;
-import static org.hamcrest.Matchers.hasSize;
-import static org.hamcrest.Matchers.instanceOf;
-import static org.hamcrest.Matchers.notNullValue;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Test suite for {@link SlotSharingExecutionSlotAllocator}. */
-public class SlotSharingExecutionSlotAllocatorTest extends TestLogger {
+class SlotSharingExecutionSlotAllocatorTest {
     private static final Time ALLOCATION_TIMEOUT = Time.milliseconds(100L);
     private static final ResourceProfile RESOURCE_PROFILE = ResourceProfile.fromResources(3, 5);
 
@@ -77,7 +69,7 @@ public class SlotSharingExecutionSlotAllocatorTest extends TestLogger {
     private static final ExecutionVertexID EV4 = createRandomExecutionVertexId();
 
     @Test
-    public void testSlotProfileRequestAskedBulkAndGroup() {
+    void testSlotProfileRequestAskedBulkAndGroup() {
         AllocationContext context = AllocationContext.newBuilder().addGroup(EV1, EV2).build();
         ExecutionSlotSharingGroup executionSlotSharingGroup =
                 context.getSlotSharingStrategy().getExecutionSlotSharingGroup(EV1);
@@ -86,15 +78,14 @@ public class SlotSharingExecutionSlotAllocatorTest extends TestLogger {
 
         List<Set<ExecutionVertexID>> askedBulks =
                 context.getSlotProfileRetrieverFactory().getAskedBulks();
-        assertThat(askedBulks, hasSize(1));
-        assertThat(askedBulks.get(0), containsInAnyOrder(EV1, EV2));
-        assertThat(
-                context.getSlotProfileRetrieverFactory().getAskedGroups(),
-                containsInAnyOrder(executionSlotSharingGroup));
+        assertThat(askedBulks).hasSize(1);
+        assertThat(askedBulks.get(0)).containsExactlyInAnyOrder(EV1, EV2);
+        assertThat(context.getSlotProfileRetrieverFactory().getAskedGroups())
+                .containsExactly(executionSlotSharingGroup);
     }
 
     @Test
-    public void testSlotRequestProfile() {
+    void testSlotRequestProfile() {
         AllocationContext context = AllocationContext.newBuilder().addGroup(EV1, EV2, EV3).build();
         ResourceProfile physicalsSlotResourceProfile = RESOURCE_PROFILE.multiply(3);
 
@@ -102,16 +93,13 @@ public class SlotSharingExecutionSlotAllocatorTest extends TestLogger {
 
         Optional<PhysicalSlotRequest> slotRequest =
                 context.getSlotProvider().getRequests().values().stream().findFirst();
-        assertThat(slotRequest.isPresent(), is(true));
-        slotRequest.ifPresent(
-                r ->
-                        assertThat(
-                                r.getSlotProfile().getPhysicalSlotResourceProfile(),
-                                is(physicalsSlotResourceProfile)));
+        assertThat(slotRequest).isPresent();
+        assertThat(slotRequest.get().getSlotProfile().getPhysicalSlotResourceProfile())
+                .isEqualTo(physicalsSlotResourceProfile);
     }
 
     @Test
-    public void testAllocatePhysicalSlotForNewSharedSlot() {
+    void testAllocatePhysicalSlotForNewSharedSlot() {
         AllocationContext context =
                 AllocationContext.newBuilder().addGroup(EV1, EV2).addGroup(EV3, EV4).build();
 
@@ -119,12 +107,12 @@ public class SlotSharingExecutionSlotAllocatorTest extends TestLogger {
                 context.allocateSlotsFor(EV1, EV2, EV3, EV4);
         Collection<ExecutionVertexID> assignIds = getAssignIds(executionSlotAssignments);
 
-        assertThat(assignIds, containsInAnyOrder(EV1, EV2, EV3, EV4));
-        assertThat(context.getSlotProvider().getRequests().keySet(), hasSize(2));
+        assertThat(assignIds).containsExactlyInAnyOrder(EV1, EV2, EV3, EV4);
+        assertThat(context.getSlotProvider().getRequests()).hasSize(2);
     }
 
     @Test
-    public void testAllocateLogicalSlotFromAvailableSharedSlot() {
+    void testAllocateLogicalSlotFromAvailableSharedSlot() {
         AllocationContext context = AllocationContext.newBuilder().addGroup(EV1, EV2).build();
 
         context.allocateSlotsFor(EV1);
@@ -134,26 +122,24 @@ public class SlotSharingExecutionSlotAllocatorTest extends TestLogger {
         // execution 0 from the first allocateSlotsFor call and execution 1 from the second
         // allocateSlotsFor call
         // share a slot, therefore only one physical slot allocation should happen
-        assertThat(assignIds, containsInAnyOrder(EV2));
-        assertThat(context.getSlotProvider().getRequests().keySet(), hasSize(1));
+        assertThat(assignIds).containsExactly(EV2);
+        assertThat(context.getSlotProvider().getRequests()).hasSize(1);
     }
 
     @Test
-    public void testDuplicateAllocationDoesNotRecreateLogicalSlotFuture()
+    void testDuplicateAllocationDoesNotRecreateLogicalSlotFuture()
             throws ExecutionException, InterruptedException {
         AllocationContext context = AllocationContext.newBuilder().addGroup(EV1).build();
 
         ExecutionSlotAssignment assignment1 = context.allocateSlotsFor(EV1).get(0);
         ExecutionSlotAssignment assignment2 = context.allocateSlotsFor(EV1).get(0);
 
-        assertThat(
-                assignment1.getLogicalSlotFuture().get()
-                        == assignment2.getLogicalSlotFuture().get(),
-                is(true));
+        assertThat(assignment1.getLogicalSlotFuture().get())
+                .isSameAs(assignment2.getLogicalSlotFuture().get());
     }
 
     @Test
-    public void testFailedPhysicalSlotRequestFailsLogicalSlotFuturesAndRemovesSharedSlot() {
+    void testFailedPhysicalSlotRequestFailsLogicalSlotFuturesAndRemovesSharedSlot() {
         AllocationContext context =
                 AllocationContext.newBuilder()
                         .addGroup(EV1)
@@ -166,27 +152,25 @@ public class SlotSharingExecutionSlotAllocatorTest extends TestLogger {
         SlotRequestId slotRequestId =
                 context.getSlotProvider().getFirstRequestOrFail().getSlotRequestId();
 
-        assertThat(logicalSlotFuture.isDone(), is(false));
+        assertThat(logicalSlotFuture).isNotDone();
         context.getSlotProvider()
                 .getResponses()
                 .get(slotRequestId)
                 .completeExceptionally(new Throwable());
-        assertThat(logicalSlotFuture.isCompletedExceptionally(), is(true));
+        assertThat(logicalSlotFuture).isCompletedExceptionally();
 
         // next allocation allocates new shared slot
         context.allocateSlotsFor(EV1);
-        assertThat(context.getSlotProvider().getRequests().keySet(), hasSize(2));
+        assertThat(context.getSlotProvider().getRequests()).hasSize(2);
     }
 
     @Test
-    public void testSlotWillBeOccupiedIndefinitelyFalse()
-            throws ExecutionException, InterruptedException {
+    void testSlotWillBeOccupiedIndefinitelyFalse() throws ExecutionException, InterruptedException {
         testSlotWillBeOccupiedIndefinitely(false);
     }
 
     @Test
-    public void testSlotWillBeOccupiedIndefinitelyTrue()
-            throws ExecutionException, InterruptedException {
+    void testSlotWillBeOccupiedIndefinitelyTrue() throws ExecutionException, InterruptedException {
         testSlotWillBeOccupiedIndefinitely(true);
     }
 
@@ -200,19 +184,18 @@ public class SlotSharingExecutionSlotAllocatorTest extends TestLogger {
         context.allocateSlotsFor(EV1);
 
         PhysicalSlotRequest slotRequest = context.getSlotProvider().getFirstRequestOrFail();
-        assertThat(
-                slotRequest.willSlotBeOccupiedIndefinitely(), is(slotWillBeOccupiedIndefinitely));
+        assertThat(slotRequest.willSlotBeOccupiedIndefinitely())
+                .isEqualTo(slotWillBeOccupiedIndefinitely);
 
         TestingPhysicalSlot physicalSlot =
                 context.getSlotProvider().getResponses().get(slotRequest.getSlotRequestId()).get();
-        assertThat(physicalSlot.getPayload(), notNullValue());
-        assertThat(
-                physicalSlot.getPayload().willOccupySlotIndefinitely(),
-                is(slotWillBeOccupiedIndefinitely));
+        assertThat(physicalSlot.getPayload()).isNotNull();
+        assertThat(physicalSlot.getPayload().willOccupySlotIndefinitely())
+                .isEqualTo(slotWillBeOccupiedIndefinitely);
     }
 
     @Test
-    public void testReturningLogicalSlotsRemovesSharedSlot() throws Exception {
+    void testReturningLogicalSlotsRemovesSharedSlot() throws Exception {
         // physical slot request is completed and completes logical requests
         testLogicalSlotRequestCancellationOrRelease(
                 false,
@@ -221,24 +204,26 @@ public class SlotSharingExecutionSlotAllocatorTest extends TestLogger {
     }
 
     @Test
-    public void testLogicalSlotCancelsPhysicalSlotRequestAndRemovesSharedSlot() throws Exception {
+    void testLogicalSlotCancelsPhysicalSlotRequestAndRemovesSharedSlot() throws Exception {
         // physical slot request is not completed and does not complete logical requests
         testLogicalSlotRequestCancellationOrRelease(
                 true,
                 true,
                 (context, assignment) -> {
                     context.getAllocator().cancel(assignment.getExecutionAttemptId());
-                    try {
-                        assignment.getLogicalSlotFuture().get();
-                        fail("The logical future must finish with the cancellation exception");
-                    } catch (InterruptedException | ExecutionException e) {
-                        assertThat(e.getCause(), instanceOf(CancellationException.class));
-                    }
+                    assertThatThrownBy(
+                                    () -> {
+                                        context.getAllocator()
+                                                .cancel(assignment.getExecutionAttemptId());
+                                        assignment.getLogicalSlotFuture().get();
+                                    })
+                            .as("The logical future must finish with the cancellation exception.")
+                            .hasCauseInstanceOf(CancellationException.class);
                 });
     }
 
     @Test
-    public void
+    void
             testCompletedLogicalSlotCancelationDoesNotCancelPhysicalSlotRequestAndDoesNotRemoveSharedSlot()
                     throws Exception {
         // physical slot request is completed and completes logical requests
@@ -266,7 +251,7 @@ public class SlotSharingExecutionSlotAllocatorTest extends TestLogger {
         AllocationContext context = allocationContextBuilder.build();
 
         List<ExecutionSlotAssignment> assignments = context.allocateSlotsFor(EV1, EV2);
-        assertThat(context.getSlotProvider().getRequests().keySet(), hasSize(1));
+        assertThat(context.getSlotProvider().getRequests()).hasSize(1);
 
         // cancel or release only one sharing logical slots
         cancelOrReleaseAction.accept(context, assignments.get(0));
@@ -274,7 +259,7 @@ public class SlotSharingExecutionSlotAllocatorTest extends TestLogger {
                 context.allocateSlotsFor(EV1, EV2);
         // there should be no more physical slot allocations, as the first logical slot reuses the
         // previous shared slot
-        assertThat(context.getSlotProvider().getRequests().keySet(), hasSize(1));
+        assertThat(context.getSlotProvider().getRequests()).hasSize(1);
 
         // cancel or release all sharing logical slots
         for (ExecutionSlotAssignment assignment : assignmentsAfterOneCancellation) {
@@ -282,22 +267,18 @@ public class SlotSharingExecutionSlotAllocatorTest extends TestLogger {
         }
         SlotRequestId slotRequestId =
                 context.getSlotProvider().getFirstRequestOrFail().getSlotRequestId();
-        assertThat(
-                context.getSlotProvider().getCancellations().containsKey(slotRequestId),
-                is(cancelsPhysicalSlotRequestAndRemovesSharedSlot));
+        assertThat(context.getSlotProvider().getCancellations().containsKey(slotRequestId))
+                .isEqualTo(cancelsPhysicalSlotRequestAndRemovesSharedSlot);
 
         context.allocateSlotsFor(EV3);
         // there should be one more physical slot allocation if the first allocation should be
         // removed with all logical slots
         int expectedNumberOfRequests = cancelsPhysicalSlotRequestAndRemovesSharedSlot ? 2 : 1;
-        assertThat(
-                context.getSlotProvider().getRequests().keySet(),
-                hasSize(expectedNumberOfRequests));
+        assertThat(context.getSlotProvider().getRequests()).hasSize(expectedNumberOfRequests);
     }
 
     @Test
-    public void testPhysicalSlotReleaseLogicalSlots()
-            throws ExecutionException, InterruptedException {
+    void testPhysicalSlotReleaseLogicalSlots() throws ExecutionException, InterruptedException {
         AllocationContext context = AllocationContext.newBuilder().addGroup(EV1, EV2).build();
         List<ExecutionSlotAssignment> assignments = context.allocateSlotsFor(EV1, EV2);
         List<TestingPayload> payloads =
@@ -317,26 +298,23 @@ public class SlotSharingExecutionSlotAllocatorTest extends TestLogger {
                 context.getSlotProvider().getFirstRequestOrFail().getSlotRequestId();
         TestingPhysicalSlot physicalSlot = context.getSlotProvider().getFirstResponseOrFail().get();
 
-        assertThat(
-                payloads.stream().allMatch(payload -> payload.getTerminalStateFuture().isDone()),
-                is(false));
-        assertThat(physicalSlot.getPayload(), notNullValue());
+        assertThat(payloads.stream().allMatch(payload -> payload.getTerminalStateFuture().isDone()))
+                .isFalse();
+        assertThat(physicalSlot.getPayload()).isNotNull();
         physicalSlot.getPayload().release(new Throwable());
-        assertThat(
-                payloads.stream().allMatch(payload -> payload.getTerminalStateFuture().isDone()),
-                is(true));
+        assertThat(payloads.stream().allMatch(payload -> payload.getTerminalStateFuture().isDone()))
+                .isTrue();
 
-        assertThat(
-                context.getSlotProvider().getCancellations().containsKey(slotRequestId), is(true));
+        assertThat(context.getSlotProvider().getCancellations()).containsKey(slotRequestId);
 
         context.allocateSlotsFor(EV1, EV2);
         // there should be one more physical slot allocation, as the first allocation should be
         // removed after releasing all logical slots
-        assertThat(context.getSlotProvider().getRequests().keySet(), hasSize(2));
+        assertThat(context.getSlotProvider().getRequests()).hasSize(2);
     }
 
     @Test
-    public void testSchedulePendingRequestBulkTimeoutCheck() {
+    void testSchedulePendingRequestBulkTimeoutCheck() {
         TestingPhysicalSlotRequestBulkChecker bulkChecker =
                 new TestingPhysicalSlotRequestBulkChecker();
         AllocationContext context = createBulkCheckerContextWithEv12GroupAndEv3Group(bulkChecker);
@@ -344,16 +322,15 @@ public class SlotSharingExecutionSlotAllocatorTest extends TestLogger {
         context.allocateSlotsFor(EV1, EV3);
         PhysicalSlotRequestBulk bulk = bulkChecker.getBulk();
 
-        assertThat(bulk.getPendingRequests(), hasSize(2));
-        assertThat(
-                bulk.getPendingRequests(),
-                containsInAnyOrder(RESOURCE_PROFILE.multiply(2), RESOURCE_PROFILE));
-        assertThat(bulk.getAllocationIdsOfFulfilledRequests(), hasSize(0));
-        assertThat(bulkChecker.getTimeout(), is(ALLOCATION_TIMEOUT));
+        assertThat(bulk.getPendingRequests()).hasSize(2);
+        assertThat(bulk.getPendingRequests())
+                .containsExactlyInAnyOrder(RESOURCE_PROFILE.multiply(2), RESOURCE_PROFILE);
+        assertThat(bulk.getAllocationIdsOfFulfilledRequests()).isEmpty();
+        assertThat(bulkChecker.getTimeout()).isEqualTo(ALLOCATION_TIMEOUT);
     }
 
     @Test
-    public void testRequestFulfilledInBulk() {
+    void testRequestFulfilledInBulk() {
         TestingPhysicalSlotRequestBulkChecker bulkChecker =
                 new TestingPhysicalSlotRequestBulkChecker();
         AllocationContext context = createBulkCheckerContextWithEv12GroupAndEv3Group(bulkChecker);
@@ -364,14 +341,14 @@ public class SlotSharingExecutionSlotAllocatorTest extends TestLogger {
                 fulfilOneOfTwoSlotRequestsAndGetPendingProfile(context, allocationId);
         PhysicalSlotRequestBulk bulk = bulkChecker.getBulk();
 
-        assertThat(bulk.getPendingRequests(), hasSize(1));
-        assertThat(bulk.getPendingRequests(), containsInAnyOrder(pendingSlotResourceProfile));
-        assertThat(bulk.getAllocationIdsOfFulfilledRequests(), hasSize(1));
-        assertThat(bulk.getAllocationIdsOfFulfilledRequests(), containsInAnyOrder(allocationId));
+        assertThat(bulk.getPendingRequests()).hasSize(1);
+        assertThat(bulk.getPendingRequests()).containsExactly(pendingSlotResourceProfile);
+        assertThat(bulk.getAllocationIdsOfFulfilledRequests()).hasSize(1);
+        assertThat(bulk.getAllocationIdsOfFulfilledRequests()).containsExactly(allocationId);
     }
 
     @Test
-    public void testRequestBulkCancel() {
+    void testRequestBulkCancel() {
         TestingPhysicalSlotRequestBulkChecker bulkChecker =
                 new TestingPhysicalSlotRequestBulkChecker();
         AllocationContext context = createBulkCheckerContextWithEv12GroupAndEv3Group(bulkChecker);
@@ -397,11 +374,10 @@ public class SlotSharingExecutionSlotAllocatorTest extends TestLogger {
         // EV3 needs again a physical slot, therefore there are 3 requests overall
         context.allocateSlotsFor(EV1, EV3);
 
-        assertThat(context.getSlotProvider().getRequests().values(), hasSize(3));
+        assertThat(context.getSlotProvider().getRequests()).hasSize(3);
         // either EV1 or EV3 logical slot future is fulfilled before cancellation
-        assertThat(ev1failed != ev3failed, is(true));
-        assertThat(
-                assignments2.get(0).getLogicalSlotFuture().isCompletedExceptionally(), is(false));
+        assertThat(ev1failed).isNotEqualTo(ev3failed);
+        assertThat(assignments2.get(0).getLogicalSlotFuture()).isNotCompletedExceptionally();
     }
 
     private static void releaseLogicalSlot(LogicalSlot slot) {
@@ -410,7 +386,7 @@ public class SlotSharingExecutionSlotAllocatorTest extends TestLogger {
     }
 
     @Test
-    public void testBulkClearIfPhysicalSlotRequestFails() {
+    void testBulkClearIfPhysicalSlotRequestFails() {
         TestingPhysicalSlotRequestBulkChecker bulkChecker =
                 new TestingPhysicalSlotRequestBulkChecker();
         AllocationContext context = createBulkCheckerContextWithEv12GroupAndEv3Group(bulkChecker);
@@ -423,11 +399,11 @@ public class SlotSharingExecutionSlotAllocatorTest extends TestLogger {
                 .completeExceptionally(new Throwable());
         PhysicalSlotRequestBulk bulk = bulkChecker.getBulk();
 
-        assertThat(bulk.getPendingRequests(), hasSize(0));
+        assertThat(bulk.getPendingRequests()).isEmpty();
     }
 
     @Test
-    public void failLogicalSlotsIfPhysicalSlotIsFailed() {
+    void failLogicalSlotsIfPhysicalSlotIsFailed() {
         final TestingPhysicalSlotRequestBulkChecker bulkChecker =
                 new TestingPhysicalSlotRequestBulkChecker();
         AllocationContext context =
@@ -442,17 +418,17 @@ public class SlotSharingExecutionSlotAllocatorTest extends TestLogger {
         final List<ExecutionSlotAssignment> allocatedSlots = context.allocateSlotsFor(EV1, EV2);
 
         for (ExecutionSlotAssignment allocatedSlot : allocatedSlots) {
-            assertTrue(allocatedSlot.getLogicalSlotFuture().isCompletedExceptionally());
+            assertThat(allocatedSlot.getLogicalSlotFuture()).isCompletedExceptionally();
         }
 
-        assertThat(bulkChecker.getBulk().getPendingRequests(), is(empty()));
+        assertThat(bulkChecker.getBulk().getPendingRequests()).isEmpty();
 
         final Set<SlotRequestId> requests = context.getSlotProvider().getRequests().keySet();
-        assertThat(context.getSlotProvider().getCancellations().keySet(), is(requests));
+        assertThat(context.getSlotProvider().getCancellations().keySet()).isEqualTo(requests);
     }
 
     @Test
-    public void testSlotRequestProfileFromExecutionSlotSharingGroup() {
+    void testSlotRequestProfileFromExecutionSlotSharingGroup() {
         final ResourceProfile resourceProfile1 = ResourceProfile.fromResources(1, 10);
         final ResourceProfile resourceProfile2 = ResourceProfile.fromResources(2, 20);
         final AllocationContext context =
@@ -462,14 +438,14 @@ public class SlotSharingExecutionSlotAllocatorTest extends TestLogger {
                         .build();
 
         context.allocateSlotsFor(EV1, EV2);
-        assertThat(context.getSlotProvider().getRequests().values().size(), is(2));
+        assertThat(context.getSlotProvider().getRequests()).hasSize(2);
 
         assertThat(
-                context.getSlotProvider().getRequests().values().stream()
-                        .map(PhysicalSlotRequest::getSlotProfile)
-                        .map(SlotProfile::getPhysicalSlotResourceProfile)
-                        .collect(Collectors.toList()),
-                containsInAnyOrder(resourceProfile1, resourceProfile2));
+                        context.getSlotProvider().getRequests().values().stream()
+                                .map(PhysicalSlotRequest::getSlotProfile)
+                                .map(SlotProfile::getPhysicalSlotResourceProfile)
+                                .collect(Collectors.toList()))
+                .containsExactlyInAnyOrder(resourceProfile1, resourceProfile2);
     }
 
     private static List<ExecutionVertexID> getAssignIds(
@@ -495,7 +471,7 @@ public class SlotSharingExecutionSlotAllocatorTest extends TestLogger {
             AllocationContext context, AllocationID allocationId) {
         Map<SlotRequestId, PhysicalSlotRequest> requests = context.getSlotProvider().getRequests();
         List<SlotRequestId> slotRequestIds = new ArrayList<>(requests.keySet());
-        assertThat(slotRequestIds, hasSize(2));
+        assertThat(slotRequestIds).hasSize(2);
         SlotRequestId slotRequestId1 = slotRequestIds.get(0);
         SlotRequestId slotRequestId2 = slotRequestIds.get(1);
         context.getSlotProvider()


[flink] 05/05: [FLINK-28137][runtime] Enable SimpleExecutionSlotAllocator to do batch slot request timeout check

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 265612c2cf93a589d87d7fc8ca168bc19d838885
Author: Zhu Zhu <re...@gmail.com>
AuthorDate: Mon Jul 11 18:44:22 2022 +0800

    [FLINK-28137][runtime] Enable SimpleExecutionSlotAllocator to do batch slot request timeout check
---
 .../slotpool/DeclarativeSlotPoolBridge.java        |  5 ++--
 .../jobmaster/slotpool/PhysicalSlotProvider.java   |  6 +++++
 .../slotpool/PhysicalSlotProviderImpl.java         |  4 ++++
 .../SlotSharingExecutionSlotAllocator.java         |  2 ++
 ...erImplWithDefaultSlotSelectionStrategyTest.java |  6 +++--
 ...lSlotProviderImplWithSpreadOutStrategyTest.java |  6 +++--
 .../slotpool/SlotPoolBatchSlotRequestTest.java     | 28 ----------------------
 .../SimpleExecutionSlotAllocatorTest.java          |  6 +++++
 .../SlotSharingExecutionSlotAllocatorTest.java     |  6 +++++
 .../scheduler/TestingPhysicalSlotProvider.java     | 11 +++++++++
 10 files changed, 46 insertions(+), 34 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java
index 6391657b18a..c7ae507ad6c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java
@@ -397,8 +397,9 @@ public class DeclarativeSlotPoolBridge extends DeclarativeSlotPoolService implem
     }
 
     private void failPendingRequests(Collection<ResourceRequirement> acquiredResources) {
-        Predicate<PendingRequest> predicate =
-                request -> !isBatchSlotRequestTimeoutCheckDisabled || !request.isBatchRequest();
+        // only fails streaming requests because batch jobs do not require all resource
+        // requirements to be fulfilled at the same time
+        Predicate<PendingRequest> predicate = request -> !request.isBatchRequest();
         if (pendingRequests.values().stream().anyMatch(predicate)) {
             log.warn(
                     "Could not acquire the minimum required resources, failing slot requests. Acquired: {}. Current slot pool status: {}",
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProvider.java
index 83b2aa5567f..bab5693dcbc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProvider.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProvider.java
@@ -46,4 +46,10 @@ public interface PhysicalSlotProvider {
      * @param cause of the cancellation
      */
     void cancelSlotRequest(SlotRequestId slotRequestId, Throwable cause);
+
+    /**
+     * Disables batch slot request timeout check. Invoked when someone else wants to take over the
+     * timeout check responsibility.
+     */
+    void disableBatchSlotRequestTimeoutCheck();
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderImpl.java
index 433cf89c6db..270b8749cf4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderImpl.java
@@ -45,6 +45,10 @@ public class PhysicalSlotProviderImpl implements PhysicalSlotProvider {
             SlotSelectionStrategy slotSelectionStrategy, SlotPool slotPool) {
         this.slotSelectionStrategy = checkNotNull(slotSelectionStrategy);
         this.slotPool = checkNotNull(slotPool);
+    }
+
+    @Override
+    public void disableBatchSlotRequestTimeoutCheck() {
         slotPool.disableBatchSlotRequestTimeoutCheck();
     }
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocator.java
index 67c8b3bb970..f1995a7062a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocator.java
@@ -95,6 +95,8 @@ class SlotSharingExecutionSlotAllocator implements ExecutionSlotAllocator {
         this.allocationTimeout = checkNotNull(allocationTimeout);
         this.resourceProfileRetriever = checkNotNull(resourceProfileRetriever);
         this.sharedSlots = new IdentityHashMap<>();
+
+        this.slotProvider.disableBatchSlotRequestTimeoutCheck();
     }
 
     @Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderImplWithDefaultSlotSelectionStrategyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderImplWithDefaultSlotSelectionStrategyTest.java
index 560a26a3d1a..a29a698cbe0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderImplWithDefaultSlotSelectionStrategyTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderImplWithDefaultSlotSelectionStrategyTest.java
@@ -71,8 +71,10 @@ public class PhysicalSlotProviderImplWithDefaultSlotSelectionStrategyTest extend
                         .buildAndStart(physicalSlotProviderResource.getMainThreadExecutor());
         assertThat(slotPool.isBatchSlotRequestTimeoutCheckEnabled(), is(true));
 
-        new PhysicalSlotProviderImpl(
-                LocationPreferenceSlotSelectionStrategy.createDefault(), slotPool);
+        final PhysicalSlotProvider slotProvider =
+                new PhysicalSlotProviderImpl(
+                        LocationPreferenceSlotSelectionStrategy.createDefault(), slotPool);
+        slotProvider.disableBatchSlotRequestTimeoutCheck();
         assertThat(slotPool.isBatchSlotRequestTimeoutCheckEnabled(), is(false));
     }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderImplWithSpreadOutStrategyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderImplWithSpreadOutStrategyTest.java
index 1d77b9ed794..a1d2edfb9e3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderImplWithSpreadOutStrategyTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderImplWithSpreadOutStrategyTest.java
@@ -115,8 +115,10 @@ public class PhysicalSlotProviderImplWithSpreadOutStrategyTest extends TestLogge
                         .buildAndStart(physicalSlotProviderResource.getMainThreadExecutor());
         assertThat(slotPool.isBatchSlotRequestTimeoutCheckEnabled(), is(true));
 
-        new PhysicalSlotProviderImpl(
-                LocationPreferenceSlotSelectionStrategy.createEvenlySpreadOut(), slotPool);
+        final PhysicalSlotProvider slotProvider =
+                new PhysicalSlotProviderImpl(
+                        LocationPreferenceSlotSelectionStrategy.createEvenlySpreadOut(), slotPool);
+        slotProvider.disableBatchSlotRequestTimeoutCheck();
         assertThat(slotPool.isBatchSlotRequestTimeoutCheckEnabled(), is(false));
     }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolBatchSlotRequestTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolBatchSlotRequestTest.java
index fb95adc47df..718cf87c1c8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolBatchSlotRequestTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolBatchSlotRequestTest.java
@@ -41,7 +41,6 @@ import javax.annotation.Nullable;
 
 import java.time.Duration;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
@@ -134,33 +133,6 @@ public class SlotPoolBatchSlotRequestTest extends TestLogger {
         }
     }
 
-    /**
-     * Tests that a batch slot request does react to {@link
-     * SlotPoolService#notifyNotEnoughResourcesAvailable}.
-     */
-    @Test
-    public void testPendingBatchSlotRequestFailsIfAllocationFailsUnfulfillably() throws Exception {
-        final TestingResourceManagerGateway testingResourceManagerGateway =
-                new TestingResourceManagerGateway();
-
-        try (final DeclarativeSlotPoolBridge slotPool =
-                new DeclarativeSlotPoolBridgeBuilder()
-                        .setResourceManagerGateway(testingResourceManagerGateway)
-                        .buildAndStart(mainThreadExecutor)) {
-
-            final CompletableFuture<PhysicalSlot> slotFuture =
-                    SlotPoolUtils.requestNewAllocatedBatchSlot(
-                            slotPool, mainThreadExecutor, resourceProfile);
-
-            SlotPoolUtils.notifyNotEnoughResourcesAvailable(
-                    slotPool, mainThreadExecutor, Collections.emptyList());
-
-            assertThat(
-                    slotFuture,
-                    FlinkMatchers.futureWillCompleteExceptionally(Duration.ofSeconds(10L)));
-        }
-    }
-
     /**
      * Tests that a batch slot request won't fail if its resource manager request fails with
      * exceptions other than {@link UnfulfillableSlotRequestException}.
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SimpleExecutionSlotAllocatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SimpleExecutionSlotAllocatorTest.java
index 1fd6ca850f6..033ff2a1d26 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SimpleExecutionSlotAllocatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SimpleExecutionSlotAllocatorTest.java
@@ -226,6 +226,12 @@ class SimpleExecutionSlotAllocatorTest {
                 .isEqualTo(context.getSlotProvider().getRequests().keySet());
     }
 
+    @Test
+    void testSlotProviderBatchSlotRequestTimeoutCheckIsEnabled() {
+        final AllocationContext context = new AllocationContext();
+        assertThat(context.getSlotProvider().isBatchSlotRequestTimeoutCheckEnabled()).isTrue();
+    }
+
     private static class AllocationContext {
         private final TestingPhysicalSlotProvider slotProvider;
         private final boolean slotWillBeOccupiedIndefinitely;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocatorTest.java
index 8fbbf3adc91..277869fba3d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocatorTest.java
@@ -448,6 +448,12 @@ class SlotSharingExecutionSlotAllocatorTest {
                 .containsExactlyInAnyOrder(resourceProfile1, resourceProfile2);
     }
 
+    @Test
+    void testSlotProviderBatchSlotRequestTimeoutCheckIsDisabled() {
+        final AllocationContext context = AllocationContext.newBuilder().build();
+        assertThat(context.getSlotProvider().isBatchSlotRequestTimeoutCheckEnabled()).isFalse();
+    }
+
     private static List<ExecutionVertexID> getAssignIds(
             Collection<ExecutionSlotAssignment> assignments) {
         return assignments.stream()
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingPhysicalSlotProvider.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingPhysicalSlotProvider.java
index 6fd6d01ab49..2f870ae82ea 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingPhysicalSlotProvider.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingPhysicalSlotProvider.java
@@ -48,6 +48,8 @@ public class TestingPhysicalSlotProvider implements PhysicalSlotProvider {
     private final Function<ResourceProfile, CompletableFuture<TestingPhysicalSlot>>
             physicalSlotCreator;
 
+    private boolean batchSlotRequestTimeoutCheckEnabled = true;
+
     public static TestingPhysicalSlotProvider create(
             Function<ResourceProfile, CompletableFuture<TestingPhysicalSlot>> physicalSlotCreator) {
         return new TestingPhysicalSlotProvider(physicalSlotCreator);
@@ -126,6 +128,11 @@ public class TestingPhysicalSlotProvider implements PhysicalSlotProvider {
         cancellations.put(slotRequestId, cause);
     }
 
+    @Override
+    public void disableBatchSlotRequestTimeoutCheck() {
+        batchSlotRequestTimeoutCheckEnabled = false;
+    }
+
     public CompletableFuture<TestingPhysicalSlot> getResultForRequestId(
             SlotRequestId slotRequestId) {
         return getResponses().get(slotRequestId);
@@ -160,4 +167,8 @@ public class TestingPhysicalSlotProvider implements PhysicalSlotProvider {
         Preconditions.checkState(element.isPresent());
         return element.get();
     }
+
+    boolean isBatchSlotRequestTimeoutCheckEnabled() {
+        return batchSlotRequestTimeoutCheckEnabled;
+    }
 }


[flink] 03/05: [FLINK-28137][runtime] Introduce SpeculativeScheduler

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 81c739ae462412e531216bb46bc567fce2355dd8
Author: Zhu Zhu <re...@gmail.com>
AuthorDate: Thu Jul 7 13:36:27 2022 +0800

    [FLINK-28137][runtime] Introduce SpeculativeScheduler
    
    This closes #20222.
---
 .../runtime/blocklist/BlocklistOperations.java     |  33 ++
 .../RestartPipelinedRegionFailoverStrategy.java    |  12 -
 .../apache/flink/runtime/jobgraph/JobVertex.java   |   7 +-
 .../DefaultSlotPoolServiceSchedulerFactory.java    |   7 +-
 .../apache/flink/runtime/jobmaster/JobMaster.java  |   3 +-
 .../jobmaster/SlotPoolServiceSchedulerFactory.java |   4 +-
 .../flink/runtime/scheduler/DefaultScheduler.java  | 137 ++++---
 .../runtime/scheduler/DefaultSchedulerFactory.java |   4 +-
 .../flink/runtime/scheduler/ExecutionDeployer.java |   2 +-
 .../scheduler/ExecutionVertexVersioner.java        |   9 +
 .../flink/runtime/scheduler/SchedulerBase.java     |  50 +--
 .../runtime/scheduler/SchedulerNGFactory.java      |   4 +-
 .../scheduler/SimpleExecutionSlotAllocator.java    | 189 +++++++++
 .../adaptive/AdaptiveSchedulerFactory.java         |   4 +-
 .../adaptivebatch/AdaptiveBatchScheduler.java      |  14 +-
 .../AdaptiveBatchSchedulerFactory.java             | 139 ++++---
 .../adaptivebatch/SpeculativeScheduler.java        | 314 +++++++++++++++
 .../runtime/jobmaster/JobMasterSchedulerTest.java  |   4 +-
 .../runtime/scheduler/DefaultSchedulerBuilder.java |  47 ++-
 .../runtime/scheduler/DefaultSchedulerTest.java    |   4 +-
 .../SimpleExecutionSlotAllocatorTest.java          | 269 +++++++++++++
 .../scheduler/TestingSchedulerNGFactory.java       |   4 +-
 .../adaptivebatch/SpeculativeSchedulerTest.java    | 426 +++++++++++++++++++++
 23 files changed, 1516 insertions(+), 170 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blocklist/BlocklistOperations.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blocklist/BlocklistOperations.java
new file mode 100644
index 00000000000..f7ef00f5f01
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blocklist/BlocklistOperations.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blocklist;
+
+import java.util.Collection;
+
+/** Operations to perform on the blocklist. */
+public interface BlocklistOperations {
+
+    /**
+     * Add new blocked node records. If a node (identified by node id) already exists, the newly
+     * added one will be merged with the existing one.
+     *
+     * @param newNodes the new blocked node records
+     */
+    void addNewBlockedNodes(Collection<BlockedNode> newNodes);
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionFailoverStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionFailoverStrategy.java
index 4e977c1a7c8..6e7181ffd03 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionFailoverStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionFailoverStrategy.java
@@ -31,9 +31,6 @@ import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.IterableUtils;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -53,10 +50,6 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  */
 public class RestartPipelinedRegionFailoverStrategy implements FailoverStrategy {
 
-    /** The log object used for debugging. */
-    private static final Logger LOG =
-            LoggerFactory.getLogger(RestartPipelinedRegionFailoverStrategy.class);
-
     /** The topology containing info about all the vertices and result partitions. */
     private final SchedulingTopology topology;
 
@@ -112,7 +105,6 @@ public class RestartPipelinedRegionFailoverStrategy implements FailoverStrategy
     @Override
     public Set<ExecutionVertexID> getTasksNeedingRestart(
             ExecutionVertexID executionVertexId, Throwable cause) {
-        LOG.info("Calculating tasks to restart to recover the failed task {}.", executionVertexId);
 
         final SchedulingPipelinedRegion failedRegion =
                 topology.getPipelinedRegionOfVertex(executionVertexId);
@@ -149,10 +141,6 @@ public class RestartPipelinedRegionFailoverStrategy implements FailoverStrategy
                     dataConsumptionException.get().getPartitionId().getPartitionId());
         }
 
-        LOG.info(
-                "{} tasks should be restarted to recover the failed task {}. ",
-                tasksToRestart.size(),
-                executionVertexId);
         return tasksToRestart;
     }
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
index af4a0189f06..36d1a1e5487 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
@@ -308,8 +308,11 @@ public class JobVertex implements java.io.Serializable {
      * @param parallelism The parallelism for the task.
      */
     public void setParallelism(int parallelism) {
-        if (parallelism < 1) {
-            throw new IllegalArgumentException("The parallelism must be at least one.");
+        if (parallelism < 1 && parallelism != ExecutionConfig.PARALLELISM_DEFAULT) {
+            throw new IllegalArgumentException(
+                    "The parallelism must be at least one, or "
+                            + ExecutionConfig.PARALLELISM_DEFAULT
+                            + " (unset).");
         }
         this.parallelism = parallelism;
     }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactory.java
index 0b3757a761e..f218927ebb5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactory.java
@@ -28,6 +28,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.SchedulerExecutionMode;
 import org.apache.flink.runtime.blob.BlobWriter;
+import org.apache.flink.runtime.blocklist.BlocklistOperations;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
 import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
 import org.apache.flink.runtime.executiongraph.JobStatusListener;
@@ -112,7 +113,8 @@ public final class DefaultSlotPoolServiceSchedulerFactory
             long initializationTimestamp,
             ComponentMainThreadExecutor mainThreadExecutor,
             FatalErrorHandler fatalErrorHandler,
-            JobStatusListener jobStatusListener)
+            JobStatusListener jobStatusListener,
+            BlocklistOperations blocklistOperations)
             throws Exception {
         return schedulerNGFactory.createInstance(
                 log,
@@ -133,7 +135,8 @@ public final class DefaultSlotPoolServiceSchedulerFactory
                 initializationTimestamp,
                 mainThreadExecutor,
                 fatalErrorHandler,
-                jobStatusListener);
+                jobStatusListener,
+                blocklistOperations);
     }
 
     public static DefaultSlotPoolServiceSchedulerFactory create(
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 1d9024f50ab..25e2949182c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -385,7 +385,8 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId>
                         initializationTimestamp,
                         getMainThreadExecutor(),
                         fatalErrorHandler,
-                        jobStatusListener);
+                        jobStatusListener,
+                        blocklistHandler::addNewBlockedNodes);
 
         return scheduler;
     }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SlotPoolServiceSchedulerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SlotPoolServiceSchedulerFactory.java
index d97dc577b41..df8c40c86c4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SlotPoolServiceSchedulerFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SlotPoolServiceSchedulerFactory.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.runtime.blob.BlobWriter;
+import org.apache.flink.runtime.blocklist.BlocklistOperations;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
 import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
 import org.apache.flink.runtime.executiongraph.JobStatusListener;
@@ -85,6 +86,7 @@ public interface SlotPoolServiceSchedulerFactory {
             long initializationTimestamp,
             ComponentMainThreadExecutor mainThreadExecutor,
             FatalErrorHandler fatalErrorHandler,
-            JobStatusListener jobStatusListener)
+            JobStatusListener jobStatusListener,
+            BlocklistOperations blocklistOperations)
             throws Exception;
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
index 521dda8b953..da6fa69b325 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
@@ -29,11 +29,8 @@ import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.Execution;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
-import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.executiongraph.JobStatusListener;
-import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition;
 import org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler;
 import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverStrategy;
 import org.apache.flink.runtime.executiongraph.failover.flip1.FailureHandlingResult;
@@ -45,7 +42,6 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
-import org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder;
 import org.apache.flink.runtime.scheduler.exceptionhistory.FailureHandlingResultSnapshot;
 import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
 import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategy;
@@ -53,8 +49,6 @@ import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategyFactory;
 import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
 import org.apache.flink.runtime.shuffle.ShuffleMaster;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
-import org.apache.flink.runtime.topology.Vertex;
-import org.apache.flink.util.IterableUtils;
 import org.apache.flink.util.concurrent.FutureUtils;
 import org.apache.flink.util.concurrent.ScheduledExecutor;
 
@@ -76,7 +70,9 @@ import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
 
 /** The future default scheduler. */
 public class DefaultScheduler extends SchedulerBase implements SchedulerOperations {
@@ -85,13 +81,13 @@ public class DefaultScheduler extends SchedulerBase implements SchedulerOperatio
 
     private final ClassLoader userCodeLoader;
 
-    private final ExecutionSlotAllocator executionSlotAllocator;
+    protected final ExecutionSlotAllocator executionSlotAllocator;
 
     private final ExecutionFailureHandler executionFailureHandler;
 
     private final ScheduledExecutor delayExecutor;
 
-    protected final SchedulingStrategy schedulingStrategy;
+    private final SchedulingStrategy schedulingStrategy;
 
     private final ExecutionOperations executionOperations;
 
@@ -106,7 +102,7 @@ public class DefaultScheduler extends SchedulerBase implements SchedulerOperatio
     // anymore. The reserved allocation information is needed for local recovery.
     private final Map<ExecutionVertexID, AllocationID> reservedAllocationByExecutionVertex;
 
-    private final ExecutionDeployer executionDeployer;
+    protected final ExecutionDeployer executionDeployer;
 
     protected DefaultScheduler(
             final Logger log,
@@ -204,10 +200,9 @@ public class DefaultScheduler extends SchedulerBase implements SchedulerOperatio
 
     @Override
     protected void cancelAllPendingSlotRequestsInternal() {
-        IterableUtils.toStream(getSchedulingTopology().getVertices())
-                .map(Vertex::getId)
-                .map(this::getCurrentExecutionIdOfVertex)
-                .forEach(executionSlotAllocator::cancel);
+        getSchedulingTopology()
+                .getVertices()
+                .forEach(ev -> cancelAllPendingSlotRequestsForVertex(ev.getId()));
     }
 
     @Override
@@ -220,47 +215,46 @@ public class DefaultScheduler extends SchedulerBase implements SchedulerOperatio
     }
 
     @Override
-    protected void updateTaskExecutionStateInternal(
-            final ExecutionVertexID executionVertexId,
-            final TaskExecutionStateTransition taskExecutionState) {
+    protected void onTaskFinished(final Execution execution) {
+        checkState(execution.getState() == ExecutionState.FINISHED);
 
+        final ExecutionVertexID executionVertexId = execution.getVertex().getID();
         // once a task finishes, it will release the assigned allocation/slot and no longer
         // needs it. Therefore, it should stop reserving the slot so that other tasks are
         // possible to use the slot. Ideally, the `stopReserveAllocation` should happen
         // along with the release slot process. However, that process is hidden in the depth
         // of the ExecutionGraph, so we currently do it in DefaultScheduler after that process
         // is done.
-        if (taskExecutionState.getExecutionState() == ExecutionState.FINISHED) {
-            stopReserveAllocation(executionVertexId);
-        }
+        stopReserveAllocation(executionVertexId);
 
-        schedulingStrategy.onExecutionStateChange(
-                executionVertexId, taskExecutionState.getExecutionState());
-        maybeHandleTaskFailure(taskExecutionState, getCurrentExecutionOfVertex(executionVertexId));
+        schedulingStrategy.onExecutionStateChange(executionVertexId, ExecutionState.FINISHED);
     }
 
-    private void maybeHandleTaskFailure(
-            final TaskExecutionStateTransition taskExecutionState, final Execution execution) {
+    @Override
+    protected void onTaskFailed(final Execution execution) {
+        checkState(execution.getState() == ExecutionState.FAILED);
+        checkState(execution.getFailureInfo().isPresent());
+
+        final Throwable error =
+                execution.getFailureInfo().get().getException().deserializeError(userCodeLoader);
+        handleTaskFailure(
+                execution,
+                maybeTranslateToCachedIntermediateDataSetException(
+                        error, execution.getVertex().getID()));
+    }
 
-        if (taskExecutionState.getExecutionState() == ExecutionState.FAILED) {
-            final Throwable error = taskExecutionState.getError(userCodeLoader);
-            handleTaskFailure(execution, error);
-        }
+    protected void handleTaskFailure(
+            final Execution failedExecution, @Nullable final Throwable error) {
+        maybeRestartTasks(recordTaskFailure(failedExecution, error));
     }
 
-    private void handleTaskFailure(
+    protected FailureHandlingResult recordTaskFailure(
             final Execution failedExecution, @Nullable final Throwable error) {
-        Throwable revisedError =
-                maybeTranslateToCachedIntermediateDataSetException(
-                        error, failedExecution.getVertex().getID());
         final long timestamp = System.currentTimeMillis();
-        setGlobalFailureCause(revisedError, timestamp);
-        notifyCoordinatorsAboutTaskFailure(failedExecution.getVertex().getID(), revisedError);
+        setGlobalFailureCause(error, timestamp);
+        notifyCoordinatorsAboutTaskFailure(failedExecution, error);
 
-        final FailureHandlingResult failureHandlingResult =
-                executionFailureHandler.getFailureHandlingResult(
-                        failedExecution, revisedError, timestamp);
-        maybeRestartTasks(failureHandlingResult);
+        return executionFailureHandler.getFailureHandlingResult(failedExecution, error, timestamp);
     }
 
     private Throwable maybeTranslateToCachedIntermediateDataSetException(
@@ -286,10 +280,9 @@ public class DefaultScheduler extends SchedulerBase implements SchedulerOperatio
     }
 
     private void notifyCoordinatorsAboutTaskFailure(
-            final ExecutionVertexID executionVertexId, @Nullable final Throwable error) {
-        final ExecutionJobVertex jobVertex =
-                getExecutionJobVertex(executionVertexId.getJobVertexId());
-        final int subtaskIndex = executionVertexId.getSubtaskIndex();
+            final Execution execution, @Nullable final Throwable error) {
+        final ExecutionJobVertex jobVertex = execution.getVertex().getJobVertex();
+        final int subtaskIndex = execution.getParallelSubtaskIndex();
 
         jobVertex.getOperatorCoordinators().forEach(c -> c.subtaskFailed(subtaskIndex, error));
     }
@@ -324,13 +317,24 @@ public class DefaultScheduler extends SchedulerBase implements SchedulerOperatio
                                 .values());
         final boolean globalRecovery = failureHandlingResult.isGlobalFailure();
 
+        if (globalRecovery) {
+            log.info(
+                    "{} tasks will be restarted to recover from a global failure.",
+                    verticesToRestart.size());
+        } else {
+            checkArgument(failureHandlingResult.getFailedExecution().isPresent());
+            log.info(
+                    "{} tasks will be restarted to recover the failed task {}.",
+                    verticesToRestart.size(),
+                    failureHandlingResult.getFailedExecution().get().getAttemptId());
+        }
+
         addVerticesToRestartPending(verticesToRestart);
 
         final CompletableFuture<?> cancelFuture = cancelTasksAsync(verticesToRestart);
 
         final FailureHandlingResultSnapshot failureHandlingResultSnapshot =
-                FailureHandlingResultSnapshot.create(
-                        failureHandlingResult, id -> getExecutionVertex(id).getCurrentExecutions());
+                createFailureHandlingResultSnapshot(failureHandlingResult);
         delayExecutor.schedule(
                 () ->
                         FutureUtils.assertNoException(
@@ -345,6 +349,12 @@ public class DefaultScheduler extends SchedulerBase implements SchedulerOperatio
                 TimeUnit.MILLISECONDS);
     }
 
+    protected FailureHandlingResultSnapshot createFailureHandlingResultSnapshot(
+            final FailureHandlingResult failureHandlingResult) {
+        return FailureHandlingResultSnapshot.create(
+                failureHandlingResult, id -> getExecutionVertex(id).getCurrentExecutions());
+    }
+
     private void addVerticesToRestartPending(final Set<ExecutionVertexID> verticesToRestart) {
         verticesWaitingForRestart.addAll(verticesToRestart);
         transitionExecutionGraphState(JobStatus.RUNNING, JobStatus.RESTARTING);
@@ -380,9 +390,7 @@ public class DefaultScheduler extends SchedulerBase implements SchedulerOperatio
     private CompletableFuture<?> cancelTasksAsync(final Set<ExecutionVertexID> verticesToRestart) {
         // clean up all the related pending requests to avoid that immediately returned slot
         // is used to fulfill the pending requests of these tasks
-        verticesToRestart.stream()
-                .map(this::getCurrentExecutionIdOfVertex)
-                .forEach(executionSlotAllocator::cancel);
+        cancelAllPendingSlotRequestsForVertices(verticesToRestart);
 
         final List<CompletableFuture<?>> cancelFutures =
                 verticesToRestart.stream()
@@ -393,15 +401,27 @@ public class DefaultScheduler extends SchedulerBase implements SchedulerOperatio
     }
 
     private CompletableFuture<?> cancelExecutionVertex(final ExecutionVertexID executionVertexId) {
-        final ExecutionVertex vertex = getExecutionVertex(executionVertexId);
+        return FutureUtils.combineAll(
+                getExecutionVertex(executionVertexId).getCurrentExecutions().stream()
+                        .map(this::cancelExecution)
+                        .collect(Collectors.toList()));
+    }
 
-        notifyCoordinatorOfCancellation(vertex);
+    protected CompletableFuture<?> cancelExecution(final Execution execution) {
+        notifyCoordinatorOfCancellation(execution);
+        return executionOperations.cancel(execution);
+    }
 
-        return executionOperations.cancel(vertex.getCurrentExecutionAttempt());
+    private void cancelAllPendingSlotRequestsForVertices(
+            final Set<ExecutionVertexID> executionVertices) {
+        executionVertices.forEach(this::cancelAllPendingSlotRequestsForVertex);
     }
 
-    private ExecutionAttemptID getCurrentExecutionIdOfVertex(ExecutionVertexID executionVertexId) {
-        return getCurrentExecutionOfVertex(executionVertexId).getAttemptId();
+    protected void cancelAllPendingSlotRequestsForVertex(
+            final ExecutionVertexID executionVertexId) {
+        getExecutionVertex(executionVertexId)
+                .getCurrentExecutions()
+                .forEach(e -> executionSlotAllocator.cancel(e.getAttemptId()));
     }
 
     private Execution getCurrentExecutionOfVertex(ExecutionVertexID executionVertexId) {
@@ -445,23 +465,18 @@ public class DefaultScheduler extends SchedulerBase implements SchedulerOperatio
         }
     }
 
-    private void notifyCoordinatorOfCancellation(ExecutionVertex vertex) {
+    private void notifyCoordinatorOfCancellation(Execution execution) {
         // this method makes a best effort to filter out duplicate notifications, meaning cases
-        // where
-        // the coordinator was already notified for that specific task
+        // where the coordinator was already notified for that specific task
         // we don't notify if the task is already FAILED, CANCELLING, or CANCELED
-
-        final ExecutionState currentState = vertex.getExecutionState();
+        final ExecutionState currentState = execution.getState();
         if (currentState == ExecutionState.FAILED
                 || currentState == ExecutionState.CANCELING
                 || currentState == ExecutionState.CANCELED) {
             return;
         }
 
-        for (OperatorCoordinatorHolder coordinator :
-                vertex.getJobVertex().getOperatorCoordinators()) {
-            coordinator.subtaskFailed(vertex.getParallelSubtaskIndex(), null);
-        }
+        notifyCoordinatorsAboutTaskFailure(execution, null);
     }
 
     private class DefaultExecutionSlotAllocationContext implements ExecutionSlotAllocationContext {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java
index de7d56315bd..72893347982 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.runtime.blob.BlobWriter;
+import org.apache.flink.runtime.blocklist.BlocklistOperations;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
 import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
 import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
@@ -72,7 +73,8 @@ public class DefaultSchedulerFactory implements SchedulerNGFactory {
             long initializationTimestamp,
             final ComponentMainThreadExecutor mainThreadExecutor,
             final FatalErrorHandler fatalErrorHandler,
-            final JobStatusListener jobStatusListener)
+            final JobStatusListener jobStatusListener,
+            final BlocklistOperations blocklistOperations)
             throws Exception {
 
         final SlotPool slotPool =
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionDeployer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionDeployer.java
index adb5ff76c87..a42eb97b644 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionDeployer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionDeployer.java
@@ -32,7 +32,7 @@ import java.util.Map;
 import java.util.function.BiConsumer;
 
 /** This deployer is responsible for deploying executions. */
-interface ExecutionDeployer {
+public interface ExecutionDeployer {
 
     /**
      * Allocate slots and deploy executions.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexVersioner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexVersioner.java
index 42e5f3387c2..b0a0b17db0d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexVersioner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexVersioner.java
@@ -83,6 +83,15 @@ public class ExecutionVertexVersioner {
                 .collect(Collectors.toSet());
     }
 
+    public Map<ExecutionVertexID, ExecutionVertexVersion> getExecutionVertexVersions(
+            Collection<ExecutionVertexID> executionVertexIds) {
+        return executionVertexIds.stream() // pairs each id with its current version
+                .map(id -> new ExecutionVertexVersion(id, getCurrentVersion(id)))
+                .collect(
+                        Collectors.toMap( // toMap throws IllegalStateException on duplicate keys
+                                ExecutionVertexVersion::getExecutionVertexId, Function.identity()));
+    }
+
     ExecutionVertexVersion getExecutionVertexVersion(ExecutionVertexID executionVertexId) {
         final long currentVersion = getCurrentVersion(executionVertexId);
         return new ExecutionVertexVersion(executionVertexId, currentVersion);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
index 53547ab4104..f240dce094c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
@@ -706,49 +706,41 @@ public abstract class SchedulerBase implements SchedulerNG, CheckpointScheduling
     @Override
     public final boolean updateTaskExecutionState(
             final TaskExecutionStateTransition taskExecutionState) {
-        final ExecutionVertexID executionVertexId =
-                taskExecutionState.getID().getExecutionVertexId();
 
-        boolean updateSuccess = executionGraph.updateState(taskExecutionState);
-
-        if (updateSuccess) {
-            if (isNotifiable(executionVertexId, taskExecutionState)) {
-                updateTaskExecutionStateInternal(executionVertexId, taskExecutionState);
-            }
+        final ExecutionAttemptID attemptId = taskExecutionState.getID();
+        final Execution execution = executionGraph.getRegisteredExecutions().get(attemptId);
+        if (execution != null && executionGraph.updateState(taskExecutionState)) {
+            onTaskExecutionStateUpdate(execution, taskExecutionState);
             return true;
-        } else {
-            return false;
         }
+
+        return false;
     }
 
-    private boolean isNotifiable(
-            final ExecutionVertexID executionVertexId,
-            final TaskExecutionStateTransition taskExecutionState) {
+    private void onTaskExecutionStateUpdate(
+            final Execution execution, final TaskExecutionStateTransition taskExecutionState) {
 
-        final ExecutionVertex executionVertex = getExecutionVertex(executionVertexId);
+        // only notifies a state update if it's effective, namely it successfully
+        // turns the execution state to the expected value.
+        if (execution.getState() != taskExecutionState.getExecutionState()) {
+            return;
+        }
 
         // only notifies FINISHED and FAILED states which are needed at the moment.
-        // can be refined in FLINK-14233 after the legacy scheduler is removed and
-        // the actions are factored out from ExecutionGraph.
+        // can be refined in FLINK-14233 after the actions are factored out from ExecutionGraph.
         switch (taskExecutionState.getExecutionState()) {
             case FINISHED:
-            case FAILED:
-                // only notifies a state update if it's effective, namely it successfully
-                // turns the execution state to the expected value.
-                if (executionVertex.getExecutionState() == taskExecutionState.getExecutionState()) {
-                    return true;
-                }
+                onTaskFinished(execution);
                 break;
-            default:
+            case FAILED:
+                onTaskFailed(execution);
                 break;
         }
-
-        return false;
     }
 
-    protected void updateTaskExecutionStateInternal(
-            final ExecutionVertexID executionVertexId,
-            final TaskExecutionStateTransition taskExecutionState) {}
+    protected abstract void onTaskFinished(final Execution execution);
+
+    protected abstract void onTaskFailed(final Execution execution);
 
     @Override
     public SerializedInputSplit requestNextInputSplit(
@@ -770,7 +762,7 @@ public abstract class SchedulerBase implements SchedulerNG, CheckpointScheduling
     }
 
     @VisibleForTesting
-    Iterable<RootExceptionHistoryEntry> getExceptionHistory() {
+    public Iterable<RootExceptionHistoryEntry> getExceptionHistory() {
         return exceptionHistory.toArrayList();
     }
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNGFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNGFactory.java
index 27b92b3d8a8..d3809b6b353 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNGFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNGFactory.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.runtime.blob.BlobWriter;
+import org.apache.flink.runtime.blocklist.BlocklistOperations;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
 import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
 import org.apache.flink.runtime.executiongraph.JobStatusListener;
@@ -61,7 +62,8 @@ public interface SchedulerNGFactory {
             long initializationTimestamp,
             ComponentMainThreadExecutor mainThreadExecutor,
             FatalErrorHandler fatalErrorHandler,
-            JobStatusListener jobStatusListener)
+            JobStatusListener jobStatusListener,
+            BlocklistOperations blocklistOperations)
             throws Exception;
 
     JobManagerOptions.SchedulerType getSchedulerType();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SimpleExecutionSlotAllocator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SimpleExecutionSlotAllocator.java
new file mode 100644
index 00000000000..e5d6d8ad1e7
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SimpleExecutionSlotAllocator.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProvider;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequest;
+import org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot;
+import org.apache.flink.runtime.util.DualKeyLinkedMap;
+import org.apache.flink.util.FlinkException;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A simple {@link ExecutionSlotAllocator} that requests one physical slot per execution attempt.
+ * It does not support slot sharing, co-location, state/input locality, or local recovery.
+ */
+public class SimpleExecutionSlotAllocator implements ExecutionSlotAllocator {
+    private final PhysicalSlotProvider slotProvider;
+
+    private final boolean slotWillBeOccupiedIndefinitely;
+
+    private final Function<ExecutionAttemptID, ResourceProfile> resourceProfileRetriever;
+
+    private final DualKeyLinkedMap<
+                    ExecutionAttemptID, SlotRequestId, CompletableFuture<LogicalSlot>>
+            requestedPhysicalSlots; // key A: execution attempt, key B: slot request id
+
+    SimpleExecutionSlotAllocator(
+            PhysicalSlotProvider slotProvider,
+            Function<ExecutionAttemptID, ResourceProfile> resourceProfileRetriever,
+            boolean slotWillBeOccupiedIndefinitely) {
+        this.slotProvider = checkNotNull(slotProvider);
+        this.slotWillBeOccupiedIndefinitely = slotWillBeOccupiedIndefinitely;
+        this.resourceProfileRetriever = checkNotNull(resourceProfileRetriever);
+        this.requestedPhysicalSlots = new DualKeyLinkedMap<>();
+    }
+
+    @Override
+    public List<ExecutionSlotAssignment> allocateSlotsFor(
+            List<ExecutionAttemptID> executionAttemptIds) {
+        return executionAttemptIds.stream()
+                .map(id -> new ExecutionSlotAssignment(id, allocateSlotFor(id)))
+                .collect(Collectors.toList());
+    }
+
+    private CompletableFuture<LogicalSlot> allocateSlotFor(ExecutionAttemptID executionAttemptId) {
+        if (requestedPhysicalSlots.containsKeyA(executionAttemptId)) { // reuse known request
+            return requestedPhysicalSlots.getValueByKeyA(executionAttemptId);
+        }
+        final SlotRequestId slotRequestId = new SlotRequestId();
+        final ResourceProfile resourceProfile = resourceProfileRetriever.apply(executionAttemptId);
+        final SlotProfile slotProfile =
+                SlotProfile.priorAllocation(
+                        resourceProfile,
+                        resourceProfile,
+                        Collections.emptyList(),
+                        Collections.emptyList(),
+                        Collections.emptySet());
+        final PhysicalSlotRequest request =
+                new PhysicalSlotRequest(slotRequestId, slotProfile, slotWillBeOccupiedIndefinitely);
+        final CompletableFuture<LogicalSlot> slotFuture =
+                slotProvider
+                        .allocatePhysicalSlot(request)
+                        .thenApply(
+                                physicalSlotRequest ->
+                                        allocateLogicalSlotFromPhysicalSlot(
+                                                slotRequestId,
+                                                physicalSlotRequest.getPhysicalSlot(),
+                                                slotWillBeOccupiedIndefinitely));
+        slotFuture.exceptionally( // failure or cancellation: forget and cancel the request
+                throwable -> {
+                    this.requestedPhysicalSlots.removeKeyA(executionAttemptId);
+                    this.slotProvider.cancelSlotRequest(slotRequestId, throwable);
+                    return null;
+                }); // NOTE(review): runs inline if already failed, i.e. before put() below
+        this.requestedPhysicalSlots.put(executionAttemptId, slotRequestId, slotFuture);
+        return slotFuture;
+    }
+
+    @Override
+    public void cancel(ExecutionAttemptID executionAttemptId) {
+        final CompletableFuture<LogicalSlot> slotFuture =
+                this.requestedPhysicalSlots.getValueByKeyA(executionAttemptId);
+        if (slotFuture != null) {
+            slotFuture.cancel(false); // if pending, triggers the cleanup in allocateSlotFor
+        }
+    }
+
+    private void returnLogicalSlot(LogicalSlot slot) { // callback given to SingleLogicalSlot
+        releaseSlot(
+                slot,
+                new FlinkException("Slot is being returned from SimpleExecutionSlotAllocator."));
+    }
+
+    private void releaseSlot(LogicalSlot slot, Throwable cause) { // forget + cancel request
+        requestedPhysicalSlots.removeKeyB(slot.getSlotRequestId());
+        slotProvider.cancelSlotRequest(slot.getSlotRequestId(), cause);
+    }
+
+    private LogicalSlot allocateLogicalSlotFromPhysicalSlot(
+            final SlotRequestId slotRequestId,
+            final PhysicalSlot physicalSlot,
+            final boolean slotWillBeOccupiedIndefinitely) {
+        // Wrap the physical slot in a logical slot and register a holder as its payload.
+        final SingleLogicalSlot singleLogicalSlot =
+                new SingleLogicalSlot(
+                        slotRequestId,
+                        physicalSlot,
+                        Locality.UNKNOWN,
+                        this::returnLogicalSlot,
+                        slotWillBeOccupiedIndefinitely);
+        // The holder forwards Payload#release(cause) to the logical slot.
+        final LogicalSlotHolder logicalSlotHolder = new LogicalSlotHolder(singleLogicalSlot);
+        if (physicalSlot.tryAssignPayload(logicalSlotHolder)) {
+            return singleLogicalSlot;
+        } else {
+            throw new IllegalStateException(
+                    "BUG: Unexpected physical slot payload assignment failure!");
+        }
+    }
+
+    private class LogicalSlotHolder implements PhysicalSlot.Payload {
+        private final SingleLogicalSlot logicalSlot;
+
+        private LogicalSlotHolder(SingleLogicalSlot logicalSlot) {
+            this.logicalSlot = checkNotNull(logicalSlot);
+        }
+
+        @Override
+        public void release(Throwable cause) {
+            logicalSlot.release(cause); // only place the original cause is forwarded
+            releaseSlot(logicalSlot, new FlinkException("Physical slot releases its payload."));
+        }
+
+        @Override
+        public boolean willOccupySlotIndefinitely() {
+            return logicalSlot.willOccupySlotIndefinitely();
+        }
+    }
+
+    /** Factory to instantiate a {@link SimpleExecutionSlotAllocator}. */
+    public static class Factory implements ExecutionSlotAllocatorFactory {
+        private final PhysicalSlotProvider slotProvider;
+
+        private final boolean slotWillBeOccupiedIndefinitely;
+
+        public Factory(PhysicalSlotProvider slotProvider, boolean slotWillBeOccupiedIndefinitely) {
+            this.slotProvider = slotProvider;
+            this.slotWillBeOccupiedIndefinitely = slotWillBeOccupiedIndefinitely;
+        }
+
+        @Override
+        public ExecutionSlotAllocator createInstance(ExecutionSlotAllocationContext context) {
+            return new SimpleExecutionSlotAllocator(
+                    slotProvider,
+                    id -> context.getResourceProfile(id.getExecutionVertexId()),
+                    slotWillBeOccupiedIndefinitely);
+        }
+    }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerFactory.java
index 61236a1c18e..2d59d56466d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerFactory.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.runtime.blob.BlobWriter;
+import org.apache.flink.runtime.blocklist.BlocklistOperations;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
 import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
 import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
@@ -80,7 +81,8 @@ public class AdaptiveSchedulerFactory implements SchedulerNGFactory {
             long initializationTimestamp,
             ComponentMainThreadExecutor mainThreadExecutor,
             FatalErrorHandler fatalErrorHandler,
-            JobStatusListener jobStatusListener)
+            JobStatusListener jobStatusListener,
+            BlocklistOperations blocklistOperations)
             throws Exception {
         final DeclarativeSlotPool declarativeSlotPool =
                 slotPoolService
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java
index f744d68f28b..29d6bfa72a1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java
@@ -26,10 +26,10 @@ import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
 import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
 import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.IntermediateResult;
 import org.apache.flink.runtime.executiongraph.JobStatusListener;
-import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition;
 import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverStrategy;
 import org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategy;
 import org.apache.flink.runtime.jobgraph.JobEdge;
@@ -51,7 +51,6 @@ import org.apache.flink.runtime.scheduler.SchedulerOperations;
 import org.apache.flink.runtime.scheduler.VertexParallelismStore;
 import org.apache.flink.runtime.scheduler.adaptivebatch.forwardgroup.ForwardGroup;
 import org.apache.flink.runtime.scheduler.adaptivebatch.forwardgroup.ForwardGroupComputeUtil;
-import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
 import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategyFactory;
 import org.apache.flink.runtime.shuffle.ShuffleMaster;
 import org.apache.flink.util.concurrent.ScheduledExecutor;
@@ -145,23 +144,20 @@ public class AdaptiveBatchScheduler extends DefaultScheduler implements Schedule
     }
 
     @Override
-    public void startSchedulingInternal() {
+    protected void startSchedulingInternal() {
         initializeVerticesIfPossible();
 
         super.startSchedulingInternal();
     }
 
     @Override
-    protected void updateTaskExecutionStateInternal(
-            final ExecutionVertexID executionVertexId,
-            final TaskExecutionStateTransition taskExecutionState) {
-
+    protected void onTaskFinished(final Execution execution) {
         initializeVerticesIfPossible();
 
-        super.updateTaskExecutionStateInternal(executionVertexId, taskExecutionState);
+        super.onTaskFinished(execution);
     }
 
-    private void initializeVerticesIfPossible() {
+    void initializeVerticesIfPossible() {
         final List<ExecutionJobVertex> newlyInitializedJobVertices = new ArrayList<>();
         try {
             final long createTimestamp = System.currentTimeMillis();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerFactory.java
index 3aaad1dbd0d..621eaa9788b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerFactory.java
@@ -25,11 +25,13 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ExecutionOptions;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.runtime.blob.BlobWriter;
+import org.apache.flink.runtime.blocklist.BlocklistOperations;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
 import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
 import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.JobStatusListener;
+import org.apache.flink.runtime.executiongraph.SpeculativeExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverStrategyFactoryLoader;
 import org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategy;
 import org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategyFactoryLoader;
@@ -41,8 +43,6 @@ import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobmaster.ExecutionDeploymentTracker;
 import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProvider;
 import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProviderImpl;
-import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkChecker;
-import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkCheckerImpl;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotPool;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolService;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotSelectionStrategy;
@@ -55,17 +55,19 @@ import org.apache.flink.runtime.scheduler.ExecutionSlotAllocatorFactory;
 import org.apache.flink.runtime.scheduler.ExecutionVertexVersioner;
 import org.apache.flink.runtime.scheduler.SchedulerNG;
 import org.apache.flink.runtime.scheduler.SchedulerNGFactory;
-import org.apache.flink.runtime.scheduler.SlotSharingExecutionSlotAllocatorFactory;
+import org.apache.flink.runtime.scheduler.SimpleExecutionSlotAllocator;
 import org.apache.flink.runtime.scheduler.strategy.VertexwiseSchedulingStrategy;
 import org.apache.flink.runtime.shuffle.ShuffleMaster;
 import org.apache.flink.runtime.util.SlotSelectionStrategyUtils;
-import org.apache.flink.util.clock.SystemClock;
 import org.apache.flink.util.concurrent.ScheduledExecutorServiceAdapter;
 
 import org.slf4j.Logger;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.function.Consumer;
 
 import static org.apache.flink.util.Preconditions.checkState;
 
@@ -92,7 +94,8 @@ public class AdaptiveBatchSchedulerFactory implements SchedulerNGFactory {
             long initializationTimestamp,
             ComponentMainThreadExecutor mainThreadExecutor,
             FatalErrorHandler fatalErrorHandler,
-            JobStatusListener jobStatusListener)
+            JobStatusListener jobStatusListener,
+            BlocklistOperations blocklistOperations)
             throws Exception {
 
         checkState(
@@ -108,17 +111,15 @@ public class AdaptiveBatchSchedulerFactory implements SchedulerNGFactory {
                                         new IllegalStateException(
                                                 "The AdaptiveBatchScheduler requires a SlotPool."));
 
-        final SlotSelectionStrategy slotSelectionStrategy =
-                SlotSelectionStrategyUtils.selectSlotSelectionStrategy(
-                        JobType.BATCH, jobMasterConfiguration);
-        final PhysicalSlotRequestBulkChecker bulkChecker =
-                PhysicalSlotRequestBulkCheckerImpl.createFromSlotPool(
-                        slotPool, SystemClock.getInstance());
-        final PhysicalSlotProvider physicalSlotProvider =
-                new PhysicalSlotProviderImpl(slotSelectionStrategy, slotPool);
+        final boolean enableSpeculativeExecution =
+                jobMasterConfiguration.getBoolean(JobManagerOptions.SPECULATIVE_ENABLED);
+
+        final List<Consumer<ComponentMainThreadExecutor>> startUpActions = new ArrayList<>();
+        final Consumer<ComponentMainThreadExecutor> combinedStartUpActions =
+                m -> startUpActions.forEach(a -> a.accept(m));
+
         final ExecutionSlotAllocatorFactory allocatorFactory =
-                new SlotSharingExecutionSlotAllocatorFactory(
-                        physicalSlotProvider, false, bulkChecker, slotRequestTimeout);
+                createExecutionSlotAllocatorFactory(jobMasterConfiguration, slotPool);
 
         final RestartBackoffTimeStrategy restartBackoffTimeStrategy =
                 RestartBackoffTimeStrategyFactoryLoader.createRestartBackoffTimeStrategyFactory(
@@ -147,34 +148,86 @@ public class AdaptiveBatchSchedulerFactory implements SchedulerNGFactory {
                         shuffleMaster,
                         partitionTracker,
                         true,
-                        new ExecutionJobVertex.Factory());
-
-        return new AdaptiveBatchScheduler(
-                log,
-                jobGraph,
-                ioExecutor,
-                jobMasterConfiguration,
-                bulkChecker::start,
-                new ScheduledExecutorServiceAdapter(futureExecutor),
-                userCodeLoader,
-                new CheckpointsCleaner(),
-                checkpointRecoveryFactory,
-                jobManagerJobMetricGroup,
-                new VertexwiseSchedulingStrategy.Factory(),
-                FailoverStrategyFactoryLoader.loadFailoverStrategyFactory(jobMasterConfiguration),
-                restartBackoffTimeStrategy,
-                new DefaultExecutionOperations(),
-                new ExecutionVertexVersioner(),
-                allocatorFactory,
-                initializationTimestamp,
-                mainThreadExecutor,
-                jobStatusListener,
-                executionGraphFactory,
-                shuffleMaster,
-                rpcTimeout,
-                DefaultVertexParallelismDecider.from(jobMasterConfiguration),
-                DefaultVertexParallelismDecider.getNormalizedMaxParallelism(
-                        jobMasterConfiguration));
+                        createExecutionJobVertexFactory(enableSpeculativeExecution));
+
+        if (enableSpeculativeExecution) {
+            return new SpeculativeScheduler(
+                    log,
+                    jobGraph,
+                    ioExecutor,
+                    jobMasterConfiguration,
+                    combinedStartUpActions,
+                    new ScheduledExecutorServiceAdapter(futureExecutor),
+                    userCodeLoader,
+                    new CheckpointsCleaner(),
+                    checkpointRecoveryFactory,
+                    jobManagerJobMetricGroup,
+                    new VertexwiseSchedulingStrategy.Factory(),
+                    FailoverStrategyFactoryLoader.loadFailoverStrategyFactory(
+                            jobMasterConfiguration),
+                    restartBackoffTimeStrategy,
+                    new DefaultExecutionOperations(),
+                    new ExecutionVertexVersioner(),
+                    allocatorFactory,
+                    initializationTimestamp,
+                    mainThreadExecutor,
+                    jobStatusListener,
+                    executionGraphFactory,
+                    shuffleMaster,
+                    rpcTimeout,
+                    DefaultVertexParallelismDecider.from(jobMasterConfiguration),
+                    DefaultVertexParallelismDecider.getNormalizedMaxParallelism(
+                            jobMasterConfiguration),
+                    blocklistOperations);
+        } else {
+            return new AdaptiveBatchScheduler(
+                    log,
+                    jobGraph,
+                    ioExecutor,
+                    jobMasterConfiguration,
+                    combinedStartUpActions,
+                    new ScheduledExecutorServiceAdapter(futureExecutor),
+                    userCodeLoader,
+                    new CheckpointsCleaner(),
+                    checkpointRecoveryFactory,
+                    jobManagerJobMetricGroup,
+                    new VertexwiseSchedulingStrategy.Factory(),
+                    FailoverStrategyFactoryLoader.loadFailoverStrategyFactory(
+                            jobMasterConfiguration),
+                    restartBackoffTimeStrategy,
+                    new DefaultExecutionOperations(),
+                    new ExecutionVertexVersioner(),
+                    allocatorFactory,
+                    initializationTimestamp,
+                    mainThreadExecutor,
+                    jobStatusListener,
+                    executionGraphFactory,
+                    shuffleMaster,
+                    rpcTimeout,
+                    DefaultVertexParallelismDecider.from(jobMasterConfiguration),
+                    DefaultVertexParallelismDecider.getNormalizedMaxParallelism(
+                            jobMasterConfiguration));
+        }
+    }
+
+    private static ExecutionSlotAllocatorFactory createExecutionSlotAllocatorFactory(
+            Configuration configuration, SlotPool slotPool) {
+        final SlotSelectionStrategy slotSelectionStrategy =
+                SlotSelectionStrategyUtils.selectSlotSelectionStrategy(
+                        JobType.BATCH, configuration); // strategy is always selected for BATCH
+        final PhysicalSlotProvider physicalSlotProvider =
+                new PhysicalSlotProviderImpl(slotSelectionStrategy, slotPool);
+        // 'false' is the factory's slotWillBeOccupiedIndefinitely flag.
+        return new SimpleExecutionSlotAllocator.Factory(physicalSlotProvider, false);
+    }
+
+    private static ExecutionJobVertex.Factory createExecutionJobVertexFactory(
+            boolean enableSpeculativeExecution) { // caller reads SPECULATIVE_ENABLED
+        if (enableSpeculativeExecution) {
+            return new SpeculativeExecutionJobVertex.Factory();
+        } else { // regular (non-speculative) job vertices
+            return new ExecutionJobVertex.Factory();
+        }
+    }
 
     private void checkAllExchangesBlocking(final JobGraph jobGraph) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeScheduler.java
new file mode 100644
index 00000000000..919ec05cf1d
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeScheduler.java
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptivebatch;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.blocklist.BlockedNode;
+import org.apache.flink.runtime.blocklist.BlocklistOperations;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.JobStatusListener;
+import org.apache.flink.runtime.executiongraph.SpeculativeExecutionVertex;
+import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverStrategy;
+import org.apache.flink.runtime.executiongraph.failover.flip1.FailureHandlingResult;
+import org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategy;
+import org.apache.flink.runtime.io.network.partition.PartitionException;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
+import org.apache.flink.runtime.scheduler.ExecutionGraphFactory;
+import org.apache.flink.runtime.scheduler.ExecutionOperations;
+import org.apache.flink.runtime.scheduler.ExecutionSlotAllocatorFactory;
+import org.apache.flink.runtime.scheduler.ExecutionVertexVersioner;
+import org.apache.flink.runtime.scheduler.slowtaskdetector.ExecutionTimeBasedSlowTaskDetector;
+import org.apache.flink.runtime.scheduler.slowtaskdetector.SlowTaskDetector;
+import org.apache.flink.runtime.scheduler.slowtaskdetector.SlowTaskDetectorListener;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategyFactory;
+import org.apache.flink.runtime.shuffle.ShuffleMaster;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.concurrent.FutureUtils;
+import org.apache.flink.util.concurrent.ScheduledExecutor;
+
+import org.slf4j.Logger;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** The speculative scheduler. */
+public class SpeculativeScheduler extends AdaptiveBatchScheduler
+        implements SlowTaskDetectorListener {
+
+    private final int maxConcurrentExecutions;
+
+    private final Duration blockSlowNodeDuration;
+
+    private final BlocklistOperations blocklistOperations;
+
+    private final SlowTaskDetector slowTaskDetector;
+
+    public SpeculativeScheduler(
+            final Logger log,
+            final JobGraph jobGraph,
+            final Executor ioExecutor,
+            final Configuration jobMasterConfiguration,
+            final Consumer<ComponentMainThreadExecutor> startUpAction,
+            final ScheduledExecutor delayExecutor,
+            final ClassLoader userCodeLoader,
+            final CheckpointsCleaner checkpointsCleaner,
+            final CheckpointRecoveryFactory checkpointRecoveryFactory,
+            final JobManagerJobMetricGroup jobManagerJobMetricGroup,
+            final SchedulingStrategyFactory schedulingStrategyFactory,
+            final FailoverStrategy.Factory failoverStrategyFactory,
+            final RestartBackoffTimeStrategy restartBackoffTimeStrategy,
+            final ExecutionOperations executionOperations,
+            final ExecutionVertexVersioner executionVertexVersioner,
+            final ExecutionSlotAllocatorFactory executionSlotAllocatorFactory,
+            long initializationTimestamp,
+            final ComponentMainThreadExecutor mainThreadExecutor,
+            final JobStatusListener jobStatusListener,
+            final ExecutionGraphFactory executionGraphFactory,
+            final ShuffleMaster<?> shuffleMaster,
+            final Time rpcTimeout,
+            final VertexParallelismDecider vertexParallelismDecider,
+            final int defaultMaxParallelism,
+            final BlocklistOperations blocklistOperations)
+            throws Exception {
+
+        super(
+                log,
+                jobGraph,
+                ioExecutor,
+                jobMasterConfiguration,
+                startUpAction,
+                delayExecutor,
+                userCodeLoader,
+                checkpointsCleaner,
+                checkpointRecoveryFactory,
+                jobManagerJobMetricGroup,
+                schedulingStrategyFactory,
+                failoverStrategyFactory,
+                restartBackoffTimeStrategy,
+                executionOperations,
+                executionVertexVersioner,
+                executionSlotAllocatorFactory,
+                initializationTimestamp,
+                mainThreadExecutor,
+                jobStatusListener,
+                executionGraphFactory,
+                shuffleMaster,
+                rpcTimeout,
+                vertexParallelismDecider,
+                defaultMaxParallelism);
+
+        this.maxConcurrentExecutions =
+                jobMasterConfiguration.getInteger(
+                        JobManagerOptions.SPECULATIVE_MAX_CONCURRENT_EXECUTIONS);
+
+        this.blockSlowNodeDuration =
+                jobMasterConfiguration.get(JobManagerOptions.BLOCK_SLOW_NODE_DURATION);
+
+        this.blocklistOperations = checkNotNull(blocklistOperations);
+
+        this.slowTaskDetector = new ExecutionTimeBasedSlowTaskDetector(jobMasterConfiguration);
+    }
+
+    @Override
+    protected void startSchedulingInternal() {
+        super.startSchedulingInternal();
+        slowTaskDetector.start(getExecutionGraph(), this, getMainThreadExecutor());
+    }
+
+    @Override
+    public CompletableFuture<Void> closeAsync() {
+        slowTaskDetector.stop();
+        return super.closeAsync();
+    }
+
+    @Override
+    public SpeculativeExecutionVertex getExecutionVertex(ExecutionVertexID executionVertexId) {
+        return (SpeculativeExecutionVertex) super.getExecutionVertex(executionVertexId);
+    }
+
+    @Override
+    protected void onTaskFinished(final Execution execution) {
+        // cancel all un-terminated executions because the execution vertex has finished
+        FutureUtils.assertNoException(cancelPendingExecutions(execution.getVertex().getID()));
+
+        super.onTaskFinished(execution);
+    }
+
+    private CompletableFuture<?> cancelPendingExecutions(
+            final ExecutionVertexID executionVertexId) {
+        // cancel all the related pending requests to avoid that slots returned by the canceled
+        // vertices are used to fulfill these pending requests
+        // do not cancel the FINISHED execution
+        cancelAllPendingSlotRequestsForVertex(executionVertexId);
+        return FutureUtils.combineAll(
+                getExecutionVertex(executionVertexId).getCurrentExecutions().stream()
+                        .filter(e -> e.getState() != ExecutionState.FINISHED)
+                        .map(this::cancelExecution)
+                        .collect(Collectors.toList()));
+    }
+
+    @Override
+    protected void onTaskFailed(final Execution execution) {
+        final SpeculativeExecutionVertex executionVertex =
+                getExecutionVertex(execution.getVertex().getID());
+
+        // when an execution fails, remove it from current executions to make room for future
+        // speculative executions
+        executionVertex.archiveFailedExecution(execution.getAttemptId());
+
+        super.onTaskFailed(execution);
+    }
+
+    @Override
+    protected void handleTaskFailure(
+            final Execution failedExecution, @Nullable final Throwable error) {
+
+        final SpeculativeExecutionVertex executionVertex =
+                getExecutionVertex(failedExecution.getVertex().getID());
+
+        // if the execution vertex is not possible to finish or a PartitionException occurred, trigger
+        // an execution vertex failover to recover
+        if (!isExecutionVertexPossibleToFinish(executionVertex)
+                || ExceptionUtils.findThrowable(error, PartitionException.class).isPresent()) {
+            super.handleTaskFailure(failedExecution, error);
+        } else {
+            // this is just a local failure and the execution vertex will not be fully restarted
+            handleLocalExecutionAttemptFailure(failedExecution, error);
+        }
+    }
+
+    private void handleLocalExecutionAttemptFailure(
+            final Execution failedExecution, @Nullable final Throwable error) {
+        executionSlotAllocator.cancel(failedExecution.getAttemptId());
+
+        final FailureHandlingResult failureHandlingResult =
+                recordTaskFailure(failedExecution, error);
+        if (failureHandlingResult.canRestart()) {
+            archiveFromFailureHandlingResult(
+                    createFailureHandlingResultSnapshot(failureHandlingResult));
+        } else {
+            failJob(error, failureHandlingResult.getTimestamp());
+        }
+    }
+
+    private static boolean isExecutionVertexPossibleToFinish(
+            final SpeculativeExecutionVertex executionVertex) {
+        boolean anyExecutionPossibleToFinish = false;
+        for (Execution execution : executionVertex.getCurrentExecutions()) {
+            // if any execution has finished, no execution of the same execution vertex should fail
+            // after that
+            checkState(execution.getState() != ExecutionState.FINISHED);
+
+            if (execution.getState() == ExecutionState.CREATED
+                    || execution.getState() == ExecutionState.SCHEDULED
+                    || execution.getState() == ExecutionState.DEPLOYING
+                    || execution.getState() == ExecutionState.INITIALIZING
+                    || execution.getState() == ExecutionState.RUNNING) {
+                anyExecutionPossibleToFinish = true;
+            }
+        }
+        return anyExecutionPossibleToFinish;
+    }
+
+    @Override
+    public void notifySlowTasks(Map<ExecutionVertexID, Collection<ExecutionAttemptID>> slowTasks) {
+        // add slow nodes to blocklist before scheduling new speculative executions
+        final long blockedEndTimestamp =
+                System.currentTimeMillis() + blockSlowNodeDuration.toMillis();
+        final Collection<BlockedNode> nodesToBlock =
+                getSlowNodeIds(slowTasks).stream()
+                        .map(
+                                nodeId ->
+                                        new BlockedNode(
+                                                nodeId,
+                                                "Node is detected to be slow.",
+                                                blockedEndTimestamp))
+                        .collect(Collectors.toList());
+        blocklistOperations.addNewBlockedNodes(nodesToBlock);
+
+        final List<Execution> newSpeculativeExecutions = new ArrayList<>();
+        final Set<ExecutionVertexID> verticesToDeploy = new HashSet<>();
+        for (ExecutionVertexID executionVertexId : slowTasks.keySet()) {
+            final SpeculativeExecutionVertex executionVertex =
+                    getExecutionVertex(executionVertexId);
+
+            if (executionVertex.containsSources() || executionVertex.containsSinks()) {
+                continue;
+            }
+
+            final int currentConcurrentExecutions = executionVertex.getCurrentExecutions().size();
+            final int newSpeculativeExecutionsToDeploy =
+                    maxConcurrentExecutions - currentConcurrentExecutions;
+            if (newSpeculativeExecutionsToDeploy > 0) {
+                log.info(
+                        "{} ({}) is detected as a slow vertex, create and deploy {} new speculative executions for it.",
+                        executionVertex.getTaskNameWithSubtaskIndex(),
+                        executionVertex.getID(),
+                        newSpeculativeExecutionsToDeploy);
+
+                verticesToDeploy.add(executionVertexId);
+                IntStream.range(0, newSpeculativeExecutionsToDeploy)
+                        .mapToObj(executionVertex::createNewSpeculativeExecution)
+                        .forEach(newSpeculativeExecutions::add);
+            }
+        }
+
+        executionDeployer.allocateSlotsAndDeploy(
+                newSpeculativeExecutions,
+                executionVertexVersioner.getExecutionVertexVersions(verticesToDeploy));
+    }
+
+    private Set<String> getSlowNodeIds(
+            Map<ExecutionVertexID, Collection<ExecutionAttemptID>> slowTasks) {
+        final Set<ExecutionAttemptID> slowExecutions =
+                slowTasks.values().stream().flatMap(Collection::stream).collect(Collectors.toSet());
+
+        return slowExecutions.stream()
+                .map(id -> getExecutionGraph().getRegisteredExecutions().get(id))
+                .map(Execution::getAssignedResourceLocation)
+                .map(TaskManagerLocation::getNodeId)
+                .collect(Collectors.toSet());
+    }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterSchedulerTest.java
index 07fc6ed2e3c..ee542eb0822 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterSchedulerTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.runtime.blob.BlobWriter;
+import org.apache.flink.runtime.blocklist.BlocklistOperations;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
 import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
 import org.apache.flink.runtime.executiongraph.JobStatusListener;
@@ -119,7 +120,8 @@ public class JobMasterSchedulerTest extends TestLogger {
                 long initializationTimestamp,
                 ComponentMainThreadExecutor mainThreadExecutor,
                 FatalErrorHandler fatalErrorHandler,
-                JobStatusListener jobStatusListener) {
+                JobStatusListener jobStatusListener,
+                BlocklistOperations blocklistOperations) {
             return TestingSchedulerNG.newBuilder()
                     .setStartSchedulingRunnable(
                             () -> {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerBuilder.java
index 4c99e8ce5ec..7629e76a157 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerBuilder.java
@@ -23,12 +23,14 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.runtime.blob.BlobWriter;
 import org.apache.flink.runtime.blob.VoidBlobWriter;
+import org.apache.flink.runtime.blocklist.BlocklistOperations;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
 import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
 import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
 import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.JobStatusListener;
+import org.apache.flink.runtime.executiongraph.SpeculativeExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverStrategy;
 import org.apache.flink.runtime.executiongraph.failover.flip1.NoRestartBackoffTimeStrategy;
 import org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategy;
@@ -40,6 +42,7 @@ import org.apache.flink.runtime.jobmaster.DefaultExecutionDeploymentTracker;
 import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchScheduler;
+import org.apache.flink.runtime.scheduler.adaptivebatch.SpeculativeScheduler;
 import org.apache.flink.runtime.scheduler.adaptivebatch.VertexParallelismDecider;
 import org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy;
 import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategyFactory;
@@ -57,7 +60,7 @@ import java.util.concurrent.ScheduledExecutorService;
 
 import static org.apache.flink.runtime.scheduler.SchedulerBase.computeVertexParallelismStore;
 
-/** A builder to create {@link DefaultScheduler} instances for testing. */
+/** A builder to create {@link DefaultScheduler} or its subclass instances for testing. */
 public class DefaultSchedulerBuilder {
     private static final Logger LOG = LoggerFactory.getLogger(DefaultSchedulerBuilder.class);
 
@@ -95,6 +98,7 @@ public class DefaultSchedulerBuilder {
     private VertexParallelismDecider vertexParallelismDecider = (ignored) -> 0;
     private int defaultMaxParallelism =
             JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_MAX_PARALLELISM.defaultValue();
+    private BlocklistOperations blocklistOperations = ignore -> {};
 
     public DefaultSchedulerBuilder(
             JobGraph jobGraph,
@@ -245,6 +249,11 @@ public class DefaultSchedulerBuilder {
         return this;
     }
 
+    public DefaultSchedulerBuilder setBlocklistOperations(BlocklistOperations blocklistOperations) {
+        this.blocklistOperations = blocklistOperations;
+        return this;
+    }
+
     public DefaultScheduler build() throws Exception {
         return new DefaultScheduler(
                 log,
@@ -301,7 +310,41 @@ public class DefaultSchedulerBuilder {
                 defaultMaxParallelism);
     }
 
+    public SpeculativeScheduler buildSpeculativeScheduler() throws Exception {
+        return new SpeculativeScheduler(
+                log,
+                jobGraph,
+                ioExecutor,
+                jobMasterConfiguration,
+                componentMainThreadExecutor -> {},
+                delayExecutor,
+                userCodeLoader,
+                checkpointCleaner,
+                checkpointRecoveryFactory,
+                jobManagerJobMetricGroup,
+                new VertexwiseSchedulingStrategy.Factory(),
+                failoverStrategyFactory,
+                restartBackoffTimeStrategy,
+                executionOperations,
+                executionVertexVersioner,
+                executionSlotAllocatorFactory,
+                System.currentTimeMillis(),
+                mainThreadExecutor,
+                jobStatusListener,
+                createExecutionGraphFactory(true, new SpeculativeExecutionJobVertex.Factory()),
+                shuffleMaster,
+                rpcTimeout,
+                vertexParallelismDecider,
+                defaultMaxParallelism,
+                blocklistOperations);
+    }
+
     private ExecutionGraphFactory createExecutionGraphFactory(boolean isDynamicGraph) {
+        return createExecutionGraphFactory(isDynamicGraph, new ExecutionJobVertex.Factory());
+    }
+
+    private ExecutionGraphFactory createExecutionGraphFactory(
+            boolean isDynamicGraph, ExecutionJobVertex.Factory executionJobVertexFactory) {
         return new DefaultExecutionGraphFactory(
                 jobMasterConfiguration,
                 userCodeLoader,
@@ -314,6 +357,6 @@ public class DefaultSchedulerBuilder {
                 shuffleMaster,
                 partitionTracker,
                 isDynamicGraph,
-                new ExecutionJobVertex.Factory());
+                executionJobVertexFactory);
     }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
index d78610c25aa..d3bad8be708 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
@@ -1690,7 +1690,7 @@ public class DefaultSchedulerTest extends TestLogger {
         schedulerClosed.get();
     }
 
-    private static TaskExecutionState createFailedTaskExecutionState(
+    public static TaskExecutionState createFailedTaskExecutionState(
             ExecutionAttemptID executionAttemptID) {
         return new TaskExecutionState(
                 executionAttemptID, ExecutionState.FAILED, new Exception("Expected failure cause"));
@@ -1745,7 +1745,7 @@ public class DefaultSchedulerTest extends TestLogger {
         scheduler.getJobTerminationFuture().get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
     }
 
-    private static JobGraph singleNonParallelJobVertexJobGraph() {
+    public static JobGraph singleNonParallelJobVertexJobGraph() {
         return singleJobVertexJobGraph(1);
     }
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SimpleExecutionSlotAllocatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SimpleExecutionSlotAllocatorTest.java
new file mode 100644
index 00000000000..1fd6ca850f6
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SimpleExecutionSlotAllocatorTest.java
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.jobmaster.TestingPayload;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequest;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.function.BiConsumerWithException;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createExecutionAttemptId;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Test suite for {@link SimpleExecutionSlotAllocator}. */
+class SimpleExecutionSlotAllocatorTest {
+
+    private static final ResourceProfile RESOURCE_PROFILE = ResourceProfile.fromResources(3, 5);
+    private static final ExecutionAttemptID EXECUTION_ATTEMPT_ID = createExecutionAttemptId();
+
+    @Test
+    void testSlotAllocation() {
+        final AllocationContext context = new AllocationContext();
+        final CompletableFuture<LogicalSlot> slotFuture =
+                context.allocateSlotsFor(EXECUTION_ATTEMPT_ID);
+        assertThat(slotFuture).isCompleted();
+        assertThat(context.getSlotProvider().getRequests()).hasSize(1);
+
+        final PhysicalSlotRequest slotRequest =
+                context.getSlotProvider().getRequests().values().iterator().next();
+        assertThat(slotRequest.getSlotProfile().getPhysicalSlotResourceProfile())
+                .isEqualTo(RESOURCE_PROFILE);
+    }
+
+    @Test
+    void testDuplicateAllocationDoesNotRecreateLogicalSlotFuture() throws Exception {
+        final AllocationContext context = new AllocationContext();
+
+        final CompletableFuture<LogicalSlot> slotFuture1 =
+                context.allocateSlotsFor(EXECUTION_ATTEMPT_ID);
+        final CompletableFuture<LogicalSlot> slotFuture2 =
+                context.allocateSlotsFor(EXECUTION_ATTEMPT_ID);
+
+        assertThat(slotFuture1.get()).isSameAs(slotFuture2.get());
+    }
+
+    @Test
+    void testFailedPhysicalSlotRequestFailsLogicalSlotFuture() {
+        final AllocationContext context =
+                new AllocationContext(
+                        TestingPhysicalSlotProvider.createWithoutImmediatePhysicalSlotCreation(),
+                        false);
+        final CompletableFuture<LogicalSlot> slotFuture =
+                context.allocateSlotsFor(EXECUTION_ATTEMPT_ID);
+        final SlotRequestId slotRequestId =
+                context.getSlotProvider().getFirstRequestOrFail().getSlotRequestId();
+
+        assertThat(slotFuture).isNotDone();
+        context.getSlotProvider()
+                .getResponses()
+                .get(slotRequestId)
+                .completeExceptionally(new Throwable());
+        assertThat(slotFuture).isCompletedExceptionally();
+
+        // next allocation allocates new slot
+        context.allocateSlotsFor(EXECUTION_ATTEMPT_ID);
+        assertThat(context.getSlotProvider().getRequests()).hasSize(2);
+    }
+
+    @Test
+    void testSlotWillBeOccupiedIndefinitelyFalse() throws Exception {
+        testSlotWillBeOccupiedIndefinitely(false);
+    }
+
+    @Test
+    void testSlotWillBeOccupiedIndefinitelyTrue() throws Exception {
+        testSlotWillBeOccupiedIndefinitely(true);
+    }
+
+    private static void testSlotWillBeOccupiedIndefinitely(boolean slotWillBeOccupiedIndefinitely)
+            throws Exception {
+        final AllocationContext context =
+                new AllocationContext(
+                        TestingPhysicalSlotProvider.createWithInfiniteSlotCreation(),
+                        slotWillBeOccupiedIndefinitely);
+        context.allocateSlotsFor(EXECUTION_ATTEMPT_ID);
+
+        final PhysicalSlotRequest slotRequest = context.getSlotProvider().getFirstRequestOrFail();
+        assertThat(slotRequest.willSlotBeOccupiedIndefinitely())
+                .isEqualTo(slotWillBeOccupiedIndefinitely);
+
+        final TestingPhysicalSlot physicalSlot =
+                context.getSlotProvider().getResponses().get(slotRequest.getSlotRequestId()).get();
+        assertThat(physicalSlot.getPayload()).isNotNull();
+        assertThat(physicalSlot.getPayload().willOccupySlotIndefinitely())
+                .isEqualTo(slotWillBeOccupiedIndefinitely);
+    }
+
+    @Test
+    void testLogicalSlotReleasingCancelsPhysicalSlotRequest() throws Exception {
+        testLogicalSlotRequestCancellationOrRelease(
+                true, true, (context, slotFuture) -> slotFuture.get().releaseSlot(null));
+    }
+
+    @Test
+    void testLogicalSlotCancellationCancelsPhysicalSlotRequest() throws Exception {
+        testLogicalSlotRequestCancellationOrRelease(
+                false,
+                true,
+                (context, slotFuture) -> {
+                    assertThatThrownBy(
+                                    () -> {
+                                        context.getAllocator().cancel(EXECUTION_ATTEMPT_ID);
+                                        slotFuture.get();
+                                    })
+                            .as("The logical future must finish with a cancellation exception.")
+                            .isInstanceOf(CancellationException.class);
+                });
+    }
+
+    @Test
+    void testCompletedLogicalSlotCancellationDoesNotCancelPhysicalSlotRequest() throws Exception {
+        testLogicalSlotRequestCancellationOrRelease(
+                true,
+                false,
+                (context, slotFuture) -> {
+                    context.getAllocator().cancel(EXECUTION_ATTEMPT_ID);
+                    slotFuture.get();
+                });
+    }
+
+    private static void testLogicalSlotRequestCancellationOrRelease(
+            final boolean autoCompletePhysicalSlotFuture,
+            final boolean expectPhysicalSlotRequestCanceled,
+            final BiConsumerWithException<
+                            AllocationContext, CompletableFuture<LogicalSlot>, Exception>
+                    cancelOrReleaseAction)
+            throws Exception {
+
+        final TestingPhysicalSlotProvider physicalSlotProvider;
+        if (!autoCompletePhysicalSlotFuture) {
+            physicalSlotProvider =
+                    TestingPhysicalSlotProvider.createWithoutImmediatePhysicalSlotCreation();
+        } else {
+            physicalSlotProvider = TestingPhysicalSlotProvider.createWithInfiniteSlotCreation();
+        }
+        final AllocationContext context = new AllocationContext(physicalSlotProvider, false);
+
+        final CompletableFuture<LogicalSlot> slotFuture1 =
+                context.allocateSlotsFor(EXECUTION_ATTEMPT_ID);
+
+        cancelOrReleaseAction.accept(context, slotFuture1);
+
+        final SlotRequestId slotRequestId =
+                context.getSlotProvider().getFirstRequestOrFail().getSlotRequestId();
+        assertThat(context.getSlotProvider().getCancellations().containsKey(slotRequestId))
+                .isEqualTo(expectPhysicalSlotRequestCanceled);
+
+        context.allocateSlotsFor(EXECUTION_ATTEMPT_ID);
+        final int expectedNumberOfRequests = expectPhysicalSlotRequestCanceled ? 2 : 1;
+        assertThat(context.getSlotProvider().getRequests()).hasSize(expectedNumberOfRequests);
+    }
+
+    @Test
+    void testPhysicalSlotReleasesLogicalSlots() throws Exception {
+        final AllocationContext context = new AllocationContext();
+        final CompletableFuture<LogicalSlot> slotFuture =
+                context.allocateSlotsFor(EXECUTION_ATTEMPT_ID);
+        final TestingPayload payload = new TestingPayload();
+        slotFuture.thenAccept(logicalSlot -> logicalSlot.tryAssignPayload(payload));
+        final SlotRequestId slotRequestId =
+                context.getSlotProvider().getFirstRequestOrFail().getSlotRequestId();
+        final TestingPhysicalSlot physicalSlot =
+                context.getSlotProvider().getFirstResponseOrFail().get();
+
+        assertThat(payload.getTerminalStateFuture()).isNotDone();
+        assertThat(physicalSlot.getPayload()).isNotNull();
+
+        physicalSlot.getPayload().release(new Throwable());
+        assertThat(payload.getTerminalStateFuture()).isDone();
+        assertThat(context.getSlotProvider().getCancellations()).containsKey(slotRequestId);
+
+        context.allocateSlotsFor(EXECUTION_ATTEMPT_ID);
+        // there should be one more physical slot allocation, as the first allocation should be
+        // removed after releasing all logical slots
+        assertThat(context.getSlotProvider().getRequests()).hasSize(2);
+    }
+
+    @Test
+    void testFailLogicalSlotIfPhysicalSlotIsFails() {
+        final AllocationContext context =
+                new AllocationContext(
+                        TestingPhysicalSlotProvider.createWithFailingPhysicalSlotCreation(
+                                new FlinkException("test failure")),
+                        false);
+        final CompletableFuture<LogicalSlot> slotFuture =
+                context.allocateSlotsFor(EXECUTION_ATTEMPT_ID);
+
+        assertThat(slotFuture).isCompletedExceptionally();
+        assertThat(context.getSlotProvider().getCancellations().keySet())
+                .isEqualTo(context.getSlotProvider().getRequests().keySet());
+    }
+
+    private static class AllocationContext {
+        private final TestingPhysicalSlotProvider slotProvider;
+        private final boolean slotWillBeOccupiedIndefinitely;
+        private final SimpleExecutionSlotAllocator allocator;
+
+        public AllocationContext() {
+            this(TestingPhysicalSlotProvider.createWithInfiniteSlotCreation(), false);
+        }
+
+        public AllocationContext(
+                TestingPhysicalSlotProvider slotProvider, boolean slotWillBeOccupiedIndefinitely) {
+            this.slotProvider = slotProvider;
+            this.slotWillBeOccupiedIndefinitely = slotWillBeOccupiedIndefinitely;
+            this.allocator =
+                    new SimpleExecutionSlotAllocator(
+                            slotProvider,
+                            executionAttemptId -> RESOURCE_PROFILE,
+                            slotWillBeOccupiedIndefinitely);
+        }
+
+        private CompletableFuture<LogicalSlot> allocateSlotsFor(
+                ExecutionAttemptID executionAttemptId) {
+            return allocator
+                    .allocateSlotsFor(Collections.singletonList(executionAttemptId))
+                    .get(0)
+                    .getLogicalSlotFuture();
+        }
+
+        public TestingPhysicalSlotProvider getSlotProvider() {
+            return slotProvider;
+        }
+
+        public boolean isSlotWillBeOccupiedIndefinitely() {
+            return slotWillBeOccupiedIndefinitely;
+        }
+
+        public SimpleExecutionSlotAllocator getAllocator() {
+            return allocator;
+        }
+    }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingSchedulerNGFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingSchedulerNGFactory.java
index 40a122e46f5..6233a42a805 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingSchedulerNGFactory.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingSchedulerNGFactory.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.runtime.blob.BlobWriter;
+import org.apache.flink.runtime.blocklist.BlocklistOperations;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
 import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
 import org.apache.flink.runtime.executiongraph.JobStatusListener;
@@ -69,7 +70,8 @@ public class TestingSchedulerNGFactory implements SchedulerNGFactory {
             long initializationTimestamp,
             ComponentMainThreadExecutor mainThreadExecutor,
             FatalErrorHandler fatalErrorHandler,
-            JobStatusListener jobStatusListener)
+            JobStatusListener jobStatusListener,
+            BlocklistOperations blocklistOperations)
             throws Exception {
         return schedulerNG;
     }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeSchedulerTest.java
new file mode 100644
index 00000000000..75456468447
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeSchedulerTest.java
@@ -0,0 +1,426 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptivebatch;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.runtime.blocklist.BlockedNode;
+import org.apache.flink.runtime.blocklist.BlocklistOperations;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.execution.SuppressRestartsException;
+import org.apache.flink.runtime.executiongraph.DefaultExecutionGraph;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.IOMetrics;
+import org.apache.flink.runtime.executiongraph.failover.flip1.TestRestartBackoffTimeStrategy;
+import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.scheduler.DefaultExecutionOperations;
+import org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder;
+import org.apache.flink.runtime.scheduler.TestExecutionOperationsDecorator;
+import org.apache.flink.runtime.scheduler.exceptionhistory.RootExceptionHistoryEntry;
+import org.apache.flink.runtime.taskmanager.TaskExecutionState;
+import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
+import org.apache.flink.testutils.TestingUtils;
+import org.apache.flink.testutils.executor.TestExecutorExtension;
+import org.apache.flink.util.ExecutorUtils;
+import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor;
+
+import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableMap;
+import org.apache.flink.shaded.guava30.com.google.common.collect.Iterables;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.completeCancellingForAllVertices;
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createNoOpVertex;
+import static org.apache.flink.runtime.scheduler.DefaultSchedulerTest.createFailedTaskExecutionState;
+import static org.apache.flink.runtime.scheduler.DefaultSchedulerTest.singleNonParallelJobVertexJobGraph;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link SpeculativeScheduler}. */
+class SpeculativeSchedulerTest {
+
+    @RegisterExtension
+    private static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE =
+            TestingUtils.defaultExecutorExtension();
+
+    private ScheduledExecutorService futureExecutor;
+
+    private ManuallyTriggeredScheduledExecutor taskRestartExecutor;
+    private TestExecutionOperationsDecorator testExecutionOperations;
+    private TestBlocklistOperations testBlocklistOperations;
+    private TestRestartBackoffTimeStrategy restartStrategy;
+
+    @BeforeEach
+    void setUp() {
+        futureExecutor = new DirectScheduledExecutorService();
+
+        taskRestartExecutor = new ManuallyTriggeredScheduledExecutor();
+        testExecutionOperations =
+                new TestExecutionOperationsDecorator(new DefaultExecutionOperations());
+        testBlocklistOperations = new TestBlocklistOperations();
+        restartStrategy = new TestRestartBackoffTimeStrategy(true, 0);
+    }
+
+    @AfterEach
+    void tearDown() {
+        if (futureExecutor != null) {
+            ExecutorUtils.gracefulShutdown(10, TimeUnit.SECONDS, futureExecutor);
+        }
+    }
+
+    @Test
+    void testStartScheduling() {
+        createSchedulerAndStartScheduling();
+        final List<ExecutionAttemptID> deployedExecutions =
+                testExecutionOperations.getDeployedExecutions();
+        assertThat(deployedExecutions).hasSize(1);
+    }
+
+    @Test
+    void testNotifySlowTasks() {
+        final SpeculativeScheduler scheduler = createSchedulerAndStartScheduling();
+        final ExecutionVertex ev = getOnlyExecutionVertex(scheduler);
+        final Execution attempt1 = ev.getCurrentExecutionAttempt();
+
+        assertThat(testExecutionOperations.getDeployedExecutions()).hasSize(1);
+
+        notifySlowTask(scheduler, attempt1);
+
+        assertThat(testExecutionOperations.getDeployedExecutions()).hasSize(2);
+        assertThat(testBlocklistOperations.getAllBlockedNodeIds())
+                .containsExactly(attempt1.getAssignedResourceLocation().getNodeId());
+    }
+
+    @Test
+    void testNotifyDuplicatedSlowTasks() {
+        final SpeculativeScheduler scheduler = createSchedulerAndStartScheduling();
+        final ExecutionVertex ev = getOnlyExecutionVertex(scheduler);
+        final Execution attempt1 = ev.getCurrentExecutionAttempt();
+
+        notifySlowTask(scheduler, attempt1);
+
+        assertThat(testExecutionOperations.getDeployedExecutions()).hasSize(2);
+
+        // notify the execution as a slow task again
+        notifySlowTask(scheduler, attempt1);
+
+        assertThat(testExecutionOperations.getDeployedExecutions()).hasSize(2);
+
+        // fail attempt2 to make room for a new speculative execution
+        final Execution attempt2 = getExecution(ev, 1);
+        scheduler.updateTaskExecutionState(createFailedTaskExecutionState(attempt2.getAttemptId()));
+
+        // notify the execution as a slow task again
+        notifySlowTask(scheduler, attempt1);
+
+        assertThat(testExecutionOperations.getDeployedExecutions()).hasSize(3);
+    }
+
+    @Test
+    void testRestartVertexIfAllSpeculativeExecutionFailed() {
+        final SpeculativeScheduler scheduler = createSchedulerAndStartScheduling();
+        final ExecutionVertex ev = getOnlyExecutionVertex(scheduler);
+        final Execution attempt1 = ev.getCurrentExecutionAttempt();
+
+        notifySlowTask(scheduler, attempt1);
+
+        assertThat(testExecutionOperations.getDeployedExecutions()).hasSize(2);
+
+        final ExecutionAttemptID attemptId1 = attempt1.getAttemptId();
+        final ExecutionAttemptID attemptId2 = getExecution(ev, 1).getAttemptId();
+
+        scheduler.updateTaskExecutionState(createFailedTaskExecutionState(attemptId1));
+        scheduler.updateTaskExecutionState(createFailedTaskExecutionState(attemptId2));
+        taskRestartExecutor.triggerScheduledTasks();
+
+        assertThat(testExecutionOperations.getDeployedExecutions()).hasSize(3);
+    }
+
+    @Test
+    void testNoRestartIfNotAllSpeculativeExecutionFailed() {
+        final SpeculativeScheduler scheduler = createSchedulerAndStartScheduling();
+        final ExecutionVertex ev = getOnlyExecutionVertex(scheduler);
+        final Execution attempt1 = ev.getCurrentExecutionAttempt();
+
+        notifySlowTask(scheduler, attempt1);
+        scheduler.updateTaskExecutionState(createFailedTaskExecutionState(attempt1.getAttemptId()));
+        taskRestartExecutor.triggerScheduledTasks();
+
+        assertThat(testExecutionOperations.getDeployedExecutions()).hasSize(2);
+    }
+
+    @Test
+    void testRestartVertexIfPartitionExceptionHappened() {
+        final SpeculativeScheduler scheduler = createSchedulerAndStartScheduling();
+        final ExecutionVertex ev = getOnlyExecutionVertex(scheduler);
+        final Execution attempt1 = ev.getCurrentExecutionAttempt();
+
+        notifySlowTask(scheduler, attempt1);
+        final Execution attempt2 = getExecution(ev, 1);
+        scheduler.updateTaskExecutionState(
+                new TaskExecutionState(
+                        attempt1.getAttemptId(),
+                        ExecutionState.FAILED,
+                        new PartitionNotFoundException(new ResultPartitionID())));
+
+        assertThat(attempt2.getState()).isEqualTo(ExecutionState.CANCELING);
+
+        completeCancellingForAllVertices(scheduler.getExecutionGraph());
+        taskRestartExecutor.triggerScheduledTasks();
+
+        assertThat(testExecutionOperations.getDeployedExecutions()).hasSize(3);
+    }
+
+    @Test
+    void testCancelOtherCurrentExecutionsWhenAnyExecutionFinished() {
+        final SpeculativeScheduler scheduler = createSchedulerAndStartScheduling();
+        final ExecutionVertex ev = getOnlyExecutionVertex(scheduler);
+        final Execution attempt1 = ev.getCurrentExecutionAttempt();
+
+        notifySlowTask(scheduler, attempt1);
+        final Execution attempt2 = getExecution(ev, 1);
+        scheduler.updateTaskExecutionState(
+                new TaskExecutionState(attempt1.getAttemptId(), ExecutionState.FINISHED));
+
+        assertThat(attempt2.getState()).isEqualTo(ExecutionState.CANCELING);
+    }
+
+    @Test
+    void testExceptionHistoryIfPartitionExceptionHappened() {
+        final SpeculativeScheduler scheduler = createSchedulerAndStartScheduling();
+        final ExecutionVertex ev = getOnlyExecutionVertex(scheduler);
+        final Execution attempt1 = ev.getCurrentExecutionAttempt();
+
+        notifySlowTask(scheduler, attempt1);
+
+        // A partition exception can result in a restart of the whole execution vertex.
+        scheduler.updateTaskExecutionState(
+                new TaskExecutionState(
+                        attempt1.getAttemptId(),
+                        ExecutionState.FAILED,
+                        new PartitionNotFoundException(new ResultPartitionID())));
+
+        completeCancellingForAllVertices(scheduler.getExecutionGraph());
+        taskRestartExecutor.triggerScheduledTasks();
+
+        assertThat(scheduler.getExceptionHistory()).hasSize(1);
+
+        final RootExceptionHistoryEntry entry = scheduler.getExceptionHistory().iterator().next();
+        // the current execution attempt before the restart should be attempt2, but the root
+        // exception of the failure should come from attempt1
+        assertThat(entry.getFailingTaskName()).isEqualTo(attempt1.getVertexWithAttempt());
+    }
+
+    @Test
+    void testLocalExecutionAttemptFailureIsCorrectlyRecorded() {
+        final SpeculativeScheduler scheduler = createSchedulerAndStartScheduling();
+        final ExecutionVertex ev = getOnlyExecutionVertex(scheduler);
+        final Execution attempt1 = ev.getCurrentExecutionAttempt();
+
+        notifySlowTask(scheduler, attempt1);
+
+        // the execution vertex will not be restarted if we only fail attempt1, but the failure
+        // should still be recorded in the execution graph and in the exception history
+        final TaskExecutionState failedState =
+                createFailedTaskExecutionState(attempt1.getAttemptId());
+        scheduler.updateTaskExecutionState(failedState);
+
+        final ClassLoader classLoader = SpeculativeSchedulerTest.class.getClassLoader();
+        assertThat(scheduler.getExecutionGraph().getFailureInfo()).isNotNull();
+        assertThat(scheduler.getExecutionGraph().getFailureInfo().getExceptionAsString())
+                .contains(failedState.getError(classLoader).getMessage());
+
+        assertThat(scheduler.getExceptionHistory()).hasSize(1);
+
+        final RootExceptionHistoryEntry entry = scheduler.getExceptionHistory().iterator().next();
+        assertThat(entry.getFailingTaskName()).isEqualTo(attempt1.getVertexWithAttempt());
+    }
+
+    @Test
+    void testUnrecoverableLocalExecutionAttemptFailureWillFailJob() {
+        final SpeculativeScheduler scheduler = createSchedulerAndStartScheduling();
+        final ExecutionVertex ev = getOnlyExecutionVertex(scheduler);
+        final Execution attempt1 = ev.getCurrentExecutionAttempt();
+
+        notifySlowTask(scheduler, attempt1);
+
+        final TaskExecutionState failedState =
+                new TaskExecutionState(
+                        attempt1.getAttemptId(),
+                        ExecutionState.FAILED,
+                        new SuppressRestartsException(
+                                new Exception("Forced failure for testing.")));
+        scheduler.updateTaskExecutionState(failedState);
+
+        assertThat(scheduler.getExecutionGraph().getState()).isEqualTo(JobStatus.FAILING);
+    }
+
+    @Test
+    void testLocalExecutionAttemptFailureAndForbiddenRestartWillFailJob() {
+        restartStrategy.setCanRestart(false);
+
+        final SpeculativeScheduler scheduler = createSchedulerAndStartScheduling();
+        final ExecutionVertex ev = getOnlyExecutionVertex(scheduler);
+        final Execution attempt1 = ev.getCurrentExecutionAttempt();
+
+        notifySlowTask(scheduler, attempt1);
+
+        final TaskExecutionState failedState =
+                createFailedTaskExecutionState(attempt1.getAttemptId());
+        scheduler.updateTaskExecutionState(failedState);
+
+        assertThat(scheduler.getExecutionGraph().getState()).isEqualTo(JobStatus.FAILING);
+    }
+
+    @Test
+    public void testSpeculativeExecutionCombinedWithAdaptiveScheduling() throws Exception {
+        final JobVertex source = createNoOpVertex("source", 1);
+        final JobVertex sink = createNoOpVertex("sink", -1);
+        sink.connectNewDataSetAsInput(
+                source, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
+        final JobGraph jobGraph = JobGraphTestUtils.batchJobGraph(source, sink);
+
+        final ComponentMainThreadExecutor mainThreadExecutor =
+                ComponentMainThreadExecutorServiceAdapter.forMainThread();
+        final SpeculativeScheduler scheduler =
+                createSchedulerBuilder(jobGraph, mainThreadExecutor)
+                        .setVertexParallelismDecider(ignore -> 3)
+                        .buildSpeculativeScheduler();
+        mainThreadExecutor.execute(scheduler::startScheduling);
+
+        final DefaultExecutionGraph graph = (DefaultExecutionGraph) scheduler.getExecutionGraph();
+        final ExecutionJobVertex sourceExecutionJobVertex = graph.getJobVertex(source.getID());
+        final ExecutionJobVertex sinkExecutionJobVertex = graph.getJobVertex(sink.getID());
+
+        final ExecutionVertex sourceExecutionVertex = sourceExecutionJobVertex.getTaskVertices()[0];
+        assertThat(sourceExecutionVertex.getCurrentExecutions()).hasSize(1);
+
+        // trigger source vertex speculation
+        final Execution sourceAttempt1 = sourceExecutionVertex.getCurrentExecutionAttempt();
+        notifySlowTask(scheduler, sourceAttempt1);
+        assertThat(sourceExecutionVertex.getCurrentExecutions()).hasSize(2);
+
+        assertThat(sinkExecutionJobVertex.getParallelism()).isEqualTo(-1);
+
+        // Finishing any source execution attempt will finish the source execution vertex, and then
+        // finish the job vertex.
+        scheduler.updateTaskExecutionState(
+                new TaskExecutionState(
+                        sourceAttempt1.getAttemptId(),
+                        ExecutionState.FINISHED,
+                        null,
+                        null,
+                        new IOMetrics(0, 0, 0, 0, 0, 0, 0)));
+        assertThat(sinkExecutionJobVertex.getParallelism()).isEqualTo(3);
+
+        // trigger sink vertex speculation
+        final ExecutionVertex sinkExecutionVertex = sinkExecutionJobVertex.getTaskVertices()[0];
+        final Execution sinkAttempt1 = sinkExecutionVertex.getCurrentExecutionAttempt();
+        notifySlowTask(scheduler, sinkAttempt1);
+        assertThat(sinkExecutionVertex.getCurrentExecutions()).hasSize(2);
+    }
+
+    private static Execution getExecution(ExecutionVertex executionVertex, int attemptNumber) {
+        return executionVertex.getCurrentExecutions().stream()
+                .filter(e -> e.getAttemptNumber() == attemptNumber)
+                .findFirst()
+                .get();
+    }
+
+    private static ExecutionVertex getOnlyExecutionVertex(SpeculativeScheduler scheduler) {
+        return Iterables.getOnlyElement(scheduler.getExecutionGraph().getAllExecutionVertices());
+    }
+
+    private SpeculativeScheduler createSchedulerAndStartScheduling() {
+        return createSchedulerAndStartScheduling(singleNonParallelJobVertexJobGraph());
+    }
+
+    private SpeculativeScheduler createSchedulerAndStartScheduling(final JobGraph jobGraph) {
+        final ComponentMainThreadExecutor mainThreadExecutor =
+                ComponentMainThreadExecutorServiceAdapter.forMainThread();
+
+        try {
+            final SpeculativeScheduler scheduler = createScheduler(jobGraph, mainThreadExecutor);
+            mainThreadExecutor.execute(scheduler::startScheduling);
+            return scheduler;
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private SpeculativeScheduler createScheduler(
+            final JobGraph jobGraph, final ComponentMainThreadExecutor mainThreadExecutor)
+            throws Exception {
+        return createSchedulerBuilder(jobGraph, mainThreadExecutor).buildSpeculativeScheduler();
+    }
+
+    private DefaultSchedulerBuilder createSchedulerBuilder(
+            final JobGraph jobGraph, final ComponentMainThreadExecutor mainThreadExecutor) {
+        return new DefaultSchedulerBuilder(
+                        jobGraph, mainThreadExecutor, EXECUTOR_RESOURCE.getExecutor())
+                .setBlocklistOperations(testBlocklistOperations)
+                .setExecutionOperations(testExecutionOperations)
+                .setFutureExecutor(futureExecutor)
+                .setDelayExecutor(taskRestartExecutor)
+                .setRestartBackoffTimeStrategy(restartStrategy);
+    }
+
+    private static void notifySlowTask(
+            final SpeculativeScheduler scheduler, final Execution slowTask) {
+        scheduler.notifySlowTasks(
+                ImmutableMap.of(
+                        slowTask.getVertex().getID(),
+                        Collections.singleton(slowTask.getAttemptId())));
+    }
+
+    private static class TestBlocklistOperations implements BlocklistOperations {
+        private final List<BlockedNode> blockedNodes = new ArrayList<>();
+
+        @Override
+        public void addNewBlockedNodes(Collection<BlockedNode> newNodes) {
+            blockedNodes.addAll(newNodes);
+        }
+
+        public Set<String> getAllBlockedNodeIds() {
+            return blockedNodes.stream().map(BlockedNode::getNodeId).collect(Collectors.toSet());
+        }
+    }
+}


[flink] 01/05: [hotfix] Remove the initialCapacity param from the constructor of DualKeyLinkedMap

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 331264ccf65d1887015667cb0c543a7c0fe862c4
Author: Zhu Zhu <re...@gmail.com>
AuthorDate: Fri Jul 8 10:52:57 2022 +0800

    [hotfix] Remove the initialCapacity param from the constructor of DualKeyLinkedMap
    
    Specifying the initial capacity of the map brings little benefit and is not currently needed.
---
 .../org/apache/flink/runtime/scheduler/SharedSlot.java   |  3 +--
 .../org/apache/flink/runtime/util/DualKeyLinkedMap.java  |  6 +++---
 .../apache/flink/runtime/util/DualKeyLinkedMapTest.java  | 16 +++++++---------
 3 files changed, 11 insertions(+), 14 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SharedSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SharedSlot.java
index 656ce2a6fac..d46c6a191cf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SharedSlot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SharedSlot.java
@@ -101,8 +101,7 @@ class SharedSlot implements SlotOwner, PhysicalSlot.Payload {
                                     "Unexpected physical slot payload assignment failure!");
                             return physicalSlot;
                         });
-        this.requestedLogicalSlots =
-                new DualKeyLinkedMap<>(executionSlotSharingGroup.getExecutionVertexIds().size());
+        this.requestedLogicalSlots = new DualKeyLinkedMap<>();
         this.slotWillBeOccupiedIndefinitely = slotWillBeOccupiedIndefinitely;
         this.externalReleaseCallback = externalReleaseCallback;
         this.state = State.ALLOCATED;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/DualKeyLinkedMap.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/DualKeyLinkedMap.java
index 857d39571a4..63d39bd5746 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/DualKeyLinkedMap.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/DualKeyLinkedMap.java
@@ -50,9 +50,9 @@ public class DualKeyLinkedMap<A, B, V> {
 
     private Collection<V> values;
 
-    public DualKeyLinkedMap(int initialCapacity) {
-        this.aMap = new LinkedHashMap<>(initialCapacity);
-        this.bMap = new HashMap<>(initialCapacity);
+    public DualKeyLinkedMap() {
+        this.aMap = new LinkedHashMap<>();
+        this.bMap = new HashMap<>();
     }
 
     public int size() {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/DualKeyLinkedMapTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/DualKeyLinkedMapTest.java
index c60982f6306..cd506942189 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/DualKeyLinkedMapTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/DualKeyLinkedMapTest.java
@@ -39,17 +39,15 @@ public class DualKeyLinkedMapTest extends TestLogger {
     @Test
     public void testKeySets() {
         final Random random = new Random();
-        final int capacity = 10;
-        final Set<Tuple2<Integer, Integer>> keys = new HashSet<>(capacity);
+        final Set<Tuple2<Integer, Integer>> keys = new HashSet<>();
 
-        for (int i = 0; i < capacity; i++) {
+        for (int i = 0; i < 10; i++) {
             int keyA = random.nextInt();
             int keyB = random.nextInt();
             keys.add(Tuple2.of(keyA, keyB));
         }
 
-        final DualKeyLinkedMap<Integer, Integer, String> dualKeyMap =
-                new DualKeyLinkedMap<>(capacity);
+        final DualKeyLinkedMap<Integer, Integer, String> dualKeyMap = new DualKeyLinkedMap<>();
 
         for (Tuple2<Integer, Integer> key : keys) {
             dualKeyMap.put(key.f0, key.f1, "foobar");
@@ -65,7 +63,7 @@ public class DualKeyLinkedMapTest extends TestLogger {
 
     @Test
     public void ensuresOneToOneMappingBetweenKeysSamePrimaryKey() {
-        final DualKeyLinkedMap<Integer, Integer, String> map = new DualKeyLinkedMap<>(2);
+        final DualKeyLinkedMap<Integer, Integer, String> map = new DualKeyLinkedMap<>();
 
         final String secondValue = "barfoo";
         map.put(1, 1, "foobar");
@@ -78,7 +76,7 @@ public class DualKeyLinkedMapTest extends TestLogger {
 
     @Test
     public void ensuresOneToOneMappingBetweenKeysSameSecondaryKey() {
-        final DualKeyLinkedMap<Integer, Integer, String> map = new DualKeyLinkedMap<>(2);
+        final DualKeyLinkedMap<Integer, Integer, String> map = new DualKeyLinkedMap<>();
 
         final String secondValue = "barfoo";
         map.put(1, 1, "foobar");
@@ -91,7 +89,7 @@ public class DualKeyLinkedMapTest extends TestLogger {
 
     @Test
     public void testPrimaryKeyOrderIsNotAffectedIfReInsertedWithSameSecondaryKey() {
-        final DualKeyLinkedMap<Integer, Integer, String> map = new DualKeyLinkedMap<>(2);
+        final DualKeyLinkedMap<Integer, Integer, String> map = new DualKeyLinkedMap<>();
 
         final String value1 = "1";
         map.put(1, 1, value1);
@@ -106,7 +104,7 @@ public class DualKeyLinkedMapTest extends TestLogger {
 
     @Test
     public void testPrimaryKeyOrderIsNotAffectedIfReInsertedWithDifferentSecondaryKey() {
-        final DualKeyLinkedMap<Integer, Integer, String> map = new DualKeyLinkedMap<>(2);
+        final DualKeyLinkedMap<Integer, Integer, String> map = new DualKeyLinkedMap<>();
 
         final String value1 = "1";
         map.put(1, 1, value1);