You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by zh...@apache.org on 2022/06/24 07:25:47 UTC

[flink] branch master updated: [FLINK-28133][runtime] Rework DefaultScheduler to directly deploy executions

This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new c66101d5186 [FLINK-28133][runtime] Rework DefaultScheduler to directly deploy executions
c66101d5186 is described below

commit c66101d518615ec2d6a36c829568583a6a5d93f9
Author: Zhu Zhu <re...@gmail.com>
AuthorDate: Fri Jun 10 15:33:51 2022 +0800

    [FLINK-28133][runtime] Rework DefaultScheduler to directly deploy executions
    
    This closes #20039.
---
 .../flink/runtime/executiongraph/Execution.java    |   2 +-
 .../scheduler/DefaultExecutionDeployer.java        | 400 +++++++++++++++++++++
 ...ations.java => DefaultExecutionOperations.java} |  19 +-
 .../flink/runtime/scheduler/DefaultScheduler.java  | 315 ++--------------
 .../runtime/scheduler/DefaultSchedulerFactory.java |   7 +-
 .../flink/runtime/scheduler/DeploymentHandle.java  |  78 ----
 .../flink/runtime/scheduler/ExecutionDeployer.java |  81 +++++
 ...texOperations.java => ExecutionOperations.java} |  30 +-
 .../runtime/scheduler/ExecutionSlotAllocator.java  |  15 +-
 ...ssignment.java => ExecutionSlotAssignment.java} |  21 +-
 .../apache/flink/runtime/scheduler/SharedSlot.java |   2 +-
 .../SlotSharingExecutionSlotAllocator.java         |  61 +++-
 .../adaptivebatch/AdaptiveBatchScheduler.java      |  10 +-
 .../AdaptiveBatchSchedulerFactory.java             |   4 +-
 .../scheduler/DefaultExecutionDeployerTest.java    | 388 ++++++++++++++++++++
 .../runtime/scheduler/DefaultSchedulerTest.java    |  59 +--
 .../runtime/scheduler/DeploymentHandleTest.java    |  94 -----
 .../runtime/scheduler/SchedulerTestingUtils.java   |  24 +-
 .../SlotSharingExecutionSlotAllocatorTest.java     |  52 +--
 .../TestExecutionOperationsDecorator.java          | 156 ++++++++
 .../scheduler/TestExecutionSlotAllocator.java      | 110 +++---
 .../TestExecutionVertexOperationsDecorator.java    | 133 -------
 .../AdaptiveBatchSchedulerTestUtils.java           |   2 +-
 23 files changed, 1329 insertions(+), 734 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index b18f9b2fa23..79c2cb9602f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -888,7 +888,7 @@ public class Execution
      *
      * @param t The exception that caused the task to fail.
      */
-    void markFailed(Throwable t) {
+    public void markFailed(Throwable t) {
         processFail(t, false);
     }
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionDeployer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionDeployer.java
new file mode 100644
index 00000000000..e5b7d18b209
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionDeployer.java
@@ -0,0 +1,400 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.concurrent.FutureUtils;
+
+import org.slf4j.Logger;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** Default implementation of {@link ExecutionDeployer}. */
+public class DefaultExecutionDeployer implements ExecutionDeployer {
+
+    private final Logger log;
+
+    private final ExecutionSlotAllocator executionSlotAllocator;
+
+    private final ExecutionOperations executionOperations;
+
+    private final ExecutionVertexVersioner executionVertexVersioner;
+
+    private final Time partitionRegistrationTimeout;
+
+    private final Function<ExecutionAttemptID, Execution> executionRetriever;
+
+    private final BiConsumer<ExecutionVertexID, AllocationID> allocationReservationFunc;
+
+    private final ComponentMainThreadExecutor mainThreadExecutor;
+
+    private DefaultExecutionDeployer(
+            final Logger log,
+            final ExecutionSlotAllocator executionSlotAllocator,
+            final ExecutionOperations executionOperations,
+            final ExecutionVertexVersioner executionVertexVersioner,
+            final Time partitionRegistrationTimeout,
+            final Function<ExecutionAttemptID, Execution> executionRetriever,
+            final BiConsumer<ExecutionVertexID, AllocationID> allocationReservationFunc,
+            final ComponentMainThreadExecutor mainThreadExecutor) {
+
+        this.log = checkNotNull(log);
+        this.executionSlotAllocator = checkNotNull(executionSlotAllocator);
+        this.executionOperations = checkNotNull(executionOperations);
+        this.executionVertexVersioner = checkNotNull(executionVertexVersioner);
+        this.partitionRegistrationTimeout = checkNotNull(partitionRegistrationTimeout);
+        this.executionRetriever = checkNotNull(executionRetriever);
+        this.allocationReservationFunc = checkNotNull(allocationReservationFunc);
+        this.mainThreadExecutor = checkNotNull(mainThreadExecutor);
+    }
+
+    @Override
+    public void allocateSlotsAndDeploy(
+            final List<Execution> executionsToDeploy,
+            final Map<ExecutionVertexID, ExecutionVertexVersion> requiredVersionByVertex) {
+        validateExecutionStates(executionsToDeploy);
+
+        transitionToScheduled(executionsToDeploy);
+
+        final List<ExecutionSlotAssignment> executionSlotAssignments =
+                allocateSlotsFor(executionsToDeploy);
+
+        final List<ExecutionDeploymentHandle> deploymentHandles =
+                createDeploymentHandles(requiredVersionByVertex, executionSlotAssignments);
+
+        waitForAllSlotsAndDeploy(deploymentHandles);
+    }
+
+    private void validateExecutionStates(final Collection<Execution> executionsToDeploy) {
+        executionsToDeploy.forEach(
+                e ->
+                        checkState(
+                                e.getState() == ExecutionState.CREATED,
+                                "Expected execution %s to be in CREATED state, was: %s",
+                                e.getAttemptId(),
+                                e.getState()));
+    }
+
+    private void transitionToScheduled(final List<Execution> executionsToDeploy) {
+        executionsToDeploy.forEach(e -> e.transitionState(ExecutionState.SCHEDULED));
+    }
+
+    private List<ExecutionSlotAssignment> allocateSlotsFor(
+            final List<Execution> executionsToDeploy) {
+        final List<ExecutionAttemptID> executionAttemptIds =
+                executionsToDeploy.stream()
+                        .map(Execution::getAttemptId)
+                        .collect(Collectors.toList());
+        return executionSlotAllocator.allocateSlotsFor(executionAttemptIds);
+    }
+
+    private List<ExecutionDeploymentHandle> createDeploymentHandles(
+            final Map<ExecutionVertexID, ExecutionVertexVersion> requiredVersionByVertex,
+            final List<ExecutionSlotAssignment> executionSlotAssignments) {
+
+        return executionSlotAssignments.stream()
+                .map(
+                        executionSlotAssignment -> {
+                            final Execution execution =
+                                    getExecutionOrThrow(
+                                            executionSlotAssignment.getExecutionAttemptId());
+                            final ExecutionVertexID executionVertexId =
+                                    execution.getVertex().getID();
+                            return new ExecutionDeploymentHandle(
+                                    executionSlotAssignment,
+                                    requiredVersionByVertex.get(executionVertexId));
+                        })
+                .collect(Collectors.toList());
+    }
+
+    private void waitForAllSlotsAndDeploy(final List<ExecutionDeploymentHandle> deploymentHandles) {
+        FutureUtils.assertNoException(
+                assignAllResourcesAndRegisterProducedPartitions(deploymentHandles)
+                        .handle(deployAll(deploymentHandles)));
+    }
+
+    private CompletableFuture<Void> assignAllResourcesAndRegisterProducedPartitions(
+            final List<ExecutionDeploymentHandle> deploymentHandles) {
+        final List<CompletableFuture<Void>> resultFutures = new ArrayList<>();
+        for (ExecutionDeploymentHandle deploymentHandle : deploymentHandles) {
+            final CompletableFuture<Void> resultFuture =
+                    deploymentHandle
+                            .getLogicalSlotFuture()
+                            .handle(assignResource(deploymentHandle))
+                            .thenCompose(registerProducedPartitions(deploymentHandle))
+                            .handle(
+                                    (ignore, throwable) -> {
+                                        if (throwable != null) {
+                                            handleTaskDeploymentFailure(
+                                                    deploymentHandle.getExecutionAttemptId(),
+                                                    throwable);
+                                        }
+                                        return null;
+                                    });
+
+            resultFutures.add(resultFuture);
+        }
+        return FutureUtils.waitForAll(resultFutures);
+    }
+
+    private BiFunction<Void, Throwable, Void> deployAll(
+            final List<ExecutionDeploymentHandle> deploymentHandles) {
+        return (ignored, throwable) -> {
+            propagateIfNonNull(throwable);
+            for (final ExecutionDeploymentHandle deploymentHandle : deploymentHandles) {
+                final CompletableFuture<LogicalSlot> slotAssigned =
+                        deploymentHandle.getLogicalSlotFuture();
+                checkState(slotAssigned.isDone());
+
+                FutureUtils.assertNoException(
+                        slotAssigned.handle(deployOrHandleError(deploymentHandle)));
+            }
+            return null;
+        };
+    }
+
+    private static void propagateIfNonNull(final Throwable throwable) {
+        if (throwable != null) {
+            throw new CompletionException(throwable);
+        }
+    }
+
+    private BiFunction<LogicalSlot, Throwable, LogicalSlot> assignResource(
+            final ExecutionDeploymentHandle deploymentHandle) {
+
+        return (logicalSlot, throwable) -> {
+            final ExecutionVertexVersion requiredVertexVersion =
+                    deploymentHandle.getRequiredVertexVersion();
+            final Optional<Execution> optionalExecution =
+                    getExecution(deploymentHandle.getExecutionAttemptId());
+
+            if (!optionalExecution.isPresent()
+                    || optionalExecution.get().getState() != ExecutionState.SCHEDULED
+                    || executionVertexVersioner.isModified(requiredVertexVersion)) {
+                if (throwable == null) {
+                    log.debug(
+                            "Refusing to assign slot to execution {} because this deployment was "
+                                    + "superseded by another deployment",
+                            deploymentHandle.getExecutionAttemptId());
+                    releaseSlotIfPresent(logicalSlot);
+                }
+                return null;
+            }
+
+            // throw exception only if the execution version is not outdated.
+            // this ensures that canceling a pending slot request does not fail
+            // a task which is about to cancel.
+            if (throwable != null) {
+                throw new CompletionException(maybeWrapWithNoResourceAvailableException(throwable));
+            }
+
+            final Execution execution = optionalExecution.get();
+            if (!execution.tryAssignResource(logicalSlot)) {
+                throw new IllegalStateException(
+                        "Could not assign resource "
+                                + logicalSlot
+                                + " to execution "
+                                + execution
+                                + '.');
+            }
+
+            // We only reserve the latest execution of an execution vertex. Because it may cause
+            // problems to reserve multiple slots for one execution vertex. Besides that, slot
+            // reservation is for local recovery and therefore is only needed by streaming jobs, in
+            // which case an execution vertex will have one only current execution.
+            allocationReservationFunc.accept(
+                    execution.getAttemptId().getExecutionVertexId(), logicalSlot.getAllocationId());
+
+            return logicalSlot;
+        };
+    }
+
+    private static void releaseSlotIfPresent(@Nullable final LogicalSlot logicalSlot) {
+        if (logicalSlot != null) {
+            logicalSlot.releaseSlot(null);
+        }
+    }
+
+    private static Throwable maybeWrapWithNoResourceAvailableException(final Throwable failure) {
+        final Throwable strippedThrowable = ExceptionUtils.stripCompletionException(failure);
+        if (strippedThrowable instanceof TimeoutException) {
+            return new NoResourceAvailableException(
+                    "Could not allocate the required slot within slot request timeout. "
+                            + "Please make sure that the cluster has enough resources.",
+                    failure);
+        } else {
+            return failure;
+        }
+    }
+
+    private Function<LogicalSlot, CompletableFuture<Void>> registerProducedPartitions(
+            final ExecutionDeploymentHandle deploymentHandle) {
+
+        return logicalSlot -> {
+            // a null logicalSlot means the slot assignment is skipped, in which case
+            // the produced partition registration process can be skipped as well
+            if (logicalSlot != null) {
+                final Execution execution =
+                        getExecutionOrThrow(deploymentHandle.getExecutionAttemptId());
+                final CompletableFuture<Void> partitionRegistrationFuture =
+                        execution.registerProducedPartitions(logicalSlot.getTaskManagerLocation());
+
+                return FutureUtils.orTimeout(
+                        partitionRegistrationFuture,
+                        partitionRegistrationTimeout.toMilliseconds(),
+                        TimeUnit.MILLISECONDS,
+                        mainThreadExecutor);
+            } else {
+                return FutureUtils.completedVoidFuture();
+            }
+        };
+    }
+
+    private BiFunction<Object, Throwable, Void> deployOrHandleError(
+            final ExecutionDeploymentHandle deploymentHandle) {
+
+        return (ignored, throwable) -> {
+            final ExecutionVertexVersion requiredVertexVersion =
+                    deploymentHandle.getRequiredVertexVersion();
+            final Optional<Execution> optionalExecution =
+                    getExecution(deploymentHandle.getExecutionAttemptId());
+
+            if (!optionalExecution.isPresent()
+                    || optionalExecution.get().getState() != ExecutionState.SCHEDULED
+                    || executionVertexVersioner.isModified(requiredVertexVersion)) {
+                if (throwable == null) {
+                    log.debug(
+                            "Refusing to assign slot to execution {} because this deployment was "
+                                    + "superseded by another deployment",
+                            deploymentHandle.getExecutionAttemptId());
+                }
+                return null;
+            }
+
+            final Execution execution = optionalExecution.get();
+            if (throwable == null) {
+                deployTaskSafe(execution);
+            } else {
+                handleTaskDeploymentFailure(execution.getAttemptId(), throwable);
+            }
+            return null;
+        };
+    }
+
+    private void deployTaskSafe(final Execution execution) {
+        try {
+            executionOperations.deploy(execution);
+        } catch (Throwable e) {
+            handleTaskDeploymentFailure(execution.getAttemptId(), e);
+        }
+    }
+
+    private void handleTaskDeploymentFailure(
+            final ExecutionAttemptID executionAttemptId, final Throwable error) {
+
+        final Execution execution = getExecutionOrThrow(executionAttemptId);
+        executionOperations.markFailed(execution, error);
+    }
+
+    private Execution getExecutionOrThrow(ExecutionAttemptID executionAttemptId) {
+        return getExecution(executionAttemptId).get();
+    }
+
+    private Optional<Execution> getExecution(ExecutionAttemptID executionAttemptId) {
+        return Optional.ofNullable(executionRetriever.apply(executionAttemptId));
+    }
+
+    private static class ExecutionDeploymentHandle {
+
+        private final ExecutionSlotAssignment executionSlotAssignment;
+
+        private final ExecutionVertexVersion requiredVertexVersion;
+
+        ExecutionDeploymentHandle(
+                ExecutionSlotAssignment executionSlotAssignment,
+                final ExecutionVertexVersion requiredVertexVersion) {
+            this.executionSlotAssignment = checkNotNull(executionSlotAssignment);
+            this.requiredVertexVersion = checkNotNull(requiredVertexVersion);
+        }
+
+        ExecutionAttemptID getExecutionAttemptId() {
+            return executionSlotAssignment.getExecutionAttemptId();
+        }
+
+        CompletableFuture<LogicalSlot> getLogicalSlotFuture() {
+            return executionSlotAssignment.getLogicalSlotFuture();
+        }
+
+        ExecutionVertexVersion getRequiredVertexVersion() {
+            return requiredVertexVersion;
+        }
+    }
+
+    /** Factory to instantiate the {@link DefaultExecutionDeployer}. */
+    public static class Factory implements ExecutionDeployer.Factory {
+
+        @Override
+        public DefaultExecutionDeployer createInstance(
+                Logger log,
+                ExecutionSlotAllocator executionSlotAllocator,
+                ExecutionOperations executionOperations,
+                ExecutionVertexVersioner executionVertexVersioner,
+                Time partitionRegistrationTimeout,
+                Function<ExecutionAttemptID, Execution> executionRetriever,
+                BiConsumer<ExecutionVertexID, AllocationID> allocationReservationFunc,
+                ComponentMainThreadExecutor mainThreadExecutor) {
+            return new DefaultExecutionDeployer(
+                    log,
+                    executionSlotAllocator,
+                    executionOperations,
+                    executionVertexVersioner,
+                    partitionRegistrationTimeout,
+                    executionRetriever,
+                    allocationReservationFunc,
+                    mainThreadExecutor);
+        }
+    }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionVertexOperations.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionOperations.java
similarity index 63%
rename from flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionVertexOperations.java
rename to flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionOperations.java
index d7454c830bd..4bfe8766688 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionVertexOperations.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionOperations.java
@@ -20,25 +20,26 @@
 package org.apache.flink.runtime.scheduler;
 
 import org.apache.flink.runtime.JobException;
-import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.Execution;
 
 import java.util.concurrent.CompletableFuture;
 
-/** Default implementation of {@link ExecutionVertexOperations}. */
-public class DefaultExecutionVertexOperations implements ExecutionVertexOperations {
+/** Default implementation of {@link ExecutionOperations}. */
+public class DefaultExecutionOperations implements ExecutionOperations {
 
     @Override
-    public void deploy(final ExecutionVertex executionVertex) throws JobException {
-        executionVertex.deploy();
+    public void deploy(Execution execution) throws JobException {
+        execution.deploy();
     }
 
     @Override
-    public CompletableFuture<?> cancel(final ExecutionVertex executionVertex) {
-        return executionVertex.cancel();
+    public CompletableFuture<?> cancel(Execution execution) {
+        execution.cancel();
+        return execution.getReleaseFuture();
     }
 
     @Override
-    public void markFailed(final ExecutionVertex executionVertex, final Throwable cause) {
-        executionVertex.markFailed(cause);
+    public void markFailed(Execution execution, Throwable cause) {
+        execution.markFailed(cause);
     }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
index e7869b6c9a9..081cfa26987 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
@@ -28,6 +28,8 @@ import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
 import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.executiongraph.JobStatusListener;
@@ -41,9 +43,7 @@ import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
-import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
-import org.apache.flink.runtime.jobmaster.LogicalSlot;
 import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
 import org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder;
 import org.apache.flink.runtime.scheduler.exceptionhistory.FailureHandlingResultSnapshot;
@@ -54,7 +54,6 @@ import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
 import org.apache.flink.runtime.shuffle.ShuffleMaster;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.topology.Vertex;
-import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.IterableUtils;
 import org.apache.flink.util.concurrent.FutureUtils;
 import org.apache.flink.util.concurrent.ScheduledExecutor;
@@ -63,7 +62,6 @@ import org.slf4j.Logger;
 
 import javax.annotation.Nullable;
 
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -73,17 +71,12 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.function.BiFunction;
 import java.util.function.Consumer;
-import java.util.function.Function;
 import java.util.stream.Collectors;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.apache.flink.util.Preconditions.checkState;
 
 /** The future default scheduler. */
 public class DefaultScheduler extends SchedulerBase implements SchedulerOperations {
@@ -100,14 +93,12 @@ public class DefaultScheduler extends SchedulerBase implements SchedulerOperatio
 
     protected final SchedulingStrategy schedulingStrategy;
 
-    private final ExecutionVertexOperations executionVertexOperations;
+    private final ExecutionOperations executionOperations;
 
     private final Set<ExecutionVertexID> verticesWaitingForRestart;
 
     private final ShuffleMaster<?> shuffleMaster;
 
-    private final Time rpcTimeout;
-
     private final Map<AllocationID, Long> reservedAllocationRefCounters;
 
     // once an execution vertex is assigned an allocation/slot, it will reserve the allocation
@@ -115,55 +106,7 @@ public class DefaultScheduler extends SchedulerBase implements SchedulerOperatio
     // anymore. The reserved allocation information is needed for local recovery.
     private final Map<ExecutionVertexID, AllocationID> reservedAllocationByExecutionVertex;
 
-    DefaultScheduler(
-            final Logger log,
-            final JobGraph jobGraph,
-            final Executor ioExecutor,
-            final Configuration jobMasterConfiguration,
-            final Consumer<ComponentMainThreadExecutor> startUpAction,
-            final ScheduledExecutor delayExecutor,
-            final ClassLoader userCodeLoader,
-            final CheckpointsCleaner checkpointsCleaner,
-            final CheckpointRecoveryFactory checkpointRecoveryFactory,
-            final JobManagerJobMetricGroup jobManagerJobMetricGroup,
-            final SchedulingStrategyFactory schedulingStrategyFactory,
-            final FailoverStrategy.Factory failoverStrategyFactory,
-            final RestartBackoffTimeStrategy restartBackoffTimeStrategy,
-            final ExecutionVertexOperations executionVertexOperations,
-            final ExecutionVertexVersioner executionVertexVersioner,
-            final ExecutionSlotAllocatorFactory executionSlotAllocatorFactory,
-            long initializationTimestamp,
-            final ComponentMainThreadExecutor mainThreadExecutor,
-            final JobStatusListener jobStatusListener,
-            final ExecutionGraphFactory executionGraphFactory,
-            final ShuffleMaster<?> shuffleMaster,
-            final Time rpcTimeout)
-            throws Exception {
-        this(
-                log,
-                jobGraph,
-                ioExecutor,
-                jobMasterConfiguration,
-                startUpAction,
-                delayExecutor,
-                userCodeLoader,
-                checkpointsCleaner,
-                checkpointRecoveryFactory,
-                jobManagerJobMetricGroup,
-                schedulingStrategyFactory,
-                failoverStrategyFactory,
-                restartBackoffTimeStrategy,
-                executionVertexOperations,
-                executionVertexVersioner,
-                executionSlotAllocatorFactory,
-                initializationTimestamp,
-                mainThreadExecutor,
-                jobStatusListener,
-                executionGraphFactory,
-                shuffleMaster,
-                rpcTimeout,
-                computeVertexParallelismStore(jobGraph));
-    }
+    private final ExecutionDeployer executionDeployer;
 
     protected DefaultScheduler(
             final Logger log,
@@ -179,7 +122,7 @@ public class DefaultScheduler extends SchedulerBase implements SchedulerOperatio
             final SchedulingStrategyFactory schedulingStrategyFactory,
             final FailoverStrategy.Factory failoverStrategyFactory,
             final RestartBackoffTimeStrategy restartBackoffTimeStrategy,
-            final ExecutionVertexOperations executionVertexOperations,
+            final ExecutionOperations executionOperations,
             final ExecutionVertexVersioner executionVertexVersioner,
             final ExecutionSlotAllocatorFactory executionSlotAllocatorFactory,
             long initializationTimestamp,
@@ -188,7 +131,8 @@ public class DefaultScheduler extends SchedulerBase implements SchedulerOperatio
             final ExecutionGraphFactory executionGraphFactory,
             final ShuffleMaster<?> shuffleMaster,
             final Time rpcTimeout,
-            final VertexParallelismStore vertexParallelismStore)
+            final VertexParallelismStore vertexParallelismStore,
+            final ExecutionDeployer.Factory executionDeployerFactory)
             throws Exception {
 
         super(
@@ -210,9 +154,8 @@ public class DefaultScheduler extends SchedulerBase implements SchedulerOperatio
 
         this.delayExecutor = checkNotNull(delayExecutor);
         this.userCodeLoader = checkNotNull(userCodeLoader);
-        this.executionVertexOperations = checkNotNull(executionVertexOperations);
+        this.executionOperations = checkNotNull(executionOperations);
         this.shuffleMaster = checkNotNull(shuffleMaster);
-        this.rpcTimeout = checkNotNull(rpcTimeout);
 
         this.reservedAllocationRefCounters = new HashMap<>();
         this.reservedAllocationByExecutionVertex = new HashMap<>();
@@ -238,6 +181,17 @@ public class DefaultScheduler extends SchedulerBase implements SchedulerOperatio
 
         this.verticesWaitingForRestart = new HashSet<>();
         startUpAction.accept(mainThreadExecutor);
+
+        this.executionDeployer =
+                executionDeployerFactory.createInstance(
+                        log,
+                        executionSlotAllocator,
+                        executionOperations,
+                        executionVertexVersioner,
+                        rpcTimeout,
+                        id -> getExecutionGraph().getRegisteredExecutions().get(id),
+                        this::startReserveAllocation,
+                        mainThreadExecutor);
     }
 
     // ------------------------------------------------------------------------
@@ -253,6 +207,7 @@ public class DefaultScheduler extends SchedulerBase implements SchedulerOperatio
     protected void cancelAllPendingSlotRequestsInternal() {
         IterableUtils.toStream(getSchedulingTopology().getVertices())
                 .map(Vertex::getId)
+                .map(this::getCurrentExecutionIdOfVertex)
                 .forEach(executionSlotAllocator::cancel);
     }
 
@@ -426,7 +381,9 @@ public class DefaultScheduler extends SchedulerBase implements SchedulerOperatio
     private CompletableFuture<?> cancelTasksAsync(final Set<ExecutionVertexID> verticesToRestart) {
         // clean up all the related pending requests to avoid that immediately returned slot
         // is used to fulfill the pending requests of these tasks
-        verticesToRestart.stream().forEach(executionSlotAllocator::cancel);
+        verticesToRestart.stream()
+                .map(this::getCurrentExecutionIdOfVertex)
+                .forEach(executionSlotAllocator::cancel);
 
         final List<CompletableFuture<?>> cancelFutures =
                 verticesToRestart.stream()
@@ -441,7 +398,11 @@ public class DefaultScheduler extends SchedulerBase implements SchedulerOperatio
 
         notifyCoordinatorOfCancellation(vertex);
 
-        return executionVertexOperations.cancel(vertex);
+        return executionOperations.cancel(vertex.getCurrentExecutionAttempt());
+    }
+
+    private ExecutionAttemptID getCurrentExecutionIdOfVertex(ExecutionVertexID executionVertexId) {
+        return getExecutionVertex(executionVertexId).getCurrentExecutionAttempt().getAttemptId();
     }
 
     // ------------------------------------------------------------------------
@@ -450,142 +411,16 @@ public class DefaultScheduler extends SchedulerBase implements SchedulerOperatio
 
     @Override
     public void allocateSlotsAndDeploy(final List<ExecutionVertexID> verticesToDeploy) {
-        validateDeploymentOptions(verticesToDeploy);
-
         final Map<ExecutionVertexID, ExecutionVertexVersion> requiredVersionByVertex =
                 executionVertexVersioner.recordVertexModifications(verticesToDeploy);
 
-        transitionToScheduled(verticesToDeploy);
-
-        final List<SlotExecutionVertexAssignment> slotExecutionVertexAssignments =
-                allocateSlots(verticesToDeploy);
-
-        final List<DeploymentHandle> deploymentHandles =
-                createDeploymentHandles(requiredVersionByVertex, slotExecutionVertexAssignments);
-
-        waitForAllSlotsAndDeploy(deploymentHandles);
-    }
-
-    private void validateDeploymentOptions(final Collection<ExecutionVertexID> verticesToDeploy) {
-        verticesToDeploy.stream()
-                .map(this::getExecutionVertex)
-                .forEach(
-                        v ->
-                                checkState(
-                                        v.getExecutionState() == ExecutionState.CREATED,
-                                        "expected vertex %s to be in CREATED state, was: %s",
-                                        v.getID(),
-                                        v.getExecutionState()));
-    }
-
-    private List<SlotExecutionVertexAssignment> allocateSlots(
-            final List<ExecutionVertexID> verticesToDeploy) {
-        return executionSlotAllocator.allocateSlotsFor(verticesToDeploy);
-    }
-
-    private static List<DeploymentHandle> createDeploymentHandles(
-            final Map<ExecutionVertexID, ExecutionVertexVersion> requiredVersionByVertex,
-            final List<SlotExecutionVertexAssignment> slotExecutionVertexAssignments) {
-
-        return slotExecutionVertexAssignments.stream()
-                .map(
-                        slotExecutionVertexAssignment -> {
-                            final ExecutionVertexID executionVertexId =
-                                    slotExecutionVertexAssignment.getExecutionVertexId();
-                            return new DeploymentHandle(
-                                    requiredVersionByVertex.get(executionVertexId),
-                                    slotExecutionVertexAssignment);
-                        })
-                .collect(Collectors.toList());
-    }
-
-    private void waitForAllSlotsAndDeploy(final List<DeploymentHandle> deploymentHandles) {
-        FutureUtils.assertNoException(
-                assignAllResourcesAndRegisterProducedPartitions(deploymentHandles)
-                        .handle(deployAll(deploymentHandles)));
-    }
-
-    private CompletableFuture<Void> assignAllResourcesAndRegisterProducedPartitions(
-            final List<DeploymentHandle> deploymentHandles) {
-        final List<CompletableFuture<Void>> resultFutures = new ArrayList<>();
-        for (DeploymentHandle deploymentHandle : deploymentHandles) {
-            final CompletableFuture<Void> resultFuture =
-                    deploymentHandle
-                            .getSlotExecutionVertexAssignment()
-                            .getLogicalSlotFuture()
-                            .handle(assignResource(deploymentHandle))
-                            .thenCompose(registerProducedPartitions(deploymentHandle))
-                            .handle(
-                                    (ignore, throwable) -> {
-                                        if (throwable != null) {
-                                            handleTaskDeploymentFailure(
-                                                    deploymentHandle.getExecutionVertexId(),
-                                                    throwable);
-                                        }
-                                        return null;
-                                    });
-
-            resultFutures.add(resultFuture);
-        }
-        return FutureUtils.waitForAll(resultFutures);
-    }
-
-    private BiFunction<Void, Throwable, Void> deployAll(
-            final List<DeploymentHandle> deploymentHandles) {
-        return (ignored, throwable) -> {
-            propagateIfNonNull(throwable);
-            for (final DeploymentHandle deploymentHandle : deploymentHandles) {
-                final SlotExecutionVertexAssignment slotExecutionVertexAssignment =
-                        deploymentHandle.getSlotExecutionVertexAssignment();
-                final CompletableFuture<LogicalSlot> slotAssigned =
-                        slotExecutionVertexAssignment.getLogicalSlotFuture();
-                checkState(slotAssigned.isDone());
-
-                FutureUtils.assertNoException(
-                        slotAssigned.handle(deployOrHandleError(deploymentHandle)));
-            }
-            return null;
-        };
-    }
-
-    private static void propagateIfNonNull(final Throwable throwable) {
-        if (throwable != null) {
-            throw new CompletionException(throwable);
-        }
-    }
+        final List<Execution> executionsToDeploy =
+                verticesToDeploy.stream()
+                        .map(this::getExecutionVertex)
+                        .map(ExecutionVertex::getCurrentExecutionAttempt)
+                        .collect(Collectors.toList());
 
-    private BiFunction<LogicalSlot, Throwable, LogicalSlot> assignResource(
-            final DeploymentHandle deploymentHandle) {
-        final ExecutionVertexVersion requiredVertexVersion =
-                deploymentHandle.getRequiredVertexVersion();
-        final ExecutionVertexID executionVertexId = deploymentHandle.getExecutionVertexId();
-
-        return (logicalSlot, throwable) -> {
-            if (executionVertexVersioner.isModified(requiredVertexVersion)) {
-                if (throwable == null) {
-                    log.debug(
-                            "Refusing to assign slot to execution vertex {} because this deployment was "
-                                    + "superseded by another deployment",
-                            executionVertexId);
-                    releaseSlotIfPresent(logicalSlot);
-                }
-                return null;
-            }
-
-            // throw exception only if the execution version is not outdated.
-            // this ensures that canceling a pending slot request does not fail
-            // a task which is about to cancel in #restartTasksWithDelay(...)
-            if (throwable != null) {
-                throw new CompletionException(maybeWrapWithNoResourceAvailableException(throwable));
-            }
-
-            final ExecutionVertex executionVertex = getExecutionVertex(executionVertexId);
-            executionVertex.tryAssignResource(logicalSlot);
-
-            startReserveAllocation(executionVertexId, logicalSlot.getAllocationId());
-
-            return logicalSlot;
-        };
+        executionDeployer.allocateSlotsAndDeploy(executionsToDeploy, requiredVersionByVertex);
     }
 
     private void startReserveAllocation(
@@ -608,88 +443,6 @@ public class DefaultScheduler extends SchedulerBase implements SchedulerOperatio
         }
     }
 
-    private Function<LogicalSlot, CompletableFuture<Void>> registerProducedPartitions(
-            final DeploymentHandle deploymentHandle) {
-        final ExecutionVertexID executionVertexId = deploymentHandle.getExecutionVertexId();
-
-        return logicalSlot -> {
-            // a null logicalSlot means the slot assignment is skipped, in which case
-            // the produced partition registration process can be skipped as well
-            if (logicalSlot != null) {
-                final ExecutionVertex executionVertex = getExecutionVertex(executionVertexId);
-
-                final CompletableFuture<Void> partitionRegistrationFuture =
-                        executionVertex
-                                .getCurrentExecutionAttempt()
-                                .registerProducedPartitions(logicalSlot.getTaskManagerLocation());
-
-                return FutureUtils.orTimeout(
-                        partitionRegistrationFuture,
-                        rpcTimeout.toMilliseconds(),
-                        TimeUnit.MILLISECONDS,
-                        getMainThreadExecutor());
-            } else {
-                return FutureUtils.completedVoidFuture();
-            }
-        };
-    }
-
-    private void releaseSlotIfPresent(@Nullable final LogicalSlot logicalSlot) {
-        if (logicalSlot != null) {
-            logicalSlot.releaseSlot(null);
-        }
-    }
-
-    private void handleTaskDeploymentFailure(
-            final ExecutionVertexID executionVertexId, final Throwable error) {
-        executionVertexOperations.markFailed(getExecutionVertex(executionVertexId), error);
-    }
-
-    private static Throwable maybeWrapWithNoResourceAvailableException(final Throwable failure) {
-        final Throwable strippedThrowable = ExceptionUtils.stripCompletionException(failure);
-        if (strippedThrowable instanceof TimeoutException) {
-            return new NoResourceAvailableException(
-                    "Could not allocate the required slot within slot request timeout. "
-                            + "Please make sure that the cluster has enough resources.",
-                    failure);
-        } else {
-            return failure;
-        }
-    }
-
-    private BiFunction<Object, Throwable, Void> deployOrHandleError(
-            final DeploymentHandle deploymentHandle) {
-        final ExecutionVertexVersion requiredVertexVersion =
-                deploymentHandle.getRequiredVertexVersion();
-        final ExecutionVertexID executionVertexId = requiredVertexVersion.getExecutionVertexId();
-
-        return (ignored, throwable) -> {
-            if (executionVertexVersioner.isModified(requiredVertexVersion)) {
-                log.debug(
-                        "Refusing to deploy execution vertex {} because this deployment was "
-                                + "superseded by another deployment",
-                        executionVertexId);
-                return null;
-            }
-
-            if (throwable == null) {
-                deployTaskSafe(executionVertexId);
-            } else {
-                handleTaskDeploymentFailure(executionVertexId, throwable);
-            }
-            return null;
-        };
-    }
-
-    private void deployTaskSafe(final ExecutionVertexID executionVertexId) {
-        try {
-            final ExecutionVertex executionVertex = getExecutionVertex(executionVertexId);
-            executionVertexOperations.deploy(executionVertex);
-        } catch (Throwable e) {
-            handleTaskDeploymentFailure(executionVertexId, e);
-        }
-    }
-
     private void notifyCoordinatorOfCancellation(ExecutionVertex vertex) {
         // this method makes a best effort to filter out duplicate notifications, meaning cases
         // where
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java
index fda0d8a7928..de7d56315bd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java
@@ -47,6 +47,7 @@ import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
 
 import static org.apache.flink.runtime.scheduler.DefaultSchedulerComponents.createSchedulerComponents;
+import static org.apache.flink.runtime.scheduler.SchedulerBase.computeVertexParallelismStore;
 
 /** Factory for {@link DefaultScheduler}. */
 public class DefaultSchedulerFactory implements SchedulerNGFactory {
@@ -130,7 +131,7 @@ public class DefaultSchedulerFactory implements SchedulerNGFactory {
                 schedulerComponents.getSchedulingStrategyFactory(),
                 FailoverStrategyFactoryLoader.loadFailoverStrategyFactory(jobMasterConfiguration),
                 restartBackoffTimeStrategy,
-                new DefaultExecutionVertexOperations(),
+                new DefaultExecutionOperations(),
                 new ExecutionVertexVersioner(),
                 schedulerComponents.getAllocatorFactory(),
                 initializationTimestamp,
@@ -145,7 +146,9 @@ public class DefaultSchedulerFactory implements SchedulerNGFactory {
                 },
                 executionGraphFactory,
                 shuffleMaster,
-                rpcTimeout);
+                rpcTimeout,
+                computeVertexParallelismStore(jobGraph),
+                new DefaultExecutionDeployer.Factory());
     }
 
     @Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DeploymentHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DeploymentHandle.java
deleted file mode 100644
index 4724c37bd6d..00000000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DeploymentHandle.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.flink.runtime.scheduler;
-
-import org.apache.flink.runtime.executiongraph.ExecutionVertex;
-import org.apache.flink.runtime.jobmaster.LogicalSlot;
-import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
-import org.apache.flink.util.Preconditions;
-
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
-
-/**
- * This class is a tuple holding the information necessary to deploy an {@link ExecutionVertex}.
- *
- * <p>The tuple consists of:
- *
- * <ul>
- *   <li>{@link ExecutionVertexVersion}
- *   <li>{@link SlotExecutionVertexAssignment}
- * </ul>
- */
-class DeploymentHandle {
-
-    private final ExecutionVertexVersion requiredVertexVersion;
-
-    private final SlotExecutionVertexAssignment slotExecutionVertexAssignment;
-
-    public DeploymentHandle(
-            final ExecutionVertexVersion requiredVertexVersion,
-            final SlotExecutionVertexAssignment slotExecutionVertexAssignment) {
-
-        this.requiredVertexVersion = Preconditions.checkNotNull(requiredVertexVersion);
-        this.slotExecutionVertexAssignment =
-                Preconditions.checkNotNull(slotExecutionVertexAssignment);
-    }
-
-    public ExecutionVertexID getExecutionVertexId() {
-        return requiredVertexVersion.getExecutionVertexId();
-    }
-
-    public ExecutionVertexVersion getRequiredVertexVersion() {
-        return requiredVertexVersion;
-    }
-
-    public SlotExecutionVertexAssignment getSlotExecutionVertexAssignment() {
-        return slotExecutionVertexAssignment;
-    }
-
-    public Optional<LogicalSlot> getLogicalSlot() {
-        final CompletableFuture<LogicalSlot> logicalSlotFuture =
-                slotExecutionVertexAssignment.getLogicalSlotFuture();
-        Preconditions.checkState(
-                logicalSlotFuture.isDone(), "method can only be called after slot future is done");
-
-        if (logicalSlotFuture.isCompletedExceptionally()) {
-            return Optional.empty();
-        }
-        return Optional.ofNullable(logicalSlotFuture.getNow(null));
-    }
-}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionDeployer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionDeployer.java
new file mode 100644
index 00000000000..4fc979e80ba
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionDeployer.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+
+import org.slf4j.Logger;
+
+import java.util.List;
+import java.util.Map;
+import java.util.function.BiConsumer;
+import java.util.function.Function;
+
+/** This deployer is responsible for deploying executions. */
+interface ExecutionDeployer {
+
+    /**
+     * Allocate slots and deploy executions.
+     *
+     * @param executionsToDeploy executions to deploy
+     * @param requiredVersionByVertex required versions of the execution vertices. If the actual
+     *     version does not match, the deployment of the execution will be rejected.
+     */
+    void allocateSlotsAndDeploy(
+            final List<Execution> executionsToDeploy,
+            final Map<ExecutionVertexID, ExecutionVertexVersion> requiredVersionByVertex);
+
+    /** Factory to instantiate the {@link ExecutionDeployer}. */
+    interface Factory {
+
+        /**
+         * Instantiate an {@link ExecutionDeployer} with the given params. Note that the version of
+         * an execution vertex will be recorded before scheduling executions for it. The version may
+         * change if a global failure happens, or if the job is canceled, or if the execution vertex
+         * is restarted when all its current execution are FAILED/CANCELED. Once the version is
+         * changed, the previously triggered execution deployment will be skipped.
+         *
+         * @param log the logger
+         * @param executionSlotAllocator the allocator to allocate slots
+         * @param executionOperations the operations of executions
+         * @param executionVertexVersioner the versioner which records the versions of execution
+         *     vertices.
+         * @param partitionRegistrationTimeout timeout of partition registration
+         * @param executionRetriever retriever to get executions
+         * @param allocationReservationFunc function to reserve allocations for local recovery
+         * @param mainThreadExecutor the main thread executor
+         * @return an instantiated {@link ExecutionDeployer}
+         */
+        ExecutionDeployer createInstance(
+                final Logger log,
+                final ExecutionSlotAllocator executionSlotAllocator,
+                final ExecutionOperations executionOperations,
+                final ExecutionVertexVersioner executionVertexVersioner,
+                final Time partitionRegistrationTimeout,
+                final Function<ExecutionAttemptID, Execution> executionRetriever,
+                final BiConsumer<ExecutionVertexID, AllocationID> allocationReservationFunc,
+                final ComponentMainThreadExecutor mainThreadExecutor);
+    }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexOperations.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionOperations.java
similarity index 54%
rename from flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexOperations.java
rename to flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionOperations.java
index 11cd1c9b378..6f8581dc7df 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexOperations.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionOperations.java
@@ -20,16 +20,34 @@
 package org.apache.flink.runtime.scheduler;
 
 import org.apache.flink.runtime.JobException;
-import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.Execution;
 
 import java.util.concurrent.CompletableFuture;
 
-/** Operations on the {@link ExecutionVertex}. */
-public interface ExecutionVertexOperations {
+/** Operations on the {@link Execution}. */
+public interface ExecutionOperations {
 
-    void deploy(ExecutionVertex executionVertex) throws JobException;
+    /**
+     * Deploy the execution.
+     *
+     * @param execution to deploy.
+     * @throws JobException if the execution cannot be deployed to the assigned resource
+     */
+    void deploy(Execution execution) throws JobException;
 
-    CompletableFuture<?> cancel(ExecutionVertex executionVertex);
+    /**
+     * Cancel the execution.
+     *
+     * @param execution to cancel
+     * @return Future which completes when the cancellation is done
+     */
+    CompletableFuture<?> cancel(Execution execution);
 
-    void markFailed(ExecutionVertex executionVertex, Throwable cause);
+    /**
+     * Mark the execution as FAILED.
+     *
+     * @param execution to mark as failed.
+     * @param cause of the execution failure
+     */
+    void markFailed(Execution execution, Throwable cause);
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionSlotAllocator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionSlotAllocator.java
index 5c4c9e3ccbf..526ebae99a4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionSlotAllocator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionSlotAllocator.java
@@ -19,7 +19,7 @@
 package org.apache.flink.runtime.scheduler;
 
 import org.apache.flink.runtime.executiongraph.Execution;
-import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 
 import java.util.List;
 
@@ -29,15 +29,16 @@ public interface ExecutionSlotAllocator {
     /**
      * Allocate slots for the given executions.
      *
-     * @param executionVertexIds Execution vertices to allocate slots for
+     * @param executionAttemptIds executions to allocate slots for
+     * @return List of slot assignments to the executions
      */
-    List<SlotExecutionVertexAssignment> allocateSlotsFor(
-            List<ExecutionVertexID> executionVertexIds);
+    List<ExecutionSlotAssignment> allocateSlotsFor(List<ExecutionAttemptID> executionAttemptIds);
 
     /**
-     * Cancel an ongoing slot request.
+     * Cancel the ongoing slot request of the given {@link Execution}.
      *
-     * @param executionVertexId identifying which slot request should be canceled.
+     * @param executionAttemptId identifying the {@link Execution} of which the slot request should
+     *     be canceled.
      */
-    void cancel(ExecutionVertexID executionVertexId);
+    void cancel(ExecutionAttemptID executionAttemptId);
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SlotExecutionVertexAssignment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionSlotAssignment.java
similarity index 69%
rename from flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SlotExecutionVertexAssignment.java
rename to flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionSlotAssignment.java
index d442a41a6de..435724bf8f9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SlotExecutionVertexAssignment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionSlotAssignment.java
@@ -18,29 +18,30 @@
 
 package org.apache.flink.runtime.scheduler;
 
-import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.jobmaster.LogicalSlot;
-import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
 
 import java.util.concurrent.CompletableFuture;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
-/** The slot assignment for a {@link ExecutionVertex}. */
-class SlotExecutionVertexAssignment {
+/** The slot assignment for an {@link Execution}. */
+class ExecutionSlotAssignment {
 
-    private final ExecutionVertexID executionVertexId;
+    private final ExecutionAttemptID executionAttemptId;
 
     private final CompletableFuture<LogicalSlot> logicalSlotFuture;
 
-    SlotExecutionVertexAssignment(
-            ExecutionVertexID executionVertexId, CompletableFuture<LogicalSlot> logicalSlotFuture) {
-        this.executionVertexId = checkNotNull(executionVertexId);
+    ExecutionSlotAssignment(
+            ExecutionAttemptID executionAttemptId,
+            CompletableFuture<LogicalSlot> logicalSlotFuture) {
+        this.executionAttemptId = checkNotNull(executionAttemptId);
         this.logicalSlotFuture = checkNotNull(logicalSlotFuture);
     }
 
-    ExecutionVertexID getExecutionVertexId() {
-        return executionVertexId;
+    ExecutionAttemptID getExecutionAttemptId() {
+        return executionAttemptId;
     }
 
     CompletableFuture<LogicalSlot> getLogicalSlotFuture() {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SharedSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SharedSlot.java
index 24f0444df7d..656ce2a6fac 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SharedSlot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SharedSlot.java
@@ -223,7 +223,7 @@ class SharedSlot implements SlotOwner, PhysicalSlot.Payload {
             }
         } else {
             LOG.debug(
-                    "No SlotExecutionVertexAssignment for logical {} from physical {}}",
+                    "No request for logical {} from physical {}}",
                     logicalSlotRequestId,
                     physicalSlotRequestId);
         }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocator.java
index 0b36a4aa35d..67c8b3bb970 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocator.java
@@ -21,6 +21,8 @@ package org.apache.flink.runtime.scheduler;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.clusterframework.types.SlotProfile;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.jobmaster.LogicalSlot;
 import org.apache.flink.runtime.jobmaster.SlotRequestId;
 import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
@@ -45,6 +47,7 @@ import java.util.function.Function;
 import java.util.stream.Collectors;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
 
 /**
  * Allocates {@link LogicalSlot}s from physical shared slots.
@@ -94,6 +97,34 @@ class SlotSharingExecutionSlotAllocator implements ExecutionSlotAllocator {
         this.sharedSlots = new IdentityHashMap<>();
     }
 
+    @Override
+    public List<ExecutionSlotAssignment> allocateSlotsFor(
+            List<ExecutionAttemptID> executionAttemptIds) {
+
+        final Map<ExecutionVertexID, ExecutionAttemptID> vertexIdToExecutionId = new HashMap<>();
+        executionAttemptIds.forEach(
+                executionId ->
+                        vertexIdToExecutionId.put(executionId.getExecutionVertexId(), executionId));
+
+        checkState(
+                vertexIdToExecutionId.size() == executionAttemptIds.size(),
+                "SlotSharingExecutionSlotAllocator does not support one execution vertex to have multiple concurrent executions");
+
+        final List<ExecutionVertexID> vertexIds =
+                executionAttemptIds.stream()
+                        .map(ExecutionAttemptID::getExecutionVertexId)
+                        .collect(Collectors.toList());
+
+        return allocateSlotsForVertices(vertexIds).stream()
+                .map(
+                        vertexAssignment ->
+                                new ExecutionSlotAssignment(
+                                        vertexIdToExecutionId.get(
+                                                vertexAssignment.getExecutionVertexId()),
+                                        vertexAssignment.getLogicalSlotFuture()))
+                .collect(Collectors.toList());
+    }
+
     /**
      * Creates logical {@link SlotExecutionVertexAssignment}s from physical shared slots.
      *
@@ -117,8 +148,7 @@ class SlotSharingExecutionSlotAllocator implements ExecutionSlotAllocator {
      *
      * @param executionVertexIds Execution vertices to allocate slots for
      */
-    @Override
-    public List<SlotExecutionVertexAssignment> allocateSlotsFor(
+    private List<SlotExecutionVertexAssignment> allocateSlotsForVertices(
             List<ExecutionVertexID> executionVertexIds) {
 
         SharedSlotProfileRetriever sharedSlotProfileRetriever =
@@ -149,8 +179,8 @@ class SlotSharingExecutionSlotAllocator implements ExecutionSlotAllocator {
     }
 
     @Override
-    public void cancel(ExecutionVertexID executionVertexId) {
-        cancelLogicalSlotRequest(executionVertexId, null);
+    public void cancel(ExecutionAttemptID executionAttemptId) {
+        cancelLogicalSlotRequest(executionAttemptId.getExecutionVertexId(), null);
     }
 
     private void cancelLogicalSlotRequest(ExecutionVertexID executionVertexId, Throwable cause) {
@@ -284,4 +314,27 @@ class SlotSharingExecutionSlotAllocator implements ExecutionSlotAllocator {
                     });
         }
     }
+
+    /** The slot assignment for an {@link ExecutionVertex}. */
+    private static class SlotExecutionVertexAssignment {
+
+        private final ExecutionVertexID executionVertexId;
+
+        private final CompletableFuture<LogicalSlot> logicalSlotFuture;
+
+        SlotExecutionVertexAssignment(
+                ExecutionVertexID executionVertexId,
+                CompletableFuture<LogicalSlot> logicalSlotFuture) {
+            this.executionVertexId = checkNotNull(executionVertexId);
+            this.logicalSlotFuture = checkNotNull(logicalSlotFuture);
+        }
+
+        ExecutionVertexID getExecutionVertexId() {
+            return executionVertexId;
+        }
+
+        CompletableFuture<LogicalSlot> getLogicalSlotFuture() {
+            return logicalSlotFuture;
+        }
+    }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java
index 2c99ecc9c19..6507774b3e6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java
@@ -41,10 +41,11 @@ import org.apache.flink.runtime.jobgraph.topology.DefaultLogicalResult;
 import org.apache.flink.runtime.jobgraph.topology.DefaultLogicalTopology;
 import org.apache.flink.runtime.jobgraph.topology.DefaultLogicalVertex;
 import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
+import org.apache.flink.runtime.scheduler.DefaultExecutionDeployer;
 import org.apache.flink.runtime.scheduler.DefaultScheduler;
 import org.apache.flink.runtime.scheduler.ExecutionGraphFactory;
+import org.apache.flink.runtime.scheduler.ExecutionOperations;
 import org.apache.flink.runtime.scheduler.ExecutionSlotAllocatorFactory;
-import org.apache.flink.runtime.scheduler.ExecutionVertexOperations;
 import org.apache.flink.runtime.scheduler.ExecutionVertexVersioner;
 import org.apache.flink.runtime.scheduler.SchedulerOperations;
 import org.apache.flink.runtime.scheduler.VertexParallelismStore;
@@ -93,7 +94,7 @@ public class AdaptiveBatchScheduler extends DefaultScheduler implements Schedule
             final SchedulingStrategyFactory schedulingStrategyFactory,
             final FailoverStrategy.Factory failoverStrategyFactory,
             final RestartBackoffTimeStrategy restartBackoffTimeStrategy,
-            final ExecutionVertexOperations executionVertexOperations,
+            final ExecutionOperations executionOperations,
             final ExecutionVertexVersioner executionVertexVersioner,
             final ExecutionSlotAllocatorFactory executionSlotAllocatorFactory,
             long initializationTimestamp,
@@ -120,7 +121,7 @@ public class AdaptiveBatchScheduler extends DefaultScheduler implements Schedule
                 schedulingStrategyFactory,
                 failoverStrategyFactory,
                 restartBackoffTimeStrategy,
-                executionVertexOperations,
+                executionOperations,
                 executionVertexVersioner,
                 executionSlotAllocatorFactory,
                 initializationTimestamp,
@@ -130,7 +131,8 @@ public class AdaptiveBatchScheduler extends DefaultScheduler implements Schedule
                 shuffleMaster,
                 rpcTimeout,
                 computeVertexParallelismStoreForDynamicGraph(
-                        jobGraph.getVertices(), defaultMaxParallelism));
+                        jobGraph.getVertices(), defaultMaxParallelism),
+                new DefaultExecutionDeployer.Factory());
 
         this.logicalTopology = DefaultLogicalTopology.fromJobGraph(jobGraph);
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerFactory.java
index 390273534a3..0cc501203b8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerFactory.java
@@ -48,7 +48,7 @@ import org.apache.flink.runtime.jobmaster.slotpool.SlotSelectionStrategy;
 import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory;
-import org.apache.flink.runtime.scheduler.DefaultExecutionVertexOperations;
+import org.apache.flink.runtime.scheduler.DefaultExecutionOperations;
 import org.apache.flink.runtime.scheduler.ExecutionGraphFactory;
 import org.apache.flink.runtime.scheduler.ExecutionSlotAllocatorFactory;
 import org.apache.flink.runtime.scheduler.ExecutionVertexVersioner;
@@ -161,7 +161,7 @@ public class AdaptiveBatchSchedulerFactory implements SchedulerNGFactory {
                 new VertexwiseSchedulingStrategy.Factory(),
                 FailoverStrategyFactoryLoader.loadFailoverStrategyFactory(jobMasterConfiguration),
                 restartBackoffTimeStrategy,
-                new DefaultExecutionVertexOperations(),
+                new DefaultExecutionOperations(),
                 new ExecutionVertexVersioner(),
                 allocatorFactory,
                 initializationTimestamp,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultExecutionDeployerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultExecutionDeployerTest.java
new file mode 100644
index 00000000000..96c89093b92
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultExecutionDeployerTest.java
@@ -0,0 +1,388 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.TestingDefaultExecutionGraphBuilder;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.io.network.partition.TestingJobMasterPartitionTracker;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.shuffle.TestingShuffleMaster;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.util.ExecutorUtils;
+import org.apache.flink.util.IterableUtils;
+import org.apache.flink.util.TestLoggerExtension;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for {@link DefaultExecutionDeployer}. */
+@ExtendWith(TestLoggerExtension.class)
+class DefaultExecutionDeployerTest {
+
+    private ScheduledExecutorService executor;
+    private ComponentMainThreadExecutor mainThreadExecutor;
+    private TestExecutionOperationsDecorator testExecutionOperations;
+    private ExecutionVertexVersioner executionVertexVersioner;
+    private TestExecutionSlotAllocator testExecutionSlotAllocator;
+    private TestingShuffleMaster shuffleMaster;
+    private TestingJobMasterPartitionTracker partitionTracker;
+    private Time partitionRegistrationTimeout;
+
+    @BeforeEach
+    void setUp() {
+        executor = Executors.newSingleThreadScheduledExecutor();
+        mainThreadExecutor = ComponentMainThreadExecutorServiceAdapter.forMainThread();
+        testExecutionOperations =
+                new TestExecutionOperationsDecorator(
+                        new ExecutionOperations() {
+                            @Override
+                            public void deploy(Execution execution) {}
+
+                            @Override
+                            public CompletableFuture<?> cancel(Execution execution) {
+                                return null;
+                            }
+
+                            @Override
+                            public void markFailed(Execution execution, Throwable cause) {}
+                        });
+        executionVertexVersioner = new ExecutionVertexVersioner();
+        testExecutionSlotAllocator = new TestExecutionSlotAllocator();
+        shuffleMaster = new TestingShuffleMaster();
+        partitionTracker = new TestingJobMasterPartitionTracker();
+        partitionRegistrationTimeout = Time.milliseconds(5000);
+    }
+
+    @AfterEach
+    void tearDown() {
+        if (executor != null) {
+            ExecutorUtils.gracefulShutdown(5, TimeUnit.SECONDS, executor);
+        }
+    }
+
+    @Test
+    void testDeployTasks() throws Exception {
+        final JobGraph jobGraph = singleNonParallelJobVertexJobGraph();
+        final ExecutionGraph executionGraph = createExecutionGraph(jobGraph);
+        final ExecutionDeployer executionDeployer = createExecutionDeployer(executionGraph);
+
+        deployTasks(executionDeployer, executionGraph);
+
+        assertThat(testExecutionOperations.getDeployedExecutions())
+                .containsExactly(getAnyExecution(executionGraph).getAttemptId());
+    }
+
+    @Test
+    void testDeployTasksOnlyIfAllSlotRequestsAreFulfilled() throws Exception {
+        final JobGraph jobGraph = singleJobVertexJobGraph(4);
+        final ExecutionGraph executionGraph = createExecutionGraph(jobGraph);
+        final ExecutionDeployer executionDeployer = createExecutionDeployer(executionGraph);
+
+        testExecutionSlotAllocator.disableAutoCompletePendingRequests();
+
+        deployTasks(executionDeployer, executionGraph);
+
+        assertThat(testExecutionOperations.getDeployedExecutions()).isEmpty();
+
+        final ExecutionAttemptID attemptId = getAnyExecution(executionGraph).getAttemptId();
+        testExecutionSlotAllocator.completePendingRequest(attemptId);
+        assertThat(testExecutionOperations.getDeployedExecutions()).isEmpty();
+
+        testExecutionSlotAllocator.completePendingRequests();
+        assertThat(testExecutionOperations.getDeployedExecutions()).hasSize(4);
+    }
+
+    @Test
+    void testDeploymentFailures() throws Exception {
+        final JobGraph jobGraph = singleNonParallelJobVertexJobGraph();
+
+        testExecutionOperations.enableFailDeploy();
+
+        final ExecutionGraph executionGraph = createExecutionGraph(jobGraph);
+        final ExecutionDeployer executionDeployer = createExecutionDeployer(executionGraph);
+        deployTasks(executionDeployer, executionGraph);
+
+        assertThat(testExecutionOperations.getFailedExecutions())
+                .containsExactly(getAnyExecution(executionGraph).getAttemptId());
+    }
+
+    @Test
+    void testSlotAllocationTimeout() throws Exception {
+        final JobGraph jobGraph = singleJobVertexJobGraph(2);
+
+        testExecutionSlotAllocator.disableAutoCompletePendingRequests();
+
+        final ExecutionGraph executionGraph = createExecutionGraph(jobGraph);
+        final ExecutionDeployer executionDeployer = createExecutionDeployer(executionGraph);
+        deployTasks(executionDeployer, executionGraph);
+
+        assertThat(testExecutionSlotAllocator.getPendingRequests()).hasSize(2);
+
+        final ExecutionAttemptID attemptId = getAnyExecution(executionGraph).getAttemptId();
+        testExecutionSlotAllocator.timeoutPendingRequest(attemptId);
+
+        assertThat(testExecutionOperations.getFailedExecutions()).containsExactly(attemptId);
+    }
+
+    @Test
+    void testSkipDeploymentIfVertexVersionOutdated() throws Exception {
+        final JobGraph jobGraph = singleNonParallelJobVertexJobGraph();
+
+        testExecutionSlotAllocator.disableAutoCompletePendingRequests();
+
+        final ExecutionGraph executionGraph = createExecutionGraph(jobGraph);
+        final ExecutionDeployer executionDeployer = createExecutionDeployer(executionGraph);
+        deployTasks(executionDeployer, executionGraph);
+
+        final ExecutionAttemptID attemptId = getAnyExecution(executionGraph).getAttemptId();
+
+        executionVertexVersioner.recordModification(attemptId.getExecutionVertexId());
+        testExecutionSlotAllocator.completePendingRequests();
+
+        assertThat(testExecutionOperations.getDeployedVertices()).isEmpty();
+    }
+
+    @Test
+    void testReleaseSlotIfVertexVersionOutdated() throws Exception {
+        final JobGraph jobGraph = singleNonParallelJobVertexJobGraph();
+
+        testExecutionSlotAllocator.disableAutoCompletePendingRequests();
+
+        final ExecutionGraph executionGraph = createExecutionGraph(jobGraph);
+        final ExecutionDeployer executionDeployer = createExecutionDeployer(executionGraph);
+        deployTasks(executionDeployer, executionGraph);
+
+        final ExecutionAttemptID attemptId = getAnyExecution(executionGraph).getAttemptId();
+
+        executionVertexVersioner.recordModification(attemptId.getExecutionVertexId());
+        testExecutionSlotAllocator.completePendingRequests();
+
+        assertThat(testExecutionSlotAllocator.getReturnedSlots()).hasSize(1);
+    }
+
+    @Test
+    void testDeployOnlyIfVertexIsCreated() throws Exception {
+        final JobGraph jobGraph = singleNonParallelJobVertexJobGraph();
+        final ExecutionGraph executionGraph = createExecutionGraph(jobGraph);
+        final ExecutionDeployer executionDeployer = createExecutionDeployer(executionGraph);
+
+        // deploy once to transition the tasks out from CREATED state
+        deployTasks(executionDeployer, executionGraph);
+
+        // The deploying of a non-CREATED vertex will result in IllegalStateException
+        assertThatThrownBy(() -> deployTasks(executionDeployer, executionGraph))
+                .as("IllegalStateException should happen")
+                .isInstanceOf(IllegalStateException.class);
+    }
+
+    @Test
+    void testDeploymentWaitForProducedPartitionRegistration() throws Exception {
+        shuffleMaster.setAutoCompleteRegistration(false);
+
+        final List<ResultPartitionID> trackedPartitions = new ArrayList<>();
+        partitionTracker.setStartTrackingPartitionsConsumer(
+                (resourceID, resultPartitionDeploymentDescriptor) ->
+                        trackedPartitions.add(
+                                resultPartitionDeploymentDescriptor
+                                        .getShuffleDescriptor()
+                                        .getResultPartitionID()));
+
+        final JobGraph jobGraph = nonParallelSourceSinkJobGraph();
+        final ExecutionGraph executionGraph = createExecutionGraph(jobGraph);
+        final ExecutionDeployer executionDeployer = createExecutionDeployer(executionGraph);
+
+        deployTasks(executionDeployer, executionGraph);
+
+        assertThat(trackedPartitions).isEmpty();
+        assertThat(testExecutionOperations.getDeployedExecutions()).isEmpty();
+
+        shuffleMaster.completeAllPendingRegistrations();
+        assertThat(trackedPartitions).hasSize(1);
+        assertThat(testExecutionOperations.getDeployedExecutions()).hasSize(2);
+    }
+
+    @Test
+    void testFailedProducedPartitionRegistration() throws Exception {
+        shuffleMaster.setAutoCompleteRegistration(false);
+
+        final JobGraph jobGraph = nonParallelSourceSinkJobGraph();
+        final ExecutionGraph executionGraph = createExecutionGraph(jobGraph);
+        final ExecutionDeployer executionDeployer = createExecutionDeployer(executionGraph);
+
+        deployTasks(executionDeployer, executionGraph);
+
+        assertThat(testExecutionOperations.getFailedExecutions()).isEmpty();
+
+        shuffleMaster.failAllPendingRegistrations();
+        assertThat(testExecutionOperations.getFailedExecutions()).hasSize(1);
+    }
+
+    @Test
+    void testDirectExceptionOnProducedPartitionRegistration() throws Exception {
+        shuffleMaster.setThrowExceptionalOnRegistration(true);
+
+        final JobGraph jobGraph = nonParallelSourceSinkJobGraph();
+        final ExecutionGraph executionGraph = createExecutionGraph(jobGraph);
+        final ExecutionDeployer executionDeployer = createExecutionDeployer(executionGraph);
+
+        deployTasks(executionDeployer, executionGraph);
+
+        assertThat(testExecutionOperations.getFailedExecutions()).hasSize(1);
+    }
+
+    @Test
+    void testProducedPartitionRegistrationTimeout() throws Exception {
+        ScheduledExecutorService scheduledExecutorService = null;
+        try {
+            partitionRegistrationTimeout = Time.milliseconds(1);
+
+            scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
+            mainThreadExecutor =
+                    ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(
+                            scheduledExecutorService);
+
+            shuffleMaster.setAutoCompleteRegistration(false);
+
+            final JobGraph jobGraph = nonParallelSourceSinkJobGraph();
+            final ExecutionGraph executionGraph = createExecutionGraph(jobGraph);
+            final ExecutionDeployer executionDeployer = createExecutionDeployer(executionGraph);
+
+            deployTasks(executionDeployer, executionGraph);
+
+            testExecutionOperations.awaitFailedExecutions(1);
+        } finally {
+            if (scheduledExecutorService != null) {
+                scheduledExecutorService.shutdown();
+            }
+        }
+    }
+
+    private static JobGraph singleNonParallelJobVertexJobGraph() {
+        return singleJobVertexJobGraph(1);
+    }
+
+    private static JobGraph singleJobVertexJobGraph(final int parallelism) {
+        final JobVertex vertex = new JobVertex("source");
+        vertex.setInvokableClass(NoOpInvokable.class);
+        vertex.setParallelism(parallelism);
+        return JobGraphTestUtils.streamingJobGraph(vertex);
+    }
+
+    private static JobGraph nonParallelSourceSinkJobGraph() {
+        final JobVertex source = new JobVertex("source");
+        source.setInvokableClass(NoOpInvokable.class);
+
+        final JobVertex sink = new JobVertex("sink");
+        sink.setInvokableClass(NoOpInvokable.class);
+
+        sink.connectNewDataSetAsInput(
+                source, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
+
+        return JobGraphTestUtils.streamingJobGraph(source, sink);
+    }
+
+    private ExecutionGraph createExecutionGraph(final JobGraph jobGraph) throws Exception {
+        final ExecutionGraph executionGraph =
+                TestingDefaultExecutionGraphBuilder.newBuilder()
+                        .setJobGraph(jobGraph)
+                        .setShuffleMaster(shuffleMaster)
+                        .setPartitionTracker(partitionTracker)
+                        .build(executor);
+
+        executionGraph.setInternalTaskFailuresListener(
+                new InternalFailuresListener() {
+                    @Override
+                    public void notifyTaskFailure(
+                            ExecutionAttemptID attemptId,
+                            Throwable t,
+                            boolean cancelTask,
+                            boolean releasePartitions) {}
+
+                    @Override
+                    public void notifyGlobalFailure(Throwable t) {}
+                });
+        executionGraph.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
+
+        return executionGraph;
+    }
+
+    private ExecutionDeployer createExecutionDeployer(ExecutionGraph executionGraph) {
+        return new DefaultExecutionDeployer.Factory()
+                .createInstance(
+                        LoggerFactory.getLogger(DefaultExecutionDeployer.class),
+                        testExecutionSlotAllocator,
+                        testExecutionOperations,
+                        executionVertexVersioner,
+                        partitionRegistrationTimeout,
+                        id -> executionGraph.getRegisteredExecutions().get(id),
+                        (ignored1, ignored2) -> {},
+                        mainThreadExecutor);
+    }
+
+    private void deployTasks(ExecutionDeployer executionDeployer, ExecutionGraph executionGraph) {
+        deployTasks(
+                executionDeployer,
+                IterableUtils.toStream(executionGraph.getAllExecutionVertices())
+                        .map(ExecutionVertex::getCurrentExecutionAttempt)
+                        .collect(Collectors.toList()));
+    }
+
+    private void deployTasks(ExecutionDeployer executionDeployer, List<Execution> executions) {
+        final Set<ExecutionVertexID> executionVertexIds =
+                executions.stream()
+                        .map(Execution::getAttemptId)
+                        .map(ExecutionAttemptID::getExecutionVertexId)
+                        .collect(Collectors.toSet());
+
+        executionDeployer.allocateSlotsAndDeploy(
+                executions, executionVertexVersioner.recordVertexModifications(executionVertexIds));
+    }
+
+    private static Execution getAnyExecution(ExecutionGraph executionGraph) {
+        return executionGraph.getRegisteredExecutions().values().iterator().next();
+    }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
index cea1d98f880..733dbbc3d6e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
@@ -170,7 +170,7 @@ public class DefaultSchedulerTest extends TestLogger {
 
     private TestRestartBackoffTimeStrategy testRestartBackoffTimeStrategy;
 
-    private TestExecutionVertexOperationsDecorator testExecutionVertexOperations;
+    private TestExecutionOperationsDecorator testExecutionOperations;
 
     private ExecutionVertexVersioner executionVertexVersioner;
 
@@ -193,8 +193,8 @@ public class DefaultSchedulerTest extends TestLogger {
 
         testRestartBackoffTimeStrategy = new TestRestartBackoffTimeStrategy(true, 0);
 
-        testExecutionVertexOperations =
-                new TestExecutionVertexOperationsDecorator(new DefaultExecutionVertexOperations());
+        testExecutionOperations =
+                new TestExecutionOperationsDecorator(new DefaultExecutionOperations());
 
         executionVertexVersioner = new ExecutionVertexVersioner();
 
@@ -227,7 +227,7 @@ public class DefaultSchedulerTest extends TestLogger {
         createSchedulerAndStartScheduling(jobGraph);
 
         final List<ExecutionVertexID> deployedExecutionVertices =
-                testExecutionVertexOperations.getDeployedVertices();
+                testExecutionOperations.getDeployedVertices();
 
         final ExecutionVertexID executionVertexId = new ExecutionVertexID(onlyJobVertex.getID(), 0);
         assertThat(deployedExecutionVertices, contains(executionVertexId));
@@ -281,13 +281,13 @@ public class DefaultSchedulerTest extends TestLogger {
                         new ExecutionVertexID(onlyJobVertexId, 3));
         schedulingStrategy.schedule(verticesToSchedule);
 
-        assertThat(testExecutionVertexOperations.getDeployedVertices(), hasSize(0));
+        assertThat(testExecutionOperations.getDeployedVertices(), hasSize(0));
 
         testExecutionSlotAllocator.completePendingRequest(verticesToSchedule.get(0));
-        assertThat(testExecutionVertexOperations.getDeployedVertices(), hasSize(0));
+        assertThat(testExecutionOperations.getDeployedVertices(), hasSize(0));
 
         testExecutionSlotAllocator.completePendingRequests();
-        assertThat(testExecutionVertexOperations.getDeployedVertices(), hasSize(4));
+        assertThat(testExecutionOperations.getDeployedVertices(), hasSize(4));
     }
 
     @Test
@@ -315,7 +315,7 @@ public class DefaultSchedulerTest extends TestLogger {
         schedulingStrategy.schedule(desiredScheduleOrder);
 
         final List<ExecutionVertexID> deployedExecutionVertices =
-                testExecutionVertexOperations.getDeployedVertices();
+                testExecutionOperations.getDeployedVertices();
 
         assertEquals(desiredScheduleOrder, deployedExecutionVertices);
     }
@@ -325,15 +325,15 @@ public class DefaultSchedulerTest extends TestLogger {
         final JobGraph jobGraph = singleNonParallelJobVertexJobGraph();
         final JobVertex onlyJobVertex = getOnlyJobVertex(jobGraph);
 
-        testExecutionVertexOperations.enableFailDeploy();
+        testExecutionOperations.enableFailDeploy();
 
         createSchedulerAndStartScheduling(jobGraph);
 
-        testExecutionVertexOperations.disableFailDeploy();
+        testExecutionOperations.disableFailDeploy();
         taskRestartExecutor.triggerScheduledTasks();
 
         final List<ExecutionVertexID> deployedExecutionVertices =
-                testExecutionVertexOperations.getDeployedVertices();
+                testExecutionOperations.getDeployedVertices();
 
         final ExecutionVertexID executionVertexId = new ExecutionVertexID(onlyJobVertex.getID(), 0);
         assertThat(deployedExecutionVertices, contains(executionVertexId, executionVertexId));
@@ -360,7 +360,7 @@ public class DefaultSchedulerTest extends TestLogger {
         taskRestartExecutor.triggerScheduledTasks();
 
         final List<ExecutionVertexID> deployedExecutionVertices =
-                testExecutionVertexOperations.getDeployedVertices();
+                testExecutionOperations.getDeployedVertices();
         final ExecutionVertexID executionVertexId = new ExecutionVertexID(onlyJobVertex.getID(), 0);
         assertThat(deployedExecutionVertices, contains(executionVertexId, executionVertexId));
     }
@@ -537,7 +537,7 @@ public class DefaultSchedulerTest extends TestLogger {
         taskRestartExecutor.triggerScheduledTasks();
 
         assertThat(
-                testExecutionVertexOperations.getDeployedVertices(),
+                testExecutionOperations.getDeployedVertices(),
                 containsInAnyOrder(sourceExecutionVertexId, sinkExecutionVertexId));
         assertThat(
                 scheduler.requestJob().getArchivedExecutionGraph().getState(),
@@ -652,7 +652,7 @@ public class DefaultSchedulerTest extends TestLogger {
         taskRestartExecutor.triggerScheduledTasks();
 
         final List<ExecutionVertexID> deployedExecutionVertices =
-                testExecutionVertexOperations.getDeployedVertices();
+                testExecutionOperations.getDeployedVertices();
         final ExecutionVertexID executionVertexId = new ExecutionVertexID(onlyJobVertex.getID(), 0);
         assertThat(deployedExecutionVertices, contains(executionVertexId, executionVertexId));
     }
@@ -704,7 +704,7 @@ public class DefaultSchedulerTest extends TestLogger {
                 new ExecutionVertexID(onlyJobVertex.getID(), 1);
         assertThat(
                 "The execution vertices should be deployed in a specific order reflecting the scheduling start and the global fail-over afterwards.",
-                testExecutionVertexOperations.getDeployedVertices(),
+                testExecutionOperations.getDeployedVertices(),
                 contains(
                         executionVertexId0,
                         executionVertexId1,
@@ -881,16 +881,17 @@ public class DefaultSchedulerTest extends TestLogger {
 
         scheduler.updateTaskExecutionState(createFailedTaskExecutionState(attemptId));
         taskRestartExecutor.triggerScheduledTasks();
-        final List<ExecutionVertexID> deployedExecutionVertices =
-                testExecutionVertexOperations.getDeployedVertices();
 
         // the first task failover should be skipped on state restore failure
+        List<ExecutionVertexID> deployedExecutionVertices =
+                testExecutionOperations.getDeployedVertices();
         final ExecutionVertexID executionVertexId = new ExecutionVertexID(onlyJobVertex.getID(), 0);
         assertThat(deployedExecutionVertices, contains(executionVertexId));
 
         // a global failure should be triggered on state restore failure
         masterHook.disableFailOnRestore();
         taskRestartExecutor.triggerScheduledTasks();
+        deployedExecutionVertices = testExecutionOperations.getDeployedVertices();
         assertThat(deployedExecutionVertices, contains(executionVertexId, executionVertexId));
     }
 
@@ -1099,7 +1100,7 @@ public class DefaultSchedulerTest extends TestLogger {
 
         final Set<CompletableFuture<LogicalSlot>> pendingLogicalSlotFutures =
                 testExecutionSlotAllocator.getPendingRequests().values().stream()
-                        .map(SlotExecutionVertexAssignment::getLogicalSlotFuture)
+                        .map(ExecutionSlotAssignment::getLogicalSlotFuture)
                         .collect(Collectors.toSet());
         assertThat(pendingLogicalSlotFutures, hasSize(parallelism * 2));
 
@@ -1495,11 +1496,11 @@ public class DefaultSchedulerTest extends TestLogger {
         createSchedulerAndStartScheduling(jobGraph);
 
         assertThat(trackedPartitions, hasSize(0));
-        assertThat(testExecutionVertexOperations.getDeployedVertices(), hasSize(0));
+        assertThat(testExecutionOperations.getDeployedVertices(), hasSize(0));
 
         shuffleMaster.completeAllPendingRegistrations();
         assertThat(trackedPartitions, hasSize(1));
-        assertThat(testExecutionVertexOperations.getDeployedVertices(), hasSize(2));
+        assertThat(testExecutionOperations.getDeployedVertices(), hasSize(2));
     }
 
     @Test
@@ -1510,12 +1511,12 @@ public class DefaultSchedulerTest extends TestLogger {
 
         createSchedulerAndStartScheduling(jobGraph);
 
-        assertThat(testExecutionVertexOperations.getCanceledVertices(), hasSize(0));
-        assertThat(testExecutionVertexOperations.getFailedVertices(), hasSize(0));
+        assertThat(testExecutionOperations.getCanceledVertices(), hasSize(0));
+        assertThat(testExecutionOperations.getFailedVertices(), hasSize(0));
 
         shuffleMaster.failAllPendingRegistrations();
-        assertThat(testExecutionVertexOperations.getCanceledVertices(), hasSize(2));
-        assertThat(testExecutionVertexOperations.getFailedVertices(), hasSize(1));
+        assertThat(testExecutionOperations.getCanceledVertices(), hasSize(2));
+        assertThat(testExecutionOperations.getFailedVertices(), hasSize(1));
     }
 
     @Test
@@ -1526,8 +1527,8 @@ public class DefaultSchedulerTest extends TestLogger {
 
         createSchedulerAndStartScheduling(jobGraph);
 
-        assertThat(testExecutionVertexOperations.getCanceledVertices(), hasSize(2));
-        assertThat(testExecutionVertexOperations.getFailedVertices(), hasSize(1));
+        assertThat(testExecutionOperations.getCanceledVertices(), hasSize(2));
+        assertThat(testExecutionOperations.getFailedVertices(), hasSize(1));
     }
 
     @Test
@@ -1546,8 +1547,8 @@ public class DefaultSchedulerTest extends TestLogger {
             timeout = Time.milliseconds(1);
             createSchedulerAndStartScheduling(jobGraph, mainThreadExecutor);
 
-            testExecutionVertexOperations.awaitCanceledVertices(2);
-            testExecutionVertexOperations.awaitFailedVertices(1);
+            testExecutionOperations.awaitCanceledExecutions(2);
+            testExecutionOperations.awaitFailedExecutions(1);
         } finally {
             if (scheduledExecutorService != null) {
                 scheduledExecutorService.shutdown();
@@ -1857,7 +1858,7 @@ public class DefaultSchedulerTest extends TestLogger {
                 .setSchedulingStrategyFactory(new PipelinedRegionSchedulingStrategy.Factory())
                 .setFailoverStrategyFactory(new RestartPipelinedRegionFailoverStrategy.Factory())
                 .setRestartBackoffTimeStrategy(testRestartBackoffTimeStrategy)
-                .setExecutionVertexOperations(testExecutionVertexOperations)
+                .setExecutionOperations(testExecutionOperations)
                 .setExecutionVertexVersioner(executionVertexVersioner)
                 .setExecutionSlotAllocatorFactory(executionSlotAllocatorFactory)
                 .setShuffleMaster(shuffleMaster)
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DeploymentHandleTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DeploymentHandleTest.java
deleted file mode 100644
index 3ad93bbfea3..00000000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DeploymentHandleTest.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.flink.runtime.scheduler;
-
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobmaster.LogicalSlot;
-import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder;
-import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
-
-import org.junit.Before;
-import org.junit.Test;
-
-import java.util.concurrent.CompletableFuture;
-
-import static org.hamcrest.Matchers.containsString;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertSame;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-/** Unit tests for {@link DeploymentHandle}. */
-public class DeploymentHandleTest {
-
-    private static final JobVertexID TEST_JOB_VERTEX_ID = new JobVertexID(0, 0);
-
-    private static final ExecutionVertexID TEST_EXECUTION_VERTEX_ID =
-            new ExecutionVertexID(TEST_JOB_VERTEX_ID, 0);
-
-    private static final ExecutionVertexVersion TEST_EXECUTION_VERTEX_VERSION =
-            new ExecutionVertexVersion(TEST_EXECUTION_VERTEX_ID, 0);
-
-    private CompletableFuture<LogicalSlot> logicalSlotFuture;
-
-    private DeploymentHandle deploymentHandle;
-
-    @Before
-    public void setUp() {
-        logicalSlotFuture = new CompletableFuture<>();
-        final SlotExecutionVertexAssignment slotExecutionVertexAssignment =
-                new SlotExecutionVertexAssignment(TEST_EXECUTION_VERTEX_ID, logicalSlotFuture);
-        deploymentHandle =
-                new DeploymentHandle(TEST_EXECUTION_VERTEX_VERSION, slotExecutionVertexAssignment);
-    }
-
-    @Test
-    public void getLogicalSlotThrowsExceptionIfSlotFutureNotCompleted() {
-        try {
-            assertFalse(deploymentHandle.getLogicalSlot().isPresent());
-            fail();
-        } catch (IllegalStateException e) {
-            assertThat(
-                    e.getMessage(),
-                    containsString("method can only be called after slot future is done"));
-        }
-    }
-
-    @Test
-    public void slotIsNotPresentIfFutureWasCancelled() {
-        logicalSlotFuture.cancel(false);
-        assertFalse(deploymentHandle.getLogicalSlot().isPresent());
-    }
-
-    @Test
-    public void slotIsNotPresentIfFutureWasCompletedExceptionally() {
-        logicalSlotFuture.completeExceptionally(new RuntimeException("expected"));
-        assertFalse(deploymentHandle.getLogicalSlot().isPresent());
-    }
-
-    @Test
-    public void getLogicalSlotReturnsSlotIfFutureCompletedNormally() {
-        final LogicalSlot logicalSlot = new TestingLogicalSlotBuilder().createTestingLogicalSlot();
-        logicalSlotFuture.complete(logicalSlot);
-        assertTrue(deploymentHandle.getLogicalSlot().isPresent());
-        assertSame(logicalSlot, deploymentHandle.getLogicalSlot().get());
-    }
-}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
index 464d5c161db..ba6b7a0d880 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
@@ -85,6 +85,7 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
 
+import static org.apache.flink.runtime.scheduler.SchedulerBase.computeVertexParallelismStore;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
@@ -419,13 +420,14 @@ public class SchedulerTestingUtils {
                 new RestartPipelinedRegionFailoverStrategy.Factory();
         protected RestartBackoffTimeStrategy restartBackoffTimeStrategy =
                 NoRestartBackoffTimeStrategy.INSTANCE;
-        protected ExecutionVertexOperations executionVertexOperations =
-                new DefaultExecutionVertexOperations();
+        protected ExecutionOperations executionOperations = new DefaultExecutionOperations();
         protected ExecutionVertexVersioner executionVertexVersioner =
                 new ExecutionVertexVersioner();
         protected ExecutionSlotAllocatorFactory executionSlotAllocatorFactory =
                 new TestExecutionSlotAllocatorFactory();
         protected JobStatusListener jobStatusListener = (ignoredA, ignoredB, ignoredC) -> {};
+        protected ExecutionDeployer.Factory executionDeployerFactory =
+                new DefaultExecutionDeployer.Factory();
 
         public DefaultSchedulerBuilder(
                 final JobGraph jobGraph,
@@ -541,9 +543,9 @@ public class SchedulerTestingUtils {
             return this;
         }
 
-        public DefaultSchedulerBuilder setExecutionVertexOperations(
-                final ExecutionVertexOperations executionVertexOperations) {
-            this.executionVertexOperations = executionVertexOperations;
+        public DefaultSchedulerBuilder setExecutionOperations(
+                final ExecutionOperations executionOperations) {
+            this.executionOperations = executionOperations;
             return this;
         }
 
@@ -564,6 +566,12 @@ public class SchedulerTestingUtils {
             return this;
         }
 
+        public DefaultSchedulerBuilder setExecutionDeployerFactory(
+                ExecutionDeployer.Factory executionDeployerFactory) {
+            this.executionDeployerFactory = executionDeployerFactory;
+            return this;
+        }
+
         public DefaultScheduler build() throws Exception {
             final ExecutionGraphFactory executionGraphFactory =
                     new DefaultExecutionGraphFactory(
@@ -592,7 +600,7 @@ public class SchedulerTestingUtils {
                     schedulingStrategyFactory,
                     failoverStrategyFactory,
                     restartBackoffTimeStrategy,
-                    executionVertexOperations,
+                    executionOperations,
                     executionVertexVersioner,
                     executionSlotAllocatorFactory,
                     System.currentTimeMillis(),
@@ -600,7 +608,9 @@ public class SchedulerTestingUtils {
                     jobStatusListener,
                     executionGraphFactory,
                     shuffleMaster,
-                    rpcTimeout);
+                    rpcTimeout,
+                    computeVertexParallelismStore(jobGraph),
+                    executionDeployerFactory);
         }
     }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocatorTest.java
index cee964ae179..aeac6d7cc11 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocatorTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.clusterframework.types.SlotProfile;
 import org.apache.flink.runtime.clusterframework.types.SlotProfileTestingUtils;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.jobmaster.LogicalSlot;
 import org.apache.flink.runtime.jobmaster.SlotRequestId;
 import org.apache.flink.runtime.jobmaster.TestingPayload;
@@ -53,6 +54,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.stream.Collectors;
 
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createExecutionAttemptId;
 import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createRandomExecutionVertexId;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.Matchers.containsInAnyOrder;
@@ -113,9 +115,9 @@ public class SlotSharingExecutionSlotAllocatorTest extends TestLogger {
         AllocationContext context =
                 AllocationContext.newBuilder().addGroup(EV1, EV2).addGroup(EV3, EV4).build();
 
-        List<SlotExecutionVertexAssignment> executionVertexAssignments =
+        List<ExecutionSlotAssignment> executionSlotAssignments =
                 context.allocateSlotsFor(EV1, EV2, EV3, EV4);
-        Collection<ExecutionVertexID> assignIds = getAssignIds(executionVertexAssignments);
+        Collection<ExecutionVertexID> assignIds = getAssignIds(executionSlotAssignments);
 
         assertThat(assignIds, containsInAnyOrder(EV1, EV2, EV3, EV4));
         assertThat(context.getSlotProvider().getRequests().keySet(), hasSize(2));
@@ -126,9 +128,8 @@ public class SlotSharingExecutionSlotAllocatorTest extends TestLogger {
         AllocationContext context = AllocationContext.newBuilder().addGroup(EV1, EV2).build();
 
         context.allocateSlotsFor(EV1);
-        List<SlotExecutionVertexAssignment> executionVertexAssignments =
-                context.allocateSlotsFor(EV2);
-        Collection<ExecutionVertexID> assignIds = getAssignIds(executionVertexAssignments);
+        List<ExecutionSlotAssignment> executionSlotAssignments = context.allocateSlotsFor(EV2);
+        Collection<ExecutionVertexID> assignIds = getAssignIds(executionSlotAssignments);
 
         // execution 0 from the first allocateSlotsFor call and execution 1 from the second
         // allocateSlotsFor call
@@ -142,8 +143,8 @@ public class SlotSharingExecutionSlotAllocatorTest extends TestLogger {
             throws ExecutionException, InterruptedException {
         AllocationContext context = AllocationContext.newBuilder().addGroup(EV1).build();
 
-        SlotExecutionVertexAssignment assignment1 = context.allocateSlotsFor(EV1).get(0);
-        SlotExecutionVertexAssignment assignment2 = context.allocateSlotsFor(EV1).get(0);
+        ExecutionSlotAssignment assignment1 = context.allocateSlotsFor(EV1).get(0);
+        ExecutionSlotAssignment assignment2 = context.allocateSlotsFor(EV1).get(0);
 
         assertThat(
                 assignment1.getLogicalSlotFuture().get()
@@ -226,7 +227,7 @@ public class SlotSharingExecutionSlotAllocatorTest extends TestLogger {
                 true,
                 true,
                 (context, assignment) -> {
-                    context.getAllocator().cancel(assignment.getExecutionVertexId());
+                    context.getAllocator().cancel(assignment.getExecutionAttemptId());
                     try {
                         assignment.getLogicalSlotFuture().get();
                         fail("The logical future must finish with the cancellation exception");
@@ -245,7 +246,7 @@ public class SlotSharingExecutionSlotAllocatorTest extends TestLogger {
                 false,
                 false,
                 (context, assignment) -> {
-                    context.getAllocator().cancel(assignment.getExecutionVertexId());
+                    context.getAllocator().cancel(assignment.getExecutionAttemptId());
                     assignment.getLogicalSlotFuture().get();
                 });
     }
@@ -253,7 +254,7 @@ public class SlotSharingExecutionSlotAllocatorTest extends TestLogger {
     private static void testLogicalSlotRequestCancellationOrRelease(
             boolean completePhysicalSlotFutureManually,
             boolean cancelsPhysicalSlotRequestAndRemovesSharedSlot,
-            BiConsumerWithException<AllocationContext, SlotExecutionVertexAssignment, Exception>
+            BiConsumerWithException<AllocationContext, ExecutionSlotAssignment, Exception>
                     cancelOrReleaseAction)
             throws Exception {
         AllocationContext.Builder allocationContextBuilder =
@@ -264,19 +265,19 @@ public class SlotSharingExecutionSlotAllocatorTest extends TestLogger {
         }
         AllocationContext context = allocationContextBuilder.build();
 
-        List<SlotExecutionVertexAssignment> assignments = context.allocateSlotsFor(EV1, EV2);
+        List<ExecutionSlotAssignment> assignments = context.allocateSlotsFor(EV1, EV2);
         assertThat(context.getSlotProvider().getRequests().keySet(), hasSize(1));
 
         // cancel or release only one sharing logical slots
         cancelOrReleaseAction.accept(context, assignments.get(0));
-        List<SlotExecutionVertexAssignment> assignmentsAfterOneCancellation =
+        List<ExecutionSlotAssignment> assignmentsAfterOneCancellation =
                 context.allocateSlotsFor(EV1, EV2);
         // there should be no more physical slot allocations, as the first logical slot reuses the
         // previous shared slot
         assertThat(context.getSlotProvider().getRequests().keySet(), hasSize(1));
 
         // cancel or release all sharing logical slots
-        for (SlotExecutionVertexAssignment assignment : assignmentsAfterOneCancellation) {
+        for (ExecutionSlotAssignment assignment : assignmentsAfterOneCancellation) {
             cancelOrReleaseAction.accept(context, assignment);
         }
         SlotRequestId slotRequestId =
@@ -298,7 +299,7 @@ public class SlotSharingExecutionSlotAllocatorTest extends TestLogger {
     public void testPhysicalSlotReleaseLogicalSlots()
             throws ExecutionException, InterruptedException {
         AllocationContext context = AllocationContext.newBuilder().addGroup(EV1, EV2).build();
-        List<SlotExecutionVertexAssignment> assignments = context.allocateSlotsFor(EV1, EV2);
+        List<ExecutionSlotAssignment> assignments = context.allocateSlotsFor(EV1, EV2);
         List<TestingPayload> payloads =
                 assignments.stream()
                         .map(
@@ -376,10 +377,10 @@ public class SlotSharingExecutionSlotAllocatorTest extends TestLogger {
         AllocationContext context = createBulkCheckerContextWithEv12GroupAndEv3Group(bulkChecker);
 
         // allocate 2 physical slots for 2 groups
-        List<SlotExecutionVertexAssignment> assignments1 = context.allocateSlotsFor(EV1, EV3);
+        List<ExecutionSlotAssignment> assignments1 = context.allocateSlotsFor(EV1, EV3);
         fulfilOneOfTwoSlotRequestsAndGetPendingProfile(context, new AllocationID());
         PhysicalSlotRequestBulk bulk1 = bulkChecker.getBulk();
-        List<SlotExecutionVertexAssignment> assignments2 = context.allocateSlotsFor(EV2);
+        List<ExecutionSlotAssignment> assignments2 = context.allocateSlotsFor(EV2);
 
         // cancelling of (EV1, EV3) releases assignments1 and only one physical slot for EV3
         // the second physical slot is held by sharing EV2 from the next bulk
@@ -438,10 +439,9 @@ public class SlotSharingExecutionSlotAllocatorTest extends TestLogger {
                                         new FlinkException("test failure")))
                         .build();
 
-        final List<SlotExecutionVertexAssignment> allocatedSlots =
-                context.allocateSlotsFor(EV1, EV2);
+        final List<ExecutionSlotAssignment> allocatedSlots = context.allocateSlotsFor(EV1, EV2);
 
-        for (SlotExecutionVertexAssignment allocatedSlot : allocatedSlots) {
+        for (ExecutionSlotAssignment allocatedSlot : allocatedSlots) {
             assertTrue(allocatedSlot.getLogicalSlotFuture().isCompletedExceptionally());
         }
 
@@ -473,9 +473,10 @@ public class SlotSharingExecutionSlotAllocatorTest extends TestLogger {
     }
 
     private static List<ExecutionVertexID> getAssignIds(
-            Collection<SlotExecutionVertexAssignment> assignments) {
+            Collection<ExecutionSlotAssignment> assignments) {
         return assignments.stream()
-                .map(SlotExecutionVertexAssignment::getExecutionVertexId)
+                .map(ExecutionSlotAssignment::getExecutionAttemptId)
+                .map(ExecutionAttemptID::getExecutionVertexId)
                 .collect(Collectors.toList());
     }
 
@@ -524,8 +525,13 @@ public class SlotSharingExecutionSlotAllocatorTest extends TestLogger {
             return allocator;
         }
 
-        private List<SlotExecutionVertexAssignment> allocateSlotsFor(ExecutionVertexID... ids) {
-            return allocator.allocateSlotsFor(Arrays.asList(ids));
+        private List<ExecutionSlotAssignment> allocateSlotsFor(ExecutionVertexID... ids) {
+            return allocator.allocateSlotsFor(
+                    Arrays.stream(ids)
+                            .map(
+                                    executionVertexId ->
+                                            createExecutionAttemptId(executionVertexId, 0))
+                            .collect(Collectors.toList()));
         }
 
         private TestingSlotSharingStrategy getSlotSharingStrategy() {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestExecutionOperationsDecorator.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestExecutionOperationsDecorator.java
new file mode 100644
index 00000000000..cd4205bba15
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestExecutionOperationsDecorator.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@link ExecutionOperations} decorator that enables instrumentation of execution operations for
+ * testing purposes.
+ */
+public class TestExecutionOperationsDecorator implements ExecutionOperations {
+
+    private final ExecutionOperations delegate;
+
+    private final CountLatch deployedExecutions = new CountLatch();
+    private final CountLatch canceledExecutions = new CountLatch();
+    private final CountLatch failedExecutions = new CountLatch();
+
+    private boolean failDeploy;
+
+    public TestExecutionOperationsDecorator(final ExecutionOperations delegate) {
+        this.delegate = checkNotNull(delegate);
+    }
+
+    @Override
+    public void deploy(final Execution execution) throws JobException {
+        deployedExecutions.add(execution);
+        if (failDeploy) {
+            throw new RuntimeException("Expected");
+        }
+        delegate.deploy(execution);
+    }
+
+    @Override
+    public CompletableFuture<?> cancel(final Execution execution) {
+        canceledExecutions.add(execution);
+        return delegate.cancel(execution);
+    }
+
+    @Override
+    public void markFailed(Execution execution, Throwable cause) {
+        failedExecutions.add(execution);
+        delegate.markFailed(execution, cause);
+    }
+
+    public void enableFailDeploy() {
+        failDeploy = true;
+    }
+
+    public void disableFailDeploy() {
+        failDeploy = false;
+    }
+
+    public List<ExecutionAttemptID> getDeployedExecutions() {
+        return deployedExecutions.getExecutions();
+    }
+
+    public List<ExecutionAttemptID> getCanceledExecutions() {
+        return canceledExecutions.getExecutions();
+    }
+
+    public List<ExecutionAttemptID> getFailedExecutions() {
+        return failedExecutions.getExecutions();
+    }
+
+    public List<ExecutionVertexID> getDeployedVertices() {
+        return deployedExecutions.getVertices();
+    }
+
+    public List<ExecutionVertexID> getCanceledVertices() {
+        return canceledExecutions.getVertices();
+    }
+
+    public List<ExecutionVertexID> getFailedVertices() {
+        return failedExecutions.getVertices();
+    }
+
+    /** Waits until the given number of executions have been canceled. */
+    public void awaitCanceledExecutions(int count) throws InterruptedException {
+        canceledExecutions.await(count);
+    }
+
+    /** Waits until the given number of executions have been failed. */
+    public void awaitFailedExecutions(int count) throws InterruptedException {
+        failedExecutions.await(count);
+    }
+
+    private static class CountLatch {
+        @GuardedBy("lock")
+        private final List<Execution> executions = new ArrayList<>();
+
+        private final Object lock = new Object();
+
+        public void add(Execution execution) {
+            synchronized (lock) {
+                executions.add(execution);
+                lock.notifyAll();
+            }
+        }
+
+        public void await(int count) throws InterruptedException {
+            synchronized (lock) {
+                while (executions.size() < count) {
+                    lock.wait();
+                }
+            }
+        }
+
+        public List<ExecutionAttemptID> getExecutions() {
+            synchronized (lock) {
+                return executions.stream()
+                        .map(Execution::getAttemptId)
+                        .collect(Collectors.toList());
+            }
+        }
+
+        public List<ExecutionVertexID> getVertices() {
+            synchronized (lock) {
+                return executions.stream()
+                        .map(Execution::getVertex)
+                        .map(ExecutionVertex::getID)
+                        .collect(Collectors.toList());
+            }
+        }
+    }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestExecutionSlotAllocator.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestExecutionSlotAllocator.java
index 095903e9968..cb36635d767 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestExecutionSlotAllocator.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestExecutionSlotAllocator.java
@@ -19,6 +19,7 @@
 
 package org.apache.flink.runtime.scheduler;
 
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
 import org.apache.flink.runtime.jobmaster.LogicalSlot;
 import org.apache.flink.runtime.jobmaster.SlotOwner;
@@ -39,7 +40,7 @@ import static org.apache.flink.util.Preconditions.checkState;
 /** Test {@link ExecutionSlotAllocator} implementation. */
 public class TestExecutionSlotAllocator implements ExecutionSlotAllocator, SlotOwner {
 
-    private final Map<ExecutionVertexID, SlotExecutionVertexAssignment> pendingRequests =
+    private final Map<ExecutionAttemptID, ExecutionSlotAssignment> pendingRequests =
             new HashMap<>();
 
     private final TestingLogicalSlotBuilder logicalSlotBuilder;
@@ -63,33 +64,34 @@ public class TestExecutionSlotAllocator implements ExecutionSlotAllocator, SlotO
     }
 
     @Override
-    public List<SlotExecutionVertexAssignment> allocateSlotsFor(
-            final List<ExecutionVertexID> executionVertexIds) {
-        final List<SlotExecutionVertexAssignment> slotVertexAssignments =
-                createSlotVertexAssignments(executionVertexIds);
-        registerPendingRequests(slotVertexAssignments);
+    public List<ExecutionSlotAssignment> allocateSlotsFor(
+            final List<ExecutionAttemptID> executionAttemptIds) {
+        final List<ExecutionSlotAssignment> executionSlotAssignments =
+                createExecutionSlotAssignments(executionAttemptIds);
+        registerPendingRequests(executionSlotAssignments);
         maybeCompletePendingRequests();
-        return slotVertexAssignments;
+        return executionSlotAssignments;
     }
 
-    private void registerPendingRequests(
-            final List<SlotExecutionVertexAssignment> slotVertexAssignments) {
-        for (SlotExecutionVertexAssignment slotVertexAssignment : slotVertexAssignments) {
-            pendingRequests.put(slotVertexAssignment.getExecutionVertexId(), slotVertexAssignment);
-        }
-    }
+    private List<ExecutionSlotAssignment> createExecutionSlotAssignments(
+            final List<ExecutionAttemptID> executionAttemptIds) {
 
-    private List<SlotExecutionVertexAssignment> createSlotVertexAssignments(
-            final Collection<ExecutionVertexID> executionVertexIds) {
-
-        final List<SlotExecutionVertexAssignment> result = new ArrayList<>();
-        for (ExecutionVertexID executionVertexId : executionVertexIds) {
+        final List<ExecutionSlotAssignment> result = new ArrayList<>();
+        for (ExecutionAttemptID executionAttemptId : executionAttemptIds) {
             final CompletableFuture<LogicalSlot> logicalSlotFuture = new CompletableFuture<>();
-            result.add(new SlotExecutionVertexAssignment(executionVertexId, logicalSlotFuture));
+            result.add(new ExecutionSlotAssignment(executionAttemptId, logicalSlotFuture));
         }
         return result;
     }
 
+    private void registerPendingRequests(
+            final List<ExecutionSlotAssignment> executionSlotAssignments) {
+        for (ExecutionSlotAssignment executionSlotAssignment : executionSlotAssignments) {
+            pendingRequests.put(
+                    executionSlotAssignment.getExecutionAttemptId(), executionSlotAssignment);
+        }
+    }
+
     private void maybeCompletePendingRequests() {
         if (autoCompletePendingRequests) {
             completePendingRequests();
@@ -97,36 +99,60 @@ public class TestExecutionSlotAllocator implements ExecutionSlotAllocator, SlotO
     }
 
     public void completePendingRequests() {
-        final Collection<ExecutionVertexID> vertexIds = new ArrayList<>(pendingRequests.keySet());
-        vertexIds.forEach(this::completePendingRequest);
+        final Collection<ExecutionAttemptID> executionIds =
+                new ArrayList<>(pendingRequests.keySet());
+        executionIds.forEach(this::completePendingRequest);
     }
 
-    public LogicalSlot completePendingRequest(final ExecutionVertexID executionVertexId) {
+    public LogicalSlot completePendingRequest(final ExecutionAttemptID executionAttemptId) {
         final LogicalSlot slot = logicalSlotBuilder.setSlotOwner(this).createTestingLogicalSlot();
-        final SlotExecutionVertexAssignment slotVertexAssignment =
-                removePendingRequest(executionVertexId);
-        checkState(slotVertexAssignment != null);
-        slotVertexAssignment.getLogicalSlotFuture().complete(slot);
+        final ExecutionSlotAssignment executionSlotAssignment =
+                removePendingRequest(executionAttemptId);
+        checkState(executionSlotAssignment != null);
+        executionSlotAssignment.getLogicalSlotFuture().complete(slot);
         return slot;
     }
 
-    private SlotExecutionVertexAssignment removePendingRequest(
-            final ExecutionVertexID executionVertexId) {
-        return pendingRequests.remove(executionVertexId);
+    public LogicalSlot completePendingRequest(final ExecutionVertexID executionVertexId) {
+        return completePendingRequest(findExecutionIdByVertexId(executionVertexId));
+    }
+
+    private ExecutionSlotAssignment removePendingRequest(
+            final ExecutionAttemptID executionAttemptId) {
+        return pendingRequests.remove(executionAttemptId);
     }
 
     public void timeoutPendingRequests() {
-        final Collection<ExecutionVertexID> vertexIds = new ArrayList<>(pendingRequests.keySet());
-        vertexIds.forEach(this::timeoutPendingRequest);
+        final Collection<ExecutionAttemptID> executionIds =
+                new ArrayList<>(pendingRequests.keySet());
+        executionIds.forEach(this::timeoutPendingRequest);
     }
 
-    public void timeoutPendingRequest(final ExecutionVertexID executionVertexId) {
-        final SlotExecutionVertexAssignment slotVertexAssignment =
-                removePendingRequest(executionVertexId);
+    public void timeoutPendingRequest(final ExecutionAttemptID executionAttemptId) {
+        final ExecutionSlotAssignment slotVertexAssignment =
+                removePendingRequest(executionAttemptId);
         checkState(slotVertexAssignment != null);
         slotVertexAssignment.getLogicalSlotFuture().completeExceptionally(new TimeoutException());
     }
 
+    public void timeoutPendingRequest(final ExecutionVertexID executionVertexId) {
+        timeoutPendingRequest(findExecutionIdByVertexId(executionVertexId));
+    }
+
+    private ExecutionAttemptID findExecutionIdByVertexId(
+            final ExecutionVertexID executionVertexId) {
+        final List<ExecutionAttemptID> executionIds = new ArrayList<>();
+        for (ExecutionAttemptID executionAttemptId : pendingRequests.keySet()) {
+            if (executionAttemptId.getExecutionVertexId().equals(executionVertexId)) {
+                executionIds.add(executionAttemptId);
+            }
+        }
+        checkState(
+                executionIds.size() == 1,
+                "It is expected to find one and only one ExecutionAttemptID of the given ExecutionVertexID.");
+        return executionIds.get(0);
+    }
+
     public void enableAutoCompletePendingRequests() {
         autoCompletePendingRequests = true;
     }
@@ -140,11 +166,11 @@ public class TestExecutionSlotAllocator implements ExecutionSlotAllocator, SlotO
     }
 
     @Override
-    public void cancel(final ExecutionVertexID executionVertexId) {
-        final SlotExecutionVertexAssignment slotVertexAssignment =
-                removePendingRequest(executionVertexId);
-        if (slotVertexAssignment != null) {
-            slotVertexAssignment.getLogicalSlotFuture().cancel(false);
+    public void cancel(ExecutionAttemptID executionAttemptId) {
+        final ExecutionSlotAssignment executionSlotAssignment =
+                removePendingRequest(executionAttemptId);
+        if (executionSlotAssignment != null) {
+            executionSlotAssignment.getLogicalSlotFuture().cancel(false);
         }
     }
 
@@ -158,10 +184,10 @@ public class TestExecutionSlotAllocator implements ExecutionSlotAllocator, SlotO
                 final LogicalSlot slot =
                         logicalSlotBuilder.setSlotOwner(this).createTestingLogicalSlot();
 
-                final SlotExecutionVertexAssignment slotVertexAssignment =
+                final ExecutionSlotAssignment executionSlotAssignment =
                         pendingRequests.remove(pendingRequests.keySet().stream().findAny().get());
 
-                slotVertexAssignment.getLogicalSlotFuture().complete(slot);
+                executionSlotAssignment.getLogicalSlotFuture().complete(slot);
             }
         }
     }
@@ -174,7 +200,7 @@ public class TestExecutionSlotAllocator implements ExecutionSlotAllocator, SlotO
         return logicalSlotBuilder;
     }
 
-    public Map<ExecutionVertexID, SlotExecutionVertexAssignment> getPendingRequests() {
+    public Map<ExecutionAttemptID, ExecutionSlotAssignment> getPendingRequests() {
         return Collections.unmodifiableMap(pendingRequests);
     }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestExecutionVertexOperationsDecorator.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestExecutionVertexOperationsDecorator.java
deleted file mode 100644
index 1323510e422..00000000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestExecutionVertexOperationsDecorator.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.flink.runtime.scheduler;
-
-import org.apache.flink.runtime.JobException;
-import org.apache.flink.runtime.executiongraph.ExecutionVertex;
-import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
-
-import javax.annotation.concurrent.GuardedBy;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * {@link ExecutionVertexOperations} decorator that enables instrumentation of execution vertex
- * operations for testing purposes.
- */
-public class TestExecutionVertexOperationsDecorator implements ExecutionVertexOperations {
-
-    private final ExecutionVertexOperations delegate;
-
-    private final CountLatch deployedVertices = new CountLatch();
-    private final CountLatch canceledVertices = new CountLatch();
-    private final CountLatch failedVertices = new CountLatch();
-
-    private boolean failDeploy;
-
-    public TestExecutionVertexOperationsDecorator(final ExecutionVertexOperations delegate) {
-        this.delegate = checkNotNull(delegate);
-    }
-
-    @Override
-    public void deploy(final ExecutionVertex executionVertex) throws JobException {
-        deployedVertices.add(executionVertex.getID());
-
-        if (failDeploy) {
-            throw new RuntimeException("Expected");
-        }
-
-        delegate.deploy(executionVertex);
-    }
-
-    @Override
-    public CompletableFuture<?> cancel(final ExecutionVertex executionVertex) {
-        canceledVertices.add(executionVertex.getID());
-        return delegate.cancel(executionVertex);
-    }
-
-    @Override
-    public void markFailed(ExecutionVertex executionVertex, Throwable cause) {
-        failedVertices.add(executionVertex.getID());
-        delegate.markFailed(executionVertex, cause);
-    }
-
-    public void enableFailDeploy() {
-        failDeploy = true;
-    }
-
-    public void disableFailDeploy() {
-        failDeploy = false;
-    }
-
-    public List<ExecutionVertexID> getDeployedVertices() {
-        return deployedVertices.getVertices();
-    }
-
-    public List<ExecutionVertexID> getCanceledVertices() {
-        return canceledVertices.getVertices();
-    }
-
-    public List<ExecutionVertexID> getFailedVertices() {
-        return failedVertices.getVertices();
-    }
-
-    /** Waits until the given number of vertices have been canceled. */
-    public void awaitCanceledVertices(int count) throws InterruptedException {
-        canceledVertices.await(count);
-    }
-
-    /** Waits until the given number of vertices have been failed. */
-    public void awaitFailedVertices(int count) throws InterruptedException {
-        failedVertices.await(count);
-    }
-
-    private static class CountLatch {
-        @GuardedBy("lock")
-        private final List<ExecutionVertexID> vertices = new ArrayList<>();
-
-        private final Object lock = new Object();
-
-        public void add(ExecutionVertexID executionVertexId) {
-            synchronized (lock) {
-                vertices.add(executionVertexId);
-                lock.notifyAll();
-            }
-        }
-
-        public void await(int count) throws InterruptedException {
-            synchronized (lock) {
-                while (vertices.size() < count) {
-                    lock.wait();
-                }
-            }
-        }
-
-        public List<ExecutionVertexID> getVertices() {
-            synchronized (lock) {
-                return Collections.unmodifiableList(vertices);
-            }
-        }
-    }
-}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTestUtils.java
index 98152ee9546..12740adb7cf 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTestUtils.java
@@ -88,7 +88,7 @@ public class AdaptiveBatchSchedulerTestUtils {
                     schedulingStrategyFactory,
                     failoverStrategyFactory,
                     restartBackoffTimeStrategy,
-                    executionVertexOperations,
+                    executionOperations,
                     executionVertexVersioner,
                     executionSlotAllocatorFactory,
                     System.currentTimeMillis(),