You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ga...@apache.org on 2019/09/27 12:39:33 UTC

[flink] 06/06: [FLINK-12433][runtime] Add additional implementation to DefaultScheduler

This is an automated email from the ASF dual-hosted git repository.

gary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c546448a360ec19d0e9c9ba3fa7d84068367bbe5
Author: Gary Yao <ga...@apache.org>
AuthorDate: Thu Sep 5 09:46:39 2019 +0200

    [FLINK-12433][runtime] Add additional implementation to DefaultScheduler
    
    Implement DefaultScheduler to the point where streaming and batch WordCount can
    be executed. Note that restoring state does not work yet. This will be fixed in
    later commits.
    
    Add additional functionality to ExecutionGraph so that it can be used by
    DefaultScheduler. Some of the logic in ExecutionGraph must not be run when
    DefaultScheduler is configured. We introduce a function isLegacyScheduling() to
    be able to toggle off some of the legacy behavior when DefaultScheduler is
    configured.
    
    This closes #9663.
---
 .../flink/runtime/executiongraph/Execution.java    |  73 +++--
 .../runtime/executiongraph/ExecutionGraph.java     |  70 +++-
 .../runtime/executiongraph/ExecutionVertex.java    | 127 ++++---
 .../flip1/RestartPipelinedRegionStrategy.java      |   8 +
 .../DefaultExecutionVertexOperations.java}         |  30 +-
 .../flink/runtime/scheduler/DefaultScheduler.java  | 363 +++++++++++++++++++--
 .../runtime/scheduler/DefaultSchedulerFactory.java |  29 +-
 .../flink/runtime/scheduler/DeploymentHandle.java  |  83 +++++
 .../scheduler/ExecutionVertexOperations.java}      |  35 +-
 ...xecutionVertexSchedulingRequirementsMapper.java |  58 ++++
 .../scheduler/InternalTaskFailuresListener.java}   |  37 +--
 .../flink/runtime/scheduler/LegacyScheduler.java   |  65 +---
 .../flink/runtime/scheduler/SchedulerBase.java     | 139 +++++++-
 ...eSchedulerNgOnInternalTaskFailuresListener.java |  54 +++
 .../ExecutionVertexDeploymentTest.java             |   2 +-
 .../flip1/TestRestartBackoffTimeStrategy.java      |  12 +-
 .../runtime/scheduler/DefaultSchedulerTest.java    | 316 ++++++++++++++++++
 .../FailingExecutionVertexOperationsDecorator.java |  78 +++++
 .../SubmissionTrackingTaskManagerGateway.java      |  94 ++++++
 19 files changed, 1445 insertions(+), 228 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 7ba2353..503b59e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -63,6 +63,7 @@ import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.OptionalFailure;
+import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.function.ThrowingRunnable;
 
 import org.slf4j.Logger;
@@ -295,8 +296,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 	 * @param logicalSlot to assign to this execution
 	 * @return true if the slot could be assigned to the execution, otherwise false
 	 */
-	@VisibleForTesting
-	boolean tryAssignResource(final LogicalSlot logicalSlot) {
+	public boolean tryAssignResource(final LogicalSlot logicalSlot) {
 
 		assertRunningInJobMasterMainThread();
 
@@ -597,12 +597,19 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 		}
 	}
 
-	@VisibleForTesting
-	CompletableFuture<Execution> registerProducedPartitions(TaskManagerLocation location) {
+	public CompletableFuture<Execution> registerProducedPartitions(TaskManagerLocation location) {
+		Preconditions.checkState(isLegacyScheduling());
+		return registerProducedPartitions(location, vertex.getExecutionGraph().getScheduleMode().allowLazyDeployment());
+	}
+
+	public CompletableFuture<Execution> registerProducedPartitions(
+			TaskManagerLocation location,
+			boolean sendScheduleOrUpdateConsumersMessage) {
+
 		assertRunningInJobMasterMainThread();
 
 		return FutureUtils.thenApplyAsyncIfNotDone(
-			registerProducedPartitions(vertex, location, attemptId),
+			registerProducedPartitions(vertex, location, attemptId, sendScheduleOrUpdateConsumersMessage),
 			vertex.getExecutionGraph().getJobMasterMainThreadExecutor(),
 			producedPartitionsCache -> {
 				producedPartitions = producedPartitionsCache;
@@ -615,10 +622,10 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 	static CompletableFuture<Map<IntermediateResultPartitionID, ResultPartitionDeploymentDescriptor>> registerProducedPartitions(
 			ExecutionVertex vertex,
 			TaskManagerLocation location,
-			ExecutionAttemptID attemptId) {
-		ProducerDescriptor producerDescriptor = ProducerDescriptor.create(location, attemptId);
+			ExecutionAttemptID attemptId,
+			boolean sendScheduleOrUpdateConsumersMessage) {
 
-		boolean lazyScheduling = vertex.getExecutionGraph().getScheduleMode().allowLazyDeployment();
+		ProducerDescriptor producerDescriptor = ProducerDescriptor.create(location, attemptId);
 
 		Collection<IntermediateResultPartition> partitions = vertex.getProducedPartitions().values();
 		Collection<CompletableFuture<ResultPartitionDeploymentDescriptor>> partitionRegistrations =
@@ -637,7 +644,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 					partitionDescriptor,
 					shuffleDescriptor,
 					maxParallelism,
-					lazyScheduling));
+					sendScheduleOrUpdateConsumersMessage));
 			partitionRegistrations.add(partitionRegistration);
 		}
 
@@ -753,7 +760,10 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 		}
 		catch (Throwable t) {
 			markFailed(t);
-			ExceptionUtils.rethrow(t);
+
+			if (isLegacyScheduling()) {
+				ExceptionUtils.rethrow(t);
+			}
 		}
 	}
 
@@ -845,6 +855,10 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 	}
 
 	private void scheduleConsumer(ExecutionVertex consumerVertex) {
+		if (!isLegacyScheduling()) {
+			return;
+		}
+
 		try {
 			final ExecutionGraph executionGraph = consumerVertex.getExecutionGraph();
 			consumerVertex.scheduleForExecution(
@@ -1031,10 +1045,17 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 		processFail(t, true);
 	}
 
+	/**
+	 * @deprecated Only used in tests.
+	 */
+	@Deprecated
+	@VisibleForTesting
 	void markFailed(Throwable t, Map<String, Accumulator<?, ?>> userAccumulators, IOMetrics metrics) {
-		// skip release of partitions since this is only called if the TM actually sent the FAILED state update
-		// in this case all partitions have already been cleaned up
-		processFail(t, true, userAccumulators, metrics, false);
+		markFailed(t, userAccumulators, metrics, false);
+	}
+
+	void markFailed(Throwable t, Map<String, Accumulator<?, ?>> userAccumulators, IOMetrics metrics, boolean fromSchedulerNg) {
+		processFail(t, true, userAccumulators, metrics, false, fromSchedulerNg);
 	}
 
 	@VisibleForTesting
@@ -1179,11 +1200,15 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 	//  Internal Actions
 	// --------------------------------------------------------------------------------------------
 
+	private boolean isLegacyScheduling() {
+		return getVertex().isLegacyScheduling();
+	}
+
 	private void processFail(Throwable t, boolean isCallback) {
-		processFail(t, isCallback, null, null, true);
+		processFail(t, isCallback, null, null, true, false);
 	}
 
-	private void processFail(Throwable t, boolean isCallback, Map<String, Accumulator<?, ?>> userAccumulators, IOMetrics metrics, boolean releasePartitions) {
+	private void processFail(Throwable t, boolean isCallback, Map<String, Accumulator<?, ?>> userAccumulators, IOMetrics metrics, boolean releasePartitions, boolean fromSchedulerNg) {
 		// damn, we failed. This means only that we keep our books and notify our parent JobExecutionVertex
 		// the actual computation on the task manager is cleaned up by the TaskManager that noticed the failure
 
@@ -1211,7 +1236,15 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 				return;
 			}
 
-			if (transitionState(current, FAILED, t)) {
+			if (!fromSchedulerNg && !isLegacyScheduling()) {
+				vertex.getExecutionGraph().notifySchedulerNgAboutInternalTaskFailure(attemptId, t);
+				// HACK: We informed the new generation scheduler about an internally detected task
+				// failure. The scheduler will call processFail() again with releasePartitions
+				// always set to false and fromSchedulerNg set to true. Because the original value
+				// of releasePartitions will be lost, we need to invoke handlePartitionCleanup() here.
+				handlePartitionCleanup(releasePartitions, releasePartitions);
+				return;
+			} else if (transitionState(current, FAILED, t)) {
 				// success (in a manner of speaking)
 				this.failureCause = t;
 
@@ -1273,9 +1306,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 				String message = String.format("Concurrent unexpected state transition of task %s to %s while deployment was in progress.",
 						getVertexWithAttempt(), currentState);
 
-				if (LOG.isDebugEnabled()) {
-					LOG.debug(message);
-				}
+				LOG.debug(message);
 
 				// undo the deployment
 				sendCancelRpcCall(NUM_CANCEL_CALL_TRIES);
@@ -1474,6 +1505,10 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 		return preferredLocationsFuture;
 	}
 
+	public void transitionState(ExecutionState targetState) {
+		transitionState(state, targetState);
+	}
+
 	private boolean transitionState(ExecutionState currentState, ExecutionState targetState) {
 		return transitionState(currentState, targetState, null);
 	}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 4eb6a73..64ae21e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -67,6 +67,7 @@ import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguratio
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
 import org.apache.flink.runtime.query.KvStateLocationRegistry;
+import org.apache.flink.runtime.scheduler.InternalTaskFailuresListener;
 import org.apache.flink.runtime.scheduler.adapter.ExecutionGraphToSchedulingTopologyAdapter;
 import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
 import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex;
@@ -249,6 +250,8 @@ public class ExecutionGraph implements AccessExecutionGraph {
 	/** Blob writer used to offload RPC messages. */
 	private final BlobWriter blobWriter;
 
+	private boolean legacyScheduling = true;
+
 	/** The total number of vertices currently in the execution graph. */
 	private int numVerticesTotal;
 
@@ -258,6 +261,9 @@ public class ExecutionGraph implements AccessExecutionGraph {
 
 	private SchedulingTopology schedulingTopology;
 
+	@Nullable
+	private InternalTaskFailuresListener internalTaskFailuresListener;
+
 	// ------ Configuration of the Execution -------
 
 	/** Flag to indicate whether the scheduler may queue tasks for execution, or needs to be able
@@ -512,8 +518,6 @@ public class ExecutionGraph implements AccessExecutionGraph {
 		this.resultPartitionAvailabilityChecker = new ExecutionGraphResultPartitionAvailabilityChecker(
 			this::createResultPartitionId,
 			partitionTracker);
-
-		LOG.info("Job recovers via failover strategy: {}", failoverStrategy.getStrategyName());
 	}
 
 	public void start(@Nonnull ComponentMainThreadExecutor jobMasterMainThreadExecutor) {
@@ -880,6 +884,13 @@ public class ExecutionGraph implements AccessExecutionGraph {
 		return StringifiedAccumulatorResult.stringifyAccumulatorResults(accumulatorMap);
 	}
 
+	public void enableNgScheduling(final InternalTaskFailuresListener internalTaskFailuresListener) {
+		checkNotNull(internalTaskFailuresListener);
+		checkState(this.internalTaskFailuresListener == null, "enableNgScheduling can be only called once");
+		this.internalTaskFailuresListener = internalTaskFailuresListener;
+		this.legacyScheduling = false;
+	}
+
 	// --------------------------------------------------------------------------------------------
 	//  Actions
 	// --------------------------------------------------------------------------------------------
@@ -942,10 +953,24 @@ public class ExecutionGraph implements AccessExecutionGraph {
 			new DefaultFailoverTopology(this));
 	}
 
+	public boolean isLegacyScheduling() {
+		return legacyScheduling;
+	}
+
+	public void transitionToRunning() {
+		if (!transitionState(JobStatus.CREATED, JobStatus.RUNNING)) {
+			throw new IllegalStateException("Job may only be scheduled from state " + JobStatus.CREATED);
+		}
+	}
+
 	public void scheduleForExecution() throws JobException {
 
 		assertRunningInJobMasterMainThread();
 
+		if (isLegacyScheduling()) {
+			LOG.info("Job recovers via failover strategy: {}", failoverStrategy.getStrategyName());
+		}
+
 		final long currentGlobalModVersion = globalModVersion;
 
 		if (transitionState(JobStatus.CREATED, JobStatus.RUNNING)) {
@@ -1129,6 +1154,11 @@ public class ExecutionGraph implements AccessExecutionGraph {
 	 * @param t The exception that caused the failure.
 	 */
 	public void failGlobal(Throwable t) {
+		if (!isLegacyScheduling()) {
+			// Implementation does not work for new generation scheduler.
+			// Will be fixed with FLINK-14232.
+			ExceptionUtils.rethrow(t);
+		}
 
 		assertRunningInJobMasterMainThread();
 
@@ -1310,6 +1340,10 @@ public class ExecutionGraph implements AccessExecutionGraph {
 		return transitionState(current, newState, null);
 	}
 
+	private void transitionState(JobStatus newState, Throwable error) {
+		transitionState(state, newState, error);
+	}
+
 	private boolean transitionState(JobStatus current, JobStatus newState, Throwable error) {
 		assertRunningInJobMasterMainThread();
 		// consistency check
@@ -1433,7 +1467,12 @@ public class ExecutionGraph implements AccessExecutionGraph {
 	 *
 	 * @return true if the operation could be executed; false if a concurrent job status change occurred
 	 */
+	@Deprecated
 	private boolean tryRestartOrFail(long globalModVersionForRestart) {
+		if (!isLegacyScheduling()) {
+			return true;
+		}
+
 		JobStatus currentState = state;
 
 		if (currentState == JobStatus.FAILING || currentState == JobStatus.RESTARTING) {
@@ -1485,6 +1524,21 @@ public class ExecutionGraph implements AccessExecutionGraph {
 		}
 	}
 
+	public void failJob(Throwable cause) {
+		if (state == JobStatus.FAILING || state.isGloballyTerminalState()) {
+			return;
+		}
+
+		transitionState(JobStatus.FAILING, cause);
+		initFailureCause(cause);
+
+		FutureUtils.assertNoException(
+			cancelVerticesAsync().whenComplete((aVoid, throwable) -> {
+				transitionState(JobStatus.FAILED, cause);
+				onTerminalState(JobStatus.FAILED);
+			}));
+	}
+
 	private void onTerminalState(JobStatus status) {
 		try {
 			CheckpointCoordinator coord = this.checkpointCoordinator;
@@ -1557,7 +1611,7 @@ public class ExecutionGraph implements AccessExecutionGraph {
 			case FAILED:
 				// this deserialization is exception-free
 				accumulators = deserializeAccumulators(state);
-				attempt.markFailed(state.getError(userClassLoader), accumulators, state.getIOMetrics());
+				attempt.markFailed(state.getError(userClassLoader), accumulators, state.getIOMetrics(), !isLegacyScheduling());
 				return true;
 
 			default:
@@ -1727,6 +1781,10 @@ public class ExecutionGraph implements AccessExecutionGraph {
 			final ExecutionState newExecutionState,
 			final Throwable error) {
 
+		if (!isLegacyScheduling()) {
+			return;
+		}
+
 		// see what this means for us. currently, the first FAILED state means -> FAILED
 		if (newExecutionState == ExecutionState.FAILED) {
 			final Throwable ex = error != null ? error : new FlinkException("Unknown Error (missing cause)");
@@ -1757,6 +1815,12 @@ public class ExecutionGraph implements AccessExecutionGraph {
 		}
 	}
 
+	void notifySchedulerNgAboutInternalTaskFailure(final ExecutionAttemptID attemptId, final Throwable t) {
+		if (internalTaskFailuresListener != null) {
+			internalTaskFailuresListener.notifyFailed(attemptId, t);
+		}
+	}
+
 	ShuffleMaster<?> getShuffleMaster() {
 		return shuffleMaster;
 	}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index 6d262ee..500691b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -58,6 +58,7 @@ import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.stream.IntStream;
@@ -515,6 +516,18 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
 	}
 
 	/**
+	 * Gets the preferred location to execute the current task execution attempt, based on the state
+	 * that the execution attempt will resume.
+	 */
+	public Optional<TaskManagerLocation> getPreferredLocationBasedOnState() {
+		if (currentExecution.getTaskRestore() != null) {
+			return Optional.ofNullable(getLatestPriorLocation());
+		}
+
+		return Optional.empty();
+	}
+
+	/**
 	 * Gets the location preferences of the vertex's current task execution, as determined by the locations
 	 * of the predecessors from which it receives input data.
 	 * If there are more than MAX_DISTINCT_LOCATIONS_TO_CONSIDER different locations of source data, this
@@ -599,61 +612,75 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
 				throw new GlobalModVersionMismatch(originatingGlobalModVersion, actualModVersion);
 			}
 
-			final Execution oldExecution = currentExecution;
-			final ExecutionState oldState = oldExecution.getState();
+			return resetForNewExecutionInternal(timestamp, originatingGlobalModVersion);
+		}
+	}
 
-			if (oldState.isTerminal()) {
-				if (oldState == FINISHED) {
-					// pipelined partitions are released in Execution#cancel(), covering both job failures and vertex resets
-					// do not release pipelined partitions here to save RPC calls
-					oldExecution.handlePartitionCleanup(false, true);
-					getExecutionGraph().getPartitionReleaseStrategy().vertexUnfinished(executionVertexId);
-				}
+	public void resetForNewExecutionIfInTerminalState() {
+		if (isExecutionInTerminalState()) {
+			resetForNewExecutionInternal(System.currentTimeMillis(), getExecutionGraph().getGlobalModVersion());
+		}
+	}
 
-				priorExecutions.add(oldExecution.archive());
+	private boolean isExecutionInTerminalState() {
+		return currentExecution.getState().isTerminal();
+	}
 
-				final Execution newExecution = new Execution(
-					getExecutionGraph().getFutureExecutor(),
-					this,
-					oldExecution.getAttemptNumber() + 1,
-					originatingGlobalModVersion,
-					timestamp,
-					timeout);
+	private Execution resetForNewExecutionInternal(final long timestamp, final long originatingGlobalModVersion) {
+		final Execution oldExecution = currentExecution;
+		final ExecutionState oldState = oldExecution.getState();
 
-				currentExecution = newExecution;
+		if (oldState.isTerminal()) {
+			if (oldState == FINISHED) {
+				// pipelined partitions are released in Execution#cancel(), covering both job failures and vertex resets
+				// do not release pipelined partitions here to save RPC calls
+				oldExecution.handlePartitionCleanup(false, true);
+				getExecutionGraph().getPartitionReleaseStrategy().vertexUnfinished(executionVertexId);
+			}
 
-				synchronized (inputSplits) {
-					InputSplitAssigner assigner = jobVertex.getSplitAssigner();
-					if (assigner != null) {
-						assigner.returnInputSplit(inputSplits, getParallelSubtaskIndex());
-						inputSplits.clear();
-					}
-				}
+			priorExecutions.add(oldExecution.archive());
 
-				CoLocationGroup grp = jobVertex.getCoLocationGroup();
-				if (grp != null) {
-					locationConstraint = grp.getLocationConstraint(subTaskIndex);
-				}
+			final Execution newExecution = new Execution(
+				getExecutionGraph().getFutureExecutor(),
+				this,
+				oldExecution.getAttemptNumber() + 1,
+				originatingGlobalModVersion,
+				timestamp,
+				timeout);
 
-				// register this execution at the execution graph, to receive call backs
-				getExecutionGraph().registerExecution(newExecution);
+			currentExecution = newExecution;
 
-				// if the execution was 'FINISHED' before, tell the ExecutionGraph that
-				// we take one step back on the road to reaching global FINISHED
-				if (oldState == FINISHED) {
-					getExecutionGraph().vertexUnFinished();
+			synchronized (inputSplits) {
+				InputSplitAssigner assigner = jobVertex.getSplitAssigner();
+				if (assigner != null) {
+					assigner.returnInputSplit(inputSplits, getParallelSubtaskIndex());
+					inputSplits.clear();
 				}
+			}
 
-				// reset the intermediate results
-				for (IntermediateResultPartition resultPartition : resultPartitions.values()) {
-					resultPartition.resetForNewExecution();
-				}
+			CoLocationGroup grp = jobVertex.getCoLocationGroup();
+			if (grp != null) {
+				locationConstraint = grp.getLocationConstraint(subTaskIndex);
+			}
+
+			// register this execution at the execution graph, to receive call backs
+			getExecutionGraph().registerExecution(newExecution);
 
-				return newExecution;
+			// if the execution was 'FINISHED' before, tell the ExecutionGraph that
+			// we take one step back on the road to reaching global FINISHED
+			if (oldState == FINISHED) {
+				getExecutionGraph().vertexUnFinished();
 			}
-			else {
-				throw new IllegalStateException("Cannot reset a vertex that is in non-terminal state " + oldState);
+
+			// reset the intermediate results
+			for (IntermediateResultPartition resultPartition : resultPartitions.values()) {
+				resultPartition.resetForNewExecution();
 			}
+
+			return newExecution;
+		}
+		else {
+			throw new IllegalStateException("Cannot reset a vertex that is in non-terminal state " + oldState);
 		}
 	}
 
@@ -677,7 +704,17 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
 			allPreviousExecutionGraphAllocationIds);
 	}
 
-	@VisibleForTesting
+	public void tryAssignResource(LogicalSlot slot) {
+		if (!currentExecution.tryAssignResource(slot)) {
+			throw new IllegalStateException("Could not assign resource " + slot + " to current execution " +
+				currentExecution + '.');
+		}
+	}
+
+	public void deploy() throws JobException {
+		currentExecution.deploy();
+	}
+
 	public void deployToSlot(LogicalSlot slot) throws JobException {
 		if (currentExecution.tryAssignResource(slot)) {
 			currentExecution.deploy();
@@ -830,4 +867,8 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
 	public ArchivedExecutionVertex archive() {
 		return new ArchivedExecutionVertex(this);
 	}
+
+	public boolean isLegacyScheduling() {
+		return getExecutionGraph().isLegacyScheduling();
+	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionStrategy.java
index a87764b..941b049 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionStrategy.java
@@ -257,4 +257,12 @@ public class RestartPipelinedRegionStrategy implements FailoverStrategy {
 			failedPartitions.remove(resultPartitionID);
 		}
 	}
+
+	public static class Factory implements FailoverStrategy.Factory {
+
+		@Override
+		public FailoverStrategy create(final FailoverTopology topology) {
+			return new RestartPipelinedRegionStrategy(topology);
+		}
+	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/TestRestartBackoffTimeStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionVertexOperations.java
similarity index 58%
copy from flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/TestRestartBackoffTimeStrategy.java
copy to flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionVertexOperations.java
index 878c89f..028aac2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/TestRestartBackoffTimeStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionVertexOperations.java
@@ -17,34 +17,22 @@
  * under the License.
  */
 
-package org.apache.flink.runtime.executiongraph.failover.flip1;
+package org.apache.flink.runtime.scheduler;
 
-/**
- * A RestartBackoffTimeStrategy implementation for tests.
- */
-public class TestRestartBackoffTimeStrategy implements RestartBackoffTimeStrategy {
-
-	private final boolean canRestart;
-
-	private final long backoffTime;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 
-	public TestRestartBackoffTimeStrategy(boolean canRestart, long backoffTime) {
-		this.canRestart = canRestart;
-		this.backoffTime = backoffTime;
-	}
+import java.util.concurrent.CompletableFuture;
 
-	@Override
-	public boolean canRestart() {
-		return canRestart;
-	}
+class DefaultExecutionVertexOperations implements ExecutionVertexOperations {
 
 	@Override
-	public long getBackoffTime() {
-		return backoffTime;
+	public void deploy(final ExecutionVertex executionVertex) throws JobException {
+		executionVertex.deploy();
 	}
 
 	@Override
-	public void notifyFailure(Throwable cause) {
-		// ignore
+	public CompletableFuture<?> cancel(final ExecutionVertex executionVertex) {
+		return executionVertex.cancel();
 	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
index 759a56a..0d0af10 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
@@ -23,40 +23,92 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.BlobWriter;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler;
+import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverStrategy;
+import org.apache.flink.runtime.executiongraph.failover.flip1.FailureHandlingResult;
+import org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategy;
 import org.apache.flink.runtime.executiongraph.restart.ThrowingRestartStrategy;
 import org.apache.flink.runtime.io.network.partition.PartitionTracker;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
 import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
 import org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.scheduler.strategy.LazyFromSourcesSchedulingStrategy;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategy;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategyFactory;
 import org.apache.flink.runtime.shuffle.ShuffleMaster;
+import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 
 import org.slf4j.Logger;
 
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
 
 /**
- * Stub implementation of the future default scheduler.
+ * The future default scheduler.
  */
-public class DefaultScheduler extends SchedulerBase {
+public class DefaultScheduler extends SchedulerBase implements SchedulerOperations {
+
+	private final Logger log;
+
+	private final ClassLoader userCodeLoader;
+
+	private final ExecutionSlotAllocator executionSlotAllocator;
+
+	private final ExecutionFailureHandler executionFailureHandler;
+
+	private final ScheduledExecutor delayExecutor;
+
+	private final SchedulingStrategy schedulingStrategy;
+
+	private final ExecutionVertexVersioner executionVertexVersioner;
+
+	private final ExecutionVertexOperations executionVertexOperations;
 
 	public DefaultScheduler(
-			final Logger log,
-			final JobGraph jobGraph,
-			final BackPressureStatsTracker backPressureStatsTracker,
-			final Executor ioExecutor,
-			final Configuration jobMasterConfiguration,
-			final SlotProvider slotProvider,
-			final ScheduledExecutorService futureExecutor,
-			final ClassLoader userCodeLoader,
-			final CheckpointRecoveryFactory checkpointRecoveryFactory,
-			final Time rpcTimeout,
-			final BlobWriter blobWriter,
-			final JobManagerJobMetricGroup jobManagerJobMetricGroup,
-			final Time slotRequestTimeout,
-			final ShuffleMaster<?> shuffleMaster,
-			final PartitionTracker partitionTracker) throws Exception {
+		final Logger log,
+		final JobGraph jobGraph,
+		final BackPressureStatsTracker backPressureStatsTracker,
+		final Executor ioExecutor,
+		final Configuration jobMasterConfiguration,
+		final SlotProvider slotProvider,
+		final ScheduledExecutorService futureExecutor,
+		final ScheduledExecutor delayExecutor,
+		final ClassLoader userCodeLoader,
+		final CheckpointRecoveryFactory checkpointRecoveryFactory,
+		final Time rpcTimeout,
+		final BlobWriter blobWriter,
+		final JobManagerJobMetricGroup jobManagerJobMetricGroup,
+		final Time slotRequestTimeout,
+		final ShuffleMaster<?> shuffleMaster,
+		final PartitionTracker partitionTracker,
+		final SchedulingStrategyFactory schedulingStrategyFactory,
+		final FailoverStrategy.Factory failoverStrategyFactory,
+		final RestartBackoffTimeStrategy restartBackoffTimeStrategy,
+		final ExecutionVertexOperations executionVertexOperations,
+		final ExecutionVertexVersioner executionVertexVersioner) throws Exception {
 
 		super(
 			log,
@@ -75,10 +127,283 @@ public class DefaultScheduler extends SchedulerBase {
 			slotRequestTimeout,
 			shuffleMaster,
 			partitionTracker);
+
+		this.log = log;
+
+		this.delayExecutor = checkNotNull(delayExecutor);
+		this.userCodeLoader = checkNotNull(userCodeLoader);
+		this.executionVertexOperations = checkNotNull(executionVertexOperations);
+		this.executionVertexVersioner = checkNotNull(executionVertexVersioner);
+
+		this.executionFailureHandler = new ExecutionFailureHandler(failoverStrategyFactory.create(getFailoverTopology()), restartBackoffTimeStrategy);
+		this.schedulingStrategy = schedulingStrategyFactory.createInstance(this, getSchedulingTopology(), getJobGraph());
+		this.executionSlotAllocator = new DefaultExecutionSlotAllocator(slotProvider, getInputsLocationsRetriever(), slotRequestTimeout);
+	}
+
+	// ------------------------------------------------------------------------
+	// SchedulerNG
+	// ------------------------------------------------------------------------
+
+	@Override
+	protected void startSchedulingInternal() {
+		log.debug("Starting scheduling with scheduling strategy [{}]", schedulingStrategy.getClass().getName());
+		prepareExecutionGraphForNgScheduling();
+		schedulingStrategy.startScheduling();
+	}
+
+	@Override
+	protected void updateTaskExecutionStateInternal(final ExecutionVertexID executionVertexId, final TaskExecutionState taskExecutionState) {
+		schedulingStrategy.onExecutionStateChange(executionVertexId, taskExecutionState.getExecutionState());
+		maybeHandleTaskFailure(taskExecutionState, executionVertexId);
+	}
+
+	private void maybeHandleTaskFailure(final TaskExecutionState taskExecutionState, final ExecutionVertexID executionVertexId) {
+		if (taskExecutionState.getExecutionState() == ExecutionState.FAILED) {
+			final Throwable error = taskExecutionState.getError(userCodeLoader);
+			handleTaskFailure(executionVertexId, error);
+		}
 	}
 
+	private void handleTaskFailure(final ExecutionVertexID executionVertexId, final Throwable error) {
+		final FailureHandlingResult failureHandlingResult = executionFailureHandler.getFailureHandlingResult(executionVertexId, error);
+		maybeRestartTasks(failureHandlingResult);
+	}
+
+	private void maybeRestartTasks(final FailureHandlingResult failureHandlingResult) {
+		if (failureHandlingResult.canRestart()) {
+			restartTasksWithDelay(failureHandlingResult);
+		} else {
+			failJob(failureHandlingResult.getError());
+		}
+	}
+
+	private void restartTasksWithDelay(final FailureHandlingResult failureHandlingResult) {
+		final Set<ExecutionVertexID> verticesToRestart = failureHandlingResult.getVerticesToRestart();
+
+		final Set<ExecutionVertexVersion> executionVertexVersions =
+			new HashSet<>(executionVertexVersioner.recordVertexModifications(verticesToRestart).values());
+
+		final CompletableFuture<?> cancelFuture = cancelTasksAsync(verticesToRestart);
+
+		delayExecutor.schedule(
+			() -> FutureUtils.assertNoException(
+				cancelFuture.thenRunAsync(restartTasks(executionVertexVersions), getMainThreadExecutor())),
+			failureHandlingResult.getRestartDelayMS(),
+			TimeUnit.MILLISECONDS);
+	}
+
+	private Runnable restartTasks(final Set<ExecutionVertexVersion> executionVertexVersions) {
+		return () -> {
+			final Set<ExecutionVertexID> verticesToRestart = executionVertexVersioner.getUnmodifiedExecutionVertices(executionVertexVersions);
+			schedulingStrategy.restartTasks(verticesToRestart);
+		};
+	}
+
+	private CompletableFuture<?> cancelTasksAsync(final Set<ExecutionVertexID> verticesToRestart) {
+		final List<CompletableFuture<?>> cancelFutures = verticesToRestart.stream()
+			.map(this::cancelExecutionVertex)
+			.collect(Collectors.toList());
+
+		return FutureUtils.combineAll(cancelFutures);
+	}
+
+	private CompletableFuture<?> cancelExecutionVertex(final ExecutionVertexID executionVertexId) {
+		return executionVertexOperations.cancel(getExecutionVertex(executionVertexId));
+	}
+
+	@Override
+	protected void scheduleOrUpdateConsumersInternal(final ExecutionVertexID producerVertexId, final ResultPartitionID partitionId) {
+		schedulingStrategy.onPartitionConsumable(producerVertexId, partitionId);
+	}
+
+	// ------------------------------------------------------------------------
+	// SchedulerOperations
+	// ------------------------------------------------------------------------
+
 	@Override
-	public void startScheduling() {
-		throw new UnsupportedOperationException();
+	public void allocateSlotsAndDeploy(final Collection<ExecutionVertexDeploymentOption> executionVertexDeploymentOptions) {
+		final Map<ExecutionVertexID, ExecutionVertexDeploymentOption> deploymentOptionsByVertex = groupDeploymentOptionsByVertexId(executionVertexDeploymentOptions);
+		final Set<ExecutionVertexID> verticesToDeploy = deploymentOptionsByVertex.keySet();
+		final Map<ExecutionVertexID, ExecutionVertexVersion> requiredVersionByVertex = executionVertexVersioner.recordVertexModifications(verticesToDeploy);
+
+		prepareToDeployVertices(verticesToDeploy);
+
+		final Collection<SlotExecutionVertexAssignment> slotExecutionVertexAssignments = allocateSlots(executionVertexDeploymentOptions);
+
+		final Collection<DeploymentHandle> deploymentHandles = createDeploymentHandles(
+			requiredVersionByVertex,
+			deploymentOptionsByVertex,
+			slotExecutionVertexAssignments);
+
+		if (isDeployIndividually()) {
+			deployIndividually(deploymentHandles);
+		} else {
+			waitForAllSlotsAndDeploy(deploymentHandles);
+		}
+	}
+
+	private static Map<ExecutionVertexID, ExecutionVertexDeploymentOption> groupDeploymentOptionsByVertexId(
+			final Collection<ExecutionVertexDeploymentOption> executionVertexDeploymentOptions) {
+		return executionVertexDeploymentOptions.stream().collect(Collectors.toMap(
+				ExecutionVertexDeploymentOption::getExecutionVertexId,
+				Function.identity()));
+	}
+
+	private void prepareToDeployVertices(final Set<ExecutionVertexID> verticesToDeploy) {
+		cancelSlotAssignments(verticesToDeploy);
+		resetForNewExecutionIfInTerminalState(verticesToDeploy);
+		transitionToScheduled(verticesToDeploy);
+	}
+
+	private void cancelSlotAssignments(final Collection<ExecutionVertexID> vertices) {
+		vertices.forEach(executionSlotAllocator::cancel);
+	}
+
+	private Collection<SlotExecutionVertexAssignment> allocateSlots(final Collection<ExecutionVertexDeploymentOption> executionVertexDeploymentOptions) {
+		return executionSlotAllocator.allocateSlotsFor(executionVertexDeploymentOptions
+			.stream()
+			.map(ExecutionVertexDeploymentOption::getExecutionVertexId)
+			.map(this::getExecutionVertex)
+			.map(ExecutionVertexSchedulingRequirementsMapper::from)
+			.collect(Collectors.toList()));
+	}
+
+	private static Collection<DeploymentHandle> createDeploymentHandles(
+		final Map<ExecutionVertexID, ExecutionVertexVersion> requiredVersionByVertex,
+		final Map<ExecutionVertexID, ExecutionVertexDeploymentOption> deploymentOptionsByVertex,
+		final Collection<SlotExecutionVertexAssignment> slotExecutionVertexAssignments) {
+
+		return slotExecutionVertexAssignments
+			.stream()
+			.map(slotExecutionVertexAssignment -> {
+				final ExecutionVertexID executionVertexId = slotExecutionVertexAssignment.getExecutionVertexId();
+				return new DeploymentHandle(
+					requiredVersionByVertex.get(executionVertexId),
+					deploymentOptionsByVertex.get(executionVertexId),
+					slotExecutionVertexAssignment);
+			})
+			.collect(Collectors.toList());
+	}
+
+	/**
+	 * <b>HACK:</b> See <a href="https://issues.apache.org/jira/browse/FLINK-14162">FLINK-14162</a>
+	 * for details.
+	 */
+	private boolean isDeployIndividually() {
+		return schedulingStrategy instanceof LazyFromSourcesSchedulingStrategy;
+	}
+
+	private void deployIndividually(final Collection<DeploymentHandle> deploymentHandles) {
+		for (final DeploymentHandle deploymentHandle : deploymentHandles) {
+			FutureUtils.assertNoException(
+				deploymentHandle
+					.getSlotExecutionVertexAssignment()
+					.getLogicalSlotFuture()
+					.handle(assignResourceOrHandleError(deploymentHandle))
+					.handle(deployOrHandleError(deploymentHandle)));
+		}
+	}
+
+	private void waitForAllSlotsAndDeploy(final Collection<DeploymentHandle> deploymentHandles) {
+		FutureUtils.assertNoException(
+			assignAllResources(deploymentHandles).handle(deployAll(deploymentHandles)));
+	}
+
+	private CompletableFuture<Void> assignAllResources(final Collection<DeploymentHandle> deploymentHandles) {
+		final List<CompletableFuture<Void>> slotAssignedFutures = new ArrayList<>();
+		for (DeploymentHandle deploymentHandle : deploymentHandles) {
+			final CompletableFuture<Void> slotAssigned = deploymentHandle
+				.getSlotExecutionVertexAssignment()
+				.getLogicalSlotFuture()
+				.handle(assignResourceOrHandleError(deploymentHandle));
+			slotAssignedFutures.add(slotAssigned);
+		}
+		return FutureUtils.waitForAll(slotAssignedFutures);
+	}
+
+	private BiFunction<Void, Throwable, Void> deployAll(final Collection<DeploymentHandle> deploymentHandles) {
+		return (ignored, throwable) -> {
+			propagateIfNonNull(throwable);
+			for (final DeploymentHandle deploymentHandle : deploymentHandles) {
+				final SlotExecutionVertexAssignment slotExecutionVertexAssignment = deploymentHandle.getSlotExecutionVertexAssignment();
+				final CompletableFuture<LogicalSlot> slotAssigned = slotExecutionVertexAssignment.getLogicalSlotFuture();
+				checkState(slotAssigned.isDone());
+
+				FutureUtils.assertNoException(
+					slotAssigned.handle(deployOrHandleError(deploymentHandle)));
+			}
+			return null;
+		};
+	}
+
+	private static void propagateIfNonNull(final Throwable throwable) {
+		if (throwable != null) {
+			throw new CompletionException(throwable);
+		}
+	}
+
+	private BiFunction<LogicalSlot, Throwable, Void> assignResourceOrHandleError(final DeploymentHandle deploymentHandle) {
+		final ExecutionVertexVersion requiredVertexVersion = deploymentHandle.getRequiredVertexVersion();
+		final ExecutionVertexID executionVertexId = deploymentHandle.getExecutionVertexId();
+
+		return (logicalSlot, throwable) -> {
+			if (executionVertexVersioner.isModified(requiredVertexVersion)) {
+				log.debug("Refusing to assign slot to execution vertex {} because this deployment was " +
+					"superseded by another deployment", executionVertexId);
+				stopDeployment(deploymentHandle);
+				return null;
+			}
+
+			if (throwable == null) {
+				final ExecutionVertex executionVertex = getExecutionVertex(executionVertexId);
+				final boolean sendScheduleOrUpdateConsumerMessage = deploymentHandle.getDeploymentOption().sendScheduleOrUpdateConsumerMessage();
+				executionVertex
+					.getCurrentExecutionAttempt()
+					.registerProducedPartitions(logicalSlot.getTaskManagerLocation(), sendScheduleOrUpdateConsumerMessage);
+				executionVertex.tryAssignResource(logicalSlot);
+			} else {
+				handleTaskFailure(executionVertexId, throwable);
+			}
+			return null;
+		};
+	}
+
+	private BiFunction<Object, Throwable, Void> deployOrHandleError(final DeploymentHandle deploymentHandle) {
+		final ExecutionVertexVersion requiredVertexVersion = deploymentHandle.getRequiredVertexVersion();
+		final ExecutionVertexID executionVertexId = requiredVertexVersion.getExecutionVertexId();
+
+		return (ignored, throwable) -> {
+			if (executionVertexVersioner.isModified(requiredVertexVersion)) {
+				log.debug("Refusing to deploy execution vertex {} because this deployment was " +
+					"superseded by another deployment", executionVertexId);
+				stopDeployment(deploymentHandle);
+				return null;
+			}
+
+			if (throwable == null) {
+				deployTaskSafe(executionVertexId);
+			} else {
+				handleTaskFailure(executionVertexId, throwable);
+			}
+			return null;
+		};
+	}
+
+	private void stopDeployment(final DeploymentHandle deploymentHandle) {
+		cancelExecutionVertex(deploymentHandle.getExecutionVertexId());
+		// Canceling the vertex normally releases the slot. However, we might not have assigned
+		// the slot to the vertex yet.
+		deploymentHandle
+			.getLogicalSlot()
+			.ifPresent(logicalSlot -> logicalSlot.releaseSlot(null));
+	}
+
+	private void deployTaskSafe(final ExecutionVertexID executionVertexId) {
+		try {
+			final ExecutionVertex executionVertex = getExecutionVertex(executionVertexId);
+			executionVertexOperations.deploy(executionVertex);
+		} catch (Throwable e) {
+			handleTaskFailure(executionVertexId, e);
+		}
 	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java
index 5779a7e..0e663f3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java
@@ -23,11 +23,19 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.BlobWriter;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
+import org.apache.flink.runtime.executiongraph.failover.flip1.NoRestartBackoffTimeStrategy;
+import org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategy;
+import org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionStrategy;
 import org.apache.flink.runtime.io.network.partition.PartitionTracker;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.ScheduleMode;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
 import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
 import org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker;
+import org.apache.flink.runtime.scheduler.strategy.EagerSchedulingStrategy;
+import org.apache.flink.runtime.scheduler.strategy.LazyFromSourcesSchedulingStrategy;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategyFactory;
 import org.apache.flink.runtime.shuffle.ShuffleMaster;
 
 import org.slf4j.Logger;
@@ -58,6 +66,9 @@ public class DefaultSchedulerFactory implements SchedulerNGFactory {
 			final ShuffleMaster<?> shuffleMaster,
 			final PartitionTracker partitionTracker) throws Exception {
 
+		final SchedulingStrategyFactory schedulingStrategyFactory = createSchedulingStrategyFactory(jobGraph.getScheduleMode());
+		final RestartBackoffTimeStrategy restartBackoffTimeStrategy = NoRestartBackoffTimeStrategy.INSTANCE;
+
 		return new DefaultScheduler(
 			log,
 			jobGraph,
@@ -66,6 +77,7 @@ public class DefaultSchedulerFactory implements SchedulerNGFactory {
 			jobMasterConfiguration,
 			slotProvider,
 			futureExecutor,
+			new ScheduledExecutorServiceAdapter(futureExecutor),
 			userCodeLoader,
 			checkpointRecoveryFactory,
 			rpcTimeout,
@@ -73,7 +85,22 @@ public class DefaultSchedulerFactory implements SchedulerNGFactory {
 			jobManagerJobMetricGroup,
 			slotRequestTimeout,
 			shuffleMaster,
-			partitionTracker);
+			partitionTracker,
+			schedulingStrategyFactory,
+			new RestartPipelinedRegionStrategy.Factory(),
+			restartBackoffTimeStrategy,
+			new DefaultExecutionVertexOperations(),
+			new ExecutionVertexVersioner());
 	}
 
+	private SchedulingStrategyFactory createSchedulingStrategyFactory(final ScheduleMode scheduleMode) {
+		switch (scheduleMode) {
+			case EAGER:
+				return new EagerSchedulingStrategy.Factory();
+			case LAZY_FROM_SOURCES:
+				return new LazyFromSourcesSchedulingStrategy.Factory();
+			default:
+				throw new IllegalStateException("Unsupported schedule mode " + scheduleMode);
+		}
+	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DeploymentHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DeploymentHandle.java
new file mode 100644
index 0000000..2aed7fb
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DeploymentHandle.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * This class is a tuple holding the information necessary to deploy an {@link ExecutionVertex}.
+ *
+ * <p>The tuple consists of:
+ * <ul>
+ *     <li>{@link ExecutionVertexVersion}
+ *     <li>{@link ExecutionVertexDeploymentOption}
+ *     <li>{@link SlotExecutionVertexAssignment}
+ * </ul>
+ */
+class DeploymentHandle {
+
+	private final ExecutionVertexVersion requiredVertexVersion;
+
+	private final ExecutionVertexDeploymentOption executionVertexDeploymentOption;
+
+	private final SlotExecutionVertexAssignment slotExecutionVertexAssignment;
+
+	public DeploymentHandle(
+		final ExecutionVertexVersion requiredVertexVersion,
+		final ExecutionVertexDeploymentOption executionVertexDeploymentOption,
+		final SlotExecutionVertexAssignment slotExecutionVertexAssignment) {
+
+		this.requiredVertexVersion = Preconditions.checkNotNull(requiredVertexVersion);
+		this.executionVertexDeploymentOption = Preconditions.checkNotNull(executionVertexDeploymentOption);
+		this.slotExecutionVertexAssignment = Preconditions.checkNotNull(slotExecutionVertexAssignment);
+	}
+
+	public ExecutionVertexID getExecutionVertexId() {
+		return requiredVertexVersion.getExecutionVertexId();
+	}
+
+	public ExecutionVertexVersion getRequiredVertexVersion() {
+		return requiredVertexVersion;
+	}
+
+	public DeploymentOption getDeploymentOption() {
+		return executionVertexDeploymentOption.getDeploymentOption();
+	}
+
+	public SlotExecutionVertexAssignment getSlotExecutionVertexAssignment() {
+		return slotExecutionVertexAssignment;
+	}
+
+	public Optional<LogicalSlot> getLogicalSlot() {
+		final CompletableFuture<LogicalSlot> logicalSlotFuture = slotExecutionVertexAssignment.getLogicalSlotFuture();
+		Preconditions.checkState(logicalSlotFuture.isDone(), "method can only be called after slot future is done");
+
+		if (logicalSlotFuture.isCompletedExceptionally() || logicalSlotFuture.isCancelled()) {
+			return Optional.empty();
+		}
+		return Optional.ofNullable(logicalSlotFuture.getNow(null));
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/TestRestartBackoffTimeStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexOperations.java
similarity index 56%
copy from flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/TestRestartBackoffTimeStrategy.java
copy to flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexOperations.java
index 878c89f..23cca7c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/TestRestartBackoffTimeStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexOperations.java
@@ -17,34 +17,19 @@
  * under the License.
  */
 
-package org.apache.flink.runtime.executiongraph.failover.flip1;
+package org.apache.flink.runtime.scheduler;
 
-/**
- * A RestartBackoffTimeStrategy implementation for tests.
- */
-public class TestRestartBackoffTimeStrategy implements RestartBackoffTimeStrategy {
-
-	private final boolean canRestart;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 
-	private final long backoffTime;
+import java.util.concurrent.CompletableFuture;
 
-	public TestRestartBackoffTimeStrategy(boolean canRestart, long backoffTime) {
-		this.canRestart = canRestart;
-		this.backoffTime = backoffTime;
-	}
-
-	@Override
-	public boolean canRestart() {
-		return canRestart;
-	}
+/**
+ * Operations on the {@link ExecutionVertex}.
+ */
+interface ExecutionVertexOperations {
 
-	@Override
-	public long getBackoffTime() {
-		return backoffTime;
-	}
+	void deploy(ExecutionVertex executionVertex) throws JobException;
 
-	@Override
-	public void notifyFailure(Throwable cause) {
-		// ignore
-	}
+	CompletableFuture<?> cancel(ExecutionVertex executionVertex);
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexSchedulingRequirementsMapper.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexSchedulingRequirementsMapper.java
new file mode 100644
index 0000000..b089e5f
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexSchedulingRequirementsMapper.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+
+import java.util.Collection;
+import java.util.Collections;
+
+final class ExecutionVertexSchedulingRequirementsMapper {
+
+	public static ExecutionVertexSchedulingRequirements from(final ExecutionVertex executionVertex) {
+
+		final ExecutionVertexID executionVertexId = executionVertex.getID();
+
+		final AllocationID latestPriorAllocation = executionVertex.getLatestPriorAllocation();
+		final SlotSharingGroup slotSharingGroup = executionVertex.getJobVertex().getSlotSharingGroup();
+
+		return new ExecutionVertexSchedulingRequirements.Builder()
+			.withExecutionVertexId(executionVertexId)
+			.withPreviousAllocationId(latestPriorAllocation)
+			.withResourceProfile(executionVertex.getResourceProfile())
+			.withSlotSharingGroupId(slotSharingGroup == null ? null : slotSharingGroup.getSlotSharingGroupId())
+			.withCoLocationConstraint(executionVertex.getLocationConstraint())
+			.withPreferredLocations(getPreferredLocationBasedOnState(executionVertex)).build();
+	}
+
+	private static Collection<TaskManagerLocation> getPreferredLocationBasedOnState(final ExecutionVertex executionVertex) {
+		return executionVertex
+			.getPreferredLocationBasedOnState()
+			.map(Collections::singleton)
+			.orElse(Collections.emptySet());
+	}
+
+	private ExecutionVertexSchedulingRequirementsMapper() {
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/TestRestartBackoffTimeStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/InternalTaskFailuresListener.java
similarity index 52%
copy from flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/TestRestartBackoffTimeStrategy.java
copy to flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/InternalTaskFailuresListener.java
index 878c89f..6dbda3b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/TestRestartBackoffTimeStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/InternalTaskFailuresListener.java
@@ -17,34 +17,21 @@
  * under the License.
  */
 
-package org.apache.flink.runtime.executiongraph.failover.flip1;
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.jobmaster.JobMasterGateway;
+import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 
 /**
- * A RestartBackoffTimeStrategy implementation for tests.
+ * This interface enables subscribing to Task failures that are detected from the JobMaster side
+ * (e.g., from within the {@link ExecutionGraph}).
+ * In contrast, there are also failures that are detected by the TaskManager, which are communicated
+ * via {@link JobMasterGateway#updateTaskExecutionState(TaskExecutionState)}.
  */
-public class TestRestartBackoffTimeStrategy implements RestartBackoffTimeStrategy {
-
-	private final boolean canRestart;
-
-	private final long backoffTime;
-
-	public TestRestartBackoffTimeStrategy(boolean canRestart, long backoffTime) {
-		this.canRestart = canRestart;
-		this.backoffTime = backoffTime;
-	}
-
-	@Override
-	public boolean canRestart() {
-		return canRestart;
-	}
+public interface InternalTaskFailuresListener {
 
-	@Override
-	public long getBackoffTime() {
-		return backoffTime;
-	}
+	void notifyFailed(ExecutionAttemptID attemptId, Throwable t);
 
-	@Override
-	public void notifyFailure(Throwable cause) {
-		// ignore
-	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LegacyScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LegacyScheduler.java
index bc6ed69..bc5c450 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LegacyScheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LegacyScheduler.java
@@ -19,78 +19,24 @@
 
 package org.apache.flink.runtime.scheduler;
 
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.io.InputSplit;
-import org.apache.flink.queryablestate.KvStateID;
-import org.apache.flink.runtime.JobException;
-import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
 import org.apache.flink.runtime.blob.BlobWriter;
-import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
-import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
-import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
-import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
-import org.apache.flink.runtime.client.JobExecutionException;
-import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
-import org.apache.flink.runtime.concurrent.FutureUtils;
-import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
-import org.apache.flink.runtime.executiongraph.Execution;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
-import org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder;
-import org.apache.flink.runtime.executiongraph.ExecutionGraphException;
-import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
-import org.apache.flink.runtime.executiongraph.IntermediateResult;
-import org.apache.flink.runtime.executiongraph.JobStatusListener;
-import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
-import org.apache.flink.runtime.executiongraph.restart.RestartStrategyResolving;
 import org.apache.flink.runtime.io.network.partition.PartitionTracker;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
-import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
-import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
-import org.apache.flink.runtime.jobmaster.SerializedInputSplit;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
-import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
-import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
-import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
-import org.apache.flink.runtime.messages.webmonitor.JobDetails;
 import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
-import org.apache.flink.runtime.query.KvStateLocation;
-import org.apache.flink.runtime.query.KvStateLocationRegistry;
-import org.apache.flink.runtime.query.UnknownKvStateLocation;
 import org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker;
-import org.apache.flink.runtime.rest.handler.legacy.backpressure.OperatorBackPressureStats;
 import org.apache.flink.runtime.shuffle.ShuffleMaster;
-import org.apache.flink.runtime.state.KeyGroupRange;
-import org.apache.flink.runtime.taskmanager.TaskExecutionState;
-import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
-import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
-import org.apache.flink.util.FlinkException;
-import org.apache.flink.util.InstantiationUtil;
-import org.apache.flink.util.function.FunctionUtils;
 
 import org.slf4j.Logger;
 
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
 
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
 /**
  * A scheduler that delegates to the scheduling logic in the {@link ExecutionGraph}.
  *
@@ -134,4 +80,15 @@ public class LegacyScheduler extends SchedulerBase {
 			shuffleMaster,
 			partitionTracker);
 	}
+
+	@Override
+	protected void startSchedulingInternal() {
+		final ExecutionGraph executionGraph = getExecutionGraph();
+		try {
+			executionGraph.scheduleForExecution();
+		}
+		catch (Throwable t) {
+			executionGraph.failGlobal(t);
+		}
+	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
index 6c2ed51..4051ed4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
@@ -45,8 +45,11 @@ import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder;
 import org.apache.flink.runtime.executiongraph.ExecutionGraphException;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.executiongraph.IntermediateResult;
 import org.apache.flink.runtime.executiongraph.JobStatusListener;
+import org.apache.flink.runtime.executiongraph.failover.adapter.DefaultFailoverTopology;
+import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverTopology;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategyResolving;
@@ -70,6 +73,9 @@ import org.apache.flink.runtime.query.KvStateLocationRegistry;
 import org.apache.flink.runtime.query.UnknownKvStateLocation;
 import org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker;
 import org.apache.flink.runtime.rest.handler.legacy.backpressure.OperatorBackPressureStats;
+import org.apache.flink.runtime.scheduler.adapter.ExecutionGraphToSchedulingTopologyAdapter;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
 import org.apache.flink.runtime.shuffle.ShuffleMaster;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
@@ -83,6 +89,7 @@ import org.slf4j.Logger;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.util.Collection;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
@@ -102,6 +109,12 @@ public abstract class SchedulerBase implements SchedulerNG {
 
 	private final ExecutionGraph executionGraph;
 
+	private final SchedulingTopology schedulingTopology;
+
+	private final FailoverTopology failoverTopology;
+
+	private final InputsLocationsRetriever inputsLocationsRetriever;
+
 	private final BackPressureStatsTracker backPressureStatsTracker;
 
 	private final Executor ioExecutor;
@@ -124,9 +137,10 @@ public abstract class SchedulerBase implements SchedulerNG {
 
 	private final Time slotRequestTimeout;
 
+
 	private ComponentMainThreadExecutor mainThreadExecutor = new ComponentMainThreadExecutor.DummyComponentMainThreadExecutor(
-		"LegacyScheduler is not initialized with proper main thread executor. " +
-			"Call to LegacyScheduler.setMainThreadExecutor(...) required.");
+		"SchedulerBase is not initialized with proper main thread executor. " +
+			"Call to SchedulerBase.setMainThreadExecutor(...) required.");
 
 	public SchedulerBase(
 		final Logger log,
@@ -172,6 +186,9 @@ public abstract class SchedulerBase implements SchedulerNG {
 		this.slotRequestTimeout = checkNotNull(slotRequestTimeout);
 
 		this.executionGraph = createAndRestoreExecutionGraph(jobManagerJobMetricGroup, checkNotNull(shuffleMaster), checkNotNull(partitionTracker));
+		this.schedulingTopology = new ExecutionGraphToSchedulingTopologyAdapter(executionGraph);
+		this.failoverTopology = new DefaultFailoverTopology(executionGraph);
+		this.inputsLocationsRetriever = new ExecutionGraphToInputsLocationsRetrieverAdapter(executionGraph);
 	}
 
 	private ExecutionGraph createAndRestoreExecutionGraph(
@@ -222,6 +239,26 @@ public abstract class SchedulerBase implements SchedulerNG {
 	}
 
 	/**
+	 * @deprecated Direct access to the execution graph by scheduler implementations is discouraged
+	 * because currently the execution graph has various features and responsibilities that a
+	 * scheduler should not be concerned about. The following specialized abstractions to the
+	 * execution graph and accessors should be preferred over direct access:
+	 * <ul>
+	 *     <li>{@link #getSchedulingTopology()}
+	 *     <li>{@link #getFailoverTopology()}
+	 *     <li>{@link #getInputsLocationsRetriever()}
+	 *     <li>{@link #getExecutionVertex(ExecutionVertexID)}
+	 *     <li>{@link #getExecutionVertexId(ExecutionAttemptID)}
+	 *     <li>{@link #getExecutionVertexIdOrThrow(ExecutionAttemptID)}
+	 * </ul>
+	 * Currently, only {@link LegacyScheduler} requires direct access to the execution graph.
+	 */
+	@Deprecated
+	protected ExecutionGraph getExecutionGraph() {
+		return executionGraph;
+	}
+
+	/**
 	 * Tries to restore the given {@link ExecutionGraph} from the provided {@link SavepointRestoreSettings}.
 	 *
 	 * @param executionGraphToRestore {@link ExecutionGraph} which is supposed to be restored
@@ -241,6 +278,68 @@ public abstract class SchedulerBase implements SchedulerNG {
 		}
 	}
 
+	protected void resetForNewExecutionIfInTerminalState(final Collection<ExecutionVertexID> verticesToDeploy) {
+		verticesToDeploy.forEach(
+			vertexId -> getExecutionVertex(vertexId).resetForNewExecutionIfInTerminalState());
+	}
+
+	protected void transitionToScheduled(final Collection<ExecutionVertexID> verticesToDeploy) {
+		// Requests the SCHEDULED state on the current execution attempt of every given vertex.
+		verticesToDeploy.forEach(vertexId ->
+			getExecutionVertex(vertexId).getCurrentExecutionAttempt().transitionState(ExecutionState.SCHEDULED));
+	}
+
+	protected ComponentMainThreadExecutor getMainThreadExecutor() {
+		return mainThreadExecutor;
+	}
+
+	protected void failJob(Throwable cause) {
+		executionGraph.failJob(cause);
+	}
+
+	protected FailoverTopology getFailoverTopology() {
+		return failoverTopology;
+	}
+
+	protected SchedulingTopology getSchedulingTopology() {
+		return schedulingTopology;
+	}
+
+	protected InputsLocationsRetriever getInputsLocationsRetriever() {
+		return inputsLocationsRetriever;
+	}
+
+	protected final void prepareExecutionGraphForNgScheduling() {
+		executionGraph.enableNgScheduling(new UpdateSchedulerNgOnInternalTaskFailuresListener(this, jobGraph.getJobID()));
+		executionGraph.transitionToRunning();
+	}
+
+	protected Optional<ExecutionVertexID> getExecutionVertexId(final ExecutionAttemptID executionAttemptId) {
+		final Execution registeredExecution = executionGraph.getRegisteredExecutions().get(executionAttemptId);
+		return Optional.ofNullable(registeredExecution).map(this::getExecutionVertexId);
+	}
+
+	protected ExecutionVertexID getExecutionVertexIdOrThrow(final ExecutionAttemptID executionAttemptId) {
+		return getExecutionVertexId(executionAttemptId)
+			.orElseThrow(() -> new IllegalStateException("Cannot find execution " + executionAttemptId));
+	}
+
+	private ExecutionVertexID getExecutionVertexId(final Execution execution) {
+		return execution.getVertex().getID();
+	}
+
+	protected ExecutionVertex getExecutionVertex(final ExecutionVertexID executionVertexId) {
+		return executionGraph.getAllVertices().get(executionVertexId.getJobVertexId()).getTaskVertices()[executionVertexId.getSubtaskIndex()];
+	}
+
+	protected JobGraph getJobGraph() {
+		return jobGraph;
+	}
+
+	// ------------------------------------------------------------------------
+	// SchedulerNG
+	// ------------------------------------------------------------------------
+
 	@Override
 	public void setMainThreadExecutor(final ComponentMainThreadExecutor mainThreadExecutor) {
 		this.mainThreadExecutor = checkNotNull(mainThreadExecutor);
@@ -253,17 +352,13 @@ public abstract class SchedulerBase implements SchedulerNG {
 	}
 
 	@Override
-	public void startScheduling() {
+	public final void startScheduling() {
 		mainThreadExecutor.assertRunningInMainThread();
-
-		try {
-			executionGraph.scheduleForExecution();
-		}
-		catch (Throwable t) {
-			executionGraph.failGlobal(t);
-		}
+		startSchedulingInternal();
 	}
 
+	protected abstract void startSchedulingInternal();
+
 	@Override
 	public void suspend(Throwable cause) {
 		mainThreadExecutor.assertRunningInMainThread();
@@ -282,9 +377,17 @@ public abstract class SchedulerBase implements SchedulerNG {
 	}
 
 	@Override
-	public boolean updateTaskExecutionState(final TaskExecutionState taskExecutionState) {
-		mainThreadExecutor.assertRunningInMainThread();
-		return executionGraph.updateState(taskExecutionState);
+	public final boolean updateTaskExecutionState(final TaskExecutionState taskExecutionState) {
+		final Optional<ExecutionVertexID> vertexId = getExecutionVertexId(taskExecutionState.getID());
+		if (!vertexId.isPresent()) {
+			return false; // no registered execution matches the reported attempt id
+		}
+		executionGraph.updateState(taskExecutionState);
+		updateTaskExecutionStateInternal(vertexId.get(), taskExecutionState);
+		return true;
+	}
+
+	protected void updateTaskExecutionStateInternal(final ExecutionVertexID executionVertexId, final TaskExecutionState taskExecutionState) {
 	}
 
 	@Override
@@ -363,14 +466,20 @@ public abstract class SchedulerBase implements SchedulerNG {
 	}
 
 	@Override
-	public void scheduleOrUpdateConsumers(final ResultPartitionID partitionID) {
+	public final void scheduleOrUpdateConsumers(final ResultPartitionID partitionId) {
 		mainThreadExecutor.assertRunningInMainThread();
 
 		try {
-			executionGraph.scheduleOrUpdateConsumers(partitionID);
+			executionGraph.scheduleOrUpdateConsumers(partitionId);
 		} catch (ExecutionGraphException e) {
 			throw new RuntimeException(e);
 		}
+
+		final ExecutionVertexID producerVertexId = getExecutionVertexIdOrThrow(partitionId.getProducerId());
+		scheduleOrUpdateConsumersInternal(producerVertexId, partitionId);
+	}
+
+	protected void scheduleOrUpdateConsumersInternal(ExecutionVertexID producerVertexId, ResultPartitionID resultPartitionId) {
 	}
 
 	@Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/UpdateSchedulerNgOnInternalTaskFailuresListener.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/UpdateSchedulerNgOnInternalTaskFailuresListener.java
new file mode 100644
index 0000000..f37e4ec
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/UpdateSchedulerNgOnInternalTaskFailuresListener.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.taskmanager.TaskExecutionState;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@link InternalTaskFailuresListener} that reports internal task failures to a {@link SchedulerNG}
+ * via {@link SchedulerNG#updateTaskExecutionState(TaskExecutionState)}.
+ */
+class UpdateSchedulerNgOnInternalTaskFailuresListener implements InternalTaskFailuresListener {
+
+	private final SchedulerNG schedulerNg;
+
+	private final JobID jobId;
+
+	/** Creates a listener for the given scheduler and job; neither argument may be null. */
+	public UpdateSchedulerNgOnInternalTaskFailuresListener(final SchedulerNG schedulerNg, final JobID jobId) {
+		this.schedulerNg = checkNotNull(schedulerNg);
+		this.jobId = checkNotNull(jobId);
+	}
+
+	/**
+	 * Builds a {@link TaskExecutionState} in state {@link ExecutionState#FAILED} for the given attempt,
+	 * together with {@code t}, and passes it to the scheduler.
+	 */
+	@Override
+	public void notifyFailed(final ExecutionAttemptID attemptId, final Throwable t) {
+		final TaskExecutionState failedState = new TaskExecutionState(jobId, attemptId, ExecutionState.FAILED, t);
+		schedulerNg.updateTaskExecutionState(failedState);
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
index 6093000..f0a4524 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
@@ -314,7 +314,7 @@ public class ExecutionVertexDeploymentTest extends TestLogger {
 				new AllocationID(),
 				0,
 				null,
-				Execution.registerProducedPartitions(vertex, location, attemptID).get().values());
+				Execution.registerProducedPartitions(vertex, location, attemptID, scheduleMode.allowLazyDeployment()).get().values());
 
 			Collection<ResultPartitionDeploymentDescriptor> producedPartitions = tdd.getProducedPartitions();
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/TestRestartBackoffTimeStrategy.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/TestRestartBackoffTimeStrategy.java
index 878c89f..0606cce 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/TestRestartBackoffTimeStrategy.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/TestRestartBackoffTimeStrategy.java
@@ -24,9 +24,9 @@ package org.apache.flink.runtime.executiongraph.failover.flip1;
  */
 public class TestRestartBackoffTimeStrategy implements RestartBackoffTimeStrategy {
 
-	private final boolean canRestart;
+	private boolean canRestart;
 
-	private final long backoffTime;
+	private long backoffTime;
 
 	public TestRestartBackoffTimeStrategy(boolean canRestart, long backoffTime) {
 		this.canRestart = canRestart;
@@ -47,4 +47,12 @@ public class TestRestartBackoffTimeStrategy implements RestartBackoffTimeStrateg
 	public void notifyFailure(Throwable cause) {
 		// ignore
 	}
+
+	public void setCanRestart(final boolean canRestart) {
+		this.canRestart = canRestart;
+	}
+
+	public void setBackoffTime(final long backoffTime) {
+		this.backoffTime = backoffTime;
+	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
new file mode 100644
index 0000000..7988e73
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.blob.VoidBlobWriter;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
+import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.failover.FailoverStrategyLoader;
+import org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionStrategy;
+import org.apache.flink.runtime.executiongraph.failover.flip1.TestRestartBackoffTimeStrategy;
+import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
+import org.apache.flink.runtime.io.network.partition.NoOpPartitionTracker;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.ScheduleMode;
+import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.runtime.rest.handler.legacy.backpressure.VoidBackPressureStatsTracker;
+import org.apache.flink.runtime.scheduler.strategy.EagerSchedulingStrategy;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.scheduler.strategy.LazyFromSourcesSchedulingStrategy;
+import org.apache.flink.runtime.shuffle.NettyShuffleMaster;
+import org.apache.flink.runtime.taskmanager.TaskExecutionState;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
+import org.apache.flink.util.ExecutorUtils;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
+
+import org.hamcrest.Matchers;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link DefaultScheduler}.
+ */
+public class DefaultSchedulerTest extends TestLogger {
+
+	private static final int TIMEOUT_MS = 1000;
+
+	private static final JobID TEST_JOB_ID = new JobID();
+
+	private ManuallyTriggeredScheduledExecutor taskRestartExecutor = new ManuallyTriggeredScheduledExecutor();
+
+	private ExecutorService executor;
+
+	private ScheduledExecutorService scheduledExecutorService;
+
+	private Configuration configuration;
+
+	private SubmissionTrackingTaskManagerGateway testTaskManagerGateway;
+
+	private TestRestartBackoffTimeStrategy testRestartBackoffTimeStrategy;
+
+	private FailingExecutionVertexOperationsDecorator testExecutionVertexOperations;
+
+	private SimpleSlotProvider slotProvider;
+
+	private ExecutionVertexVersioner executionVertexVersioner;
+
+	@Before
+	public void setUp() throws Exception {
+		executor = Executors.newSingleThreadExecutor();
+		scheduledExecutorService = new DirectScheduledExecutorService();
+
+		configuration = new Configuration();
+		configuration.setString(JobManagerOptions.EXECUTION_FAILOVER_STRATEGY, FailoverStrategyLoader.NO_OP_FAILOVER_STRATEGY);
+		testTaskManagerGateway = new SubmissionTrackingTaskManagerGateway();
+
+		testRestartBackoffTimeStrategy = new TestRestartBackoffTimeStrategy(true, 0);
+
+		testExecutionVertexOperations = new FailingExecutionVertexOperationsDecorator(new DefaultExecutionVertexOperations());
+
+		slotProvider = new SimpleSlotProvider(TEST_JOB_ID, 12, testTaskManagerGateway);
+
+		executionVertexVersioner = new ExecutionVertexVersioner();
+	}
+
+	@After
+	public void tearDown() throws Exception {
+		if (scheduledExecutorService != null) {
+			ExecutorUtils.gracefulShutdown(TIMEOUT_MS, TimeUnit.MILLISECONDS, scheduledExecutorService);
+		}
+
+		if (executor != null) {
+			ExecutorUtils.gracefulShutdown(TIMEOUT_MS, TimeUnit.MILLISECONDS, executor);
+		}
+	}
+
+	@Test
+	public void startScheduling() {
+		final JobGraph jobGraph = singleNonParallelJobVertexJobGraph();
+		final JobVertex onlyJobVertex = getOnlyJobVertex(jobGraph);
+
+		createSchedulerAndStartScheduling(jobGraph);
+
+		final List<ExecutionVertexID> deployedExecutionVertices = testTaskManagerGateway.getDeployedExecutionVertices(1, TIMEOUT_MS);
+
+		final ExecutionVertexID executionVertexId = new ExecutionVertexID(onlyJobVertex.getID(), 0);
+		assertThat(deployedExecutionVertices, contains(executionVertexId));
+	}
+
+	@Test
+	public void restartAfterDeploymentFails() {
+		final JobGraph jobGraph = singleNonParallelJobVertexJobGraph();
+		final JobVertex onlyJobVertex = getOnlyJobVertex(jobGraph);
+
+		testExecutionVertexOperations.enableFailDeploy();
+
+		createSchedulerAndStartScheduling(jobGraph);
+
+		testExecutionVertexOperations.disableFailDeploy();
+		taskRestartExecutor.triggerScheduledTasks();
+
+		final List<ExecutionVertexID> deployedExecutionVertices = testTaskManagerGateway.getDeployedExecutionVertices(1, TIMEOUT_MS);
+
+		final ExecutionVertexID executionVertexId = new ExecutionVertexID(onlyJobVertex.getID(), 0);
+		assertThat(deployedExecutionVertices, contains(executionVertexId));
+	}
+
+	@Test
+	public void scheduleWithLazyStrategy() {
+		final JobGraph jobGraph = singleNonParallelJobVertexJobGraph();
+		jobGraph.setScheduleMode(ScheduleMode.LAZY_FROM_SOURCES);
+		final JobVertex onlyJobVertex = getOnlyJobVertex(jobGraph);
+
+		createSchedulerAndStartScheduling(jobGraph);
+
+		final List<ExecutionVertexID> deployedExecutionVertices = testTaskManagerGateway.getDeployedExecutionVertices(1, TIMEOUT_MS);
+
+		final ExecutionVertexID executionVertexId = new ExecutionVertexID(onlyJobVertex.getID(), 0);
+		assertThat(deployedExecutionVertices, contains(executionVertexId));
+	}
+
+	@Test
+	public void restartFailedTask() {
+		final JobGraph jobGraph = singleNonParallelJobVertexJobGraph();
+		final JobVertex onlyJobVertex = getOnlyJobVertex(jobGraph);
+
+		final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph);
+
+		final ArchivedExecutionVertex archivedExecutionVertex = Iterables.getOnlyElement(scheduler.requestJob().getAllExecutionVertices());
+		final ExecutionAttemptID attemptId = archivedExecutionVertex.getCurrentExecutionAttempt().getAttemptId();
+
+		scheduler.updateTaskExecutionState(new TaskExecutionState(jobGraph.getJobID(), attemptId, ExecutionState.FAILED));
+
+		taskRestartExecutor.triggerScheduledTasks();
+
+		final List<ExecutionVertexID> deployedExecutionVertices = testTaskManagerGateway.getDeployedExecutionVertices(2, TIMEOUT_MS);
+		final ExecutionVertexID executionVertexId = new ExecutionVertexID(onlyJobVertex.getID(), 0);
+		assertThat(deployedExecutionVertices, contains(executionVertexId, executionVertexId));
+	}
+
+	@Test
+	public void updateTaskExecutionStateReturnsFalseIfExecutionDoesNotExist() {
+		final JobGraph jobGraph = singleNonParallelJobVertexJobGraph();
+		final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph);
+
+		final TaskExecutionState taskExecutionState = new TaskExecutionState(
+			jobGraph.getJobID(),
+			new ExecutionAttemptID(),
+			ExecutionState.FAILED);
+
+		assertFalse(scheduler.updateTaskExecutionState(taskExecutionState));
+	}
+
+	@Test
+	public void failJobIfCannotRestart() throws Exception {
+		final JobGraph jobGraph = singleNonParallelJobVertexJobGraph();
+		testRestartBackoffTimeStrategy.setCanRestart(false);
+
+		final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph);
+
+		final ArchivedExecutionVertex onlyExecutionVertex = Iterables.getOnlyElement(scheduler.requestJob().getAllExecutionVertices());
+		final ExecutionAttemptID attemptId = onlyExecutionVertex.getCurrentExecutionAttempt().getAttemptId();
+
+		scheduler.updateTaskExecutionState(new TaskExecutionState(jobGraph.getJobID(), attemptId, ExecutionState.FAILED));
+
+		taskRestartExecutor.triggerScheduledTasks();
+
+		waitForTermination(scheduler);
+		final JobStatus jobStatus = scheduler.requestJobStatus();
+		assertThat(jobStatus, is(Matchers.equalTo(JobStatus.FAILED)));
+	}
+
+	@Test
+	public void failJobIfNotEnoughResources() throws Exception {
+		drainAllAvailableSlots();
+
+		final JobGraph jobGraph = singleNonParallelJobVertexJobGraph();
+		testRestartBackoffTimeStrategy.setCanRestart(false);
+
+		final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph);
+
+		waitForTermination(scheduler);
+		final JobStatus jobStatus = scheduler.requestJobStatus();
+		assertThat(jobStatus, is(Matchers.equalTo(JobStatus.FAILED)));
+	}
+
+	private void drainAllAvailableSlots() {
+		// Issues one placeholder slot request per currently available slot of the slot provider.
+		for (int remaining = slotProvider.getNumberOfAvailableSlots(); remaining > 0; remaining--) {
+			slotProvider.allocateSlot(
+				new SlotRequestId(),
+				new ScheduledUnit(new JobVertexID(), null, null),
+				SlotProfile.noRequirements(),
+				true,
+				Time.milliseconds(TIMEOUT_MS));
+		}
+	}
+
+	private void waitForTermination(final DefaultScheduler scheduler) throws Exception {
+		scheduler.getTerminationFuture().get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
+	}
+
+	private static JobGraph singleNonParallelJobVertexJobGraph() {
+		final JobGraph jobGraph = new JobGraph(TEST_JOB_ID, "Testjob");
+		jobGraph.setScheduleMode(ScheduleMode.EAGER);
+		final JobVertex vertex = new JobVertex("source");
+		vertex.setInvokableClass(NoOpInvokable.class);
+		jobGraph.addVertex(vertex);
+		return jobGraph;
+	}
+
+	private static JobVertex getOnlyJobVertex(final JobGraph jobGraph) {
+		final List<JobVertex> sortedVertices = jobGraph.getVerticesSortedTopologicallyFromSources();
+		Preconditions.checkState(sortedVertices.size() == 1);
+		return sortedVertices.get(0);
+	}
+
+	private DefaultScheduler createSchedulerAndStartScheduling(final JobGraph jobGraph) {
+		try {
+			final DefaultScheduler scheduler = createScheduler(jobGraph);
+			startScheduling(scheduler);
+			return scheduler;
+		} catch (Exception e) {
+			throw new RuntimeException(e);
+		}
+	}
+
+	private DefaultScheduler createScheduler(final JobGraph jobGraph) throws Exception {
+		return new DefaultScheduler(
+			log,
+			jobGraph,
+			VoidBackPressureStatsTracker.INSTANCE,
+			executor,
+			configuration,
+			slotProvider,
+			scheduledExecutorService,
+			taskRestartExecutor,
+			ClassLoader.getSystemClassLoader(),
+			new StandaloneCheckpointRecoveryFactory(),
+			Time.seconds(300),
+			VoidBlobWriter.getInstance(),
+			UnregisteredMetricGroups.createUnregisteredJobManagerJobMetricGroup(),
+			Time.seconds(300),
+			NettyShuffleMaster.INSTANCE,
+			NoOpPartitionTracker.INSTANCE,
+			jobGraph.getScheduleMode() == ScheduleMode.LAZY_FROM_SOURCES ?
+				new LazyFromSourcesSchedulingStrategy.Factory() :
+				new EagerSchedulingStrategy.Factory(),
+			new RestartPipelinedRegionStrategy.Factory(),
+			testRestartBackoffTimeStrategy,
+			testExecutionVertexOperations,
+			executionVertexVersioner);
+	}
+
+	private void startScheduling(final SchedulerNG scheduler) {
+		scheduler.setMainThreadExecutor(ComponentMainThreadExecutorServiceAdapter.forMainThread());
+		scheduler.startScheduling();
+	}
+
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/FailingExecutionVertexOperationsDecorator.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/FailingExecutionVertexOperationsDecorator.java
new file mode 100644
index 0000000..b5ad29f
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/FailingExecutionVertexOperationsDecorator.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@link ExecutionVertexOperations} decorator for tests that can be switched to fail deploy or cancel calls.
+ */
+public class FailingExecutionVertexOperationsDecorator implements ExecutionVertexOperations {
+
+	private final ExecutionVertexOperations delegate;
+
+	private boolean failDeploy;
+
+	private boolean failCancel;
+
+	public FailingExecutionVertexOperationsDecorator(final ExecutionVertexOperations delegate) {
+		this.delegate = checkNotNull(delegate);
+	}
+
+	/** Throws a {@link RuntimeException} while deploy failures are enabled; otherwise delegates. */
+	@Override
+	public void deploy(final ExecutionVertex executionVertex) throws JobException {
+		if (failDeploy) {
+			throw new RuntimeException("Expected");
+		}
+		delegate.deploy(executionVertex);
+	}
+
+	/** Returns an exceptionally completed future while cancel failures are enabled; otherwise delegates. */
+	@Override
+	public CompletableFuture<?> cancel(final ExecutionVertex executionVertex) {
+		if (failCancel) {
+			return FutureUtils.completedExceptionally(new RuntimeException("Expected"));
+		}
+		return delegate.cancel(executionVertex);
+	}
+
+	public void enableFailDeploy() {
+		failDeploy = true;
+	}
+
+	public void disableFailDeploy() {
+		failDeploy = false;
+	}
+
+	public void enableFailCancel() {
+		failCancel = true;
+	}
+
+	public void disableFailCancel() {
+		failCancel = false;
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SubmissionTrackingTaskManagerGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SubmissionTrackingTaskManagerGateway.java
new file mode 100644
index 0000000..ae1c01b
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SubmissionTrackingTaskManagerGateway.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
+import org.apache.flink.runtime.executiongraph.TaskInformation;
+import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+class SubmissionTrackingTaskManagerGateway extends SimpleAckingTaskManagerGateway {
+
+	private final BlockingQueue<TaskDeploymentDescriptor> taskDeploymentDescriptors = new LinkedBlockingDeque<>();
+
+	private boolean failSubmission;
+
+	public void setFailSubmission(final boolean failSubmission) {
+		this.failSubmission = failSubmission;
+	}
+
+	@Override
+	public CompletableFuture<Acknowledge> submitTask(final TaskDeploymentDescriptor tdd, final Time timeout) {
+		super.submitTask(tdd, timeout);
+
+		taskDeploymentDescriptors.add(tdd);
+
+		if (failSubmission) {
+			return FutureUtils.completedExceptionally(new RuntimeException("Task submission failed."));
+		} else {
+			return CompletableFuture.completedFuture(Acknowledge.get());
+		}
+	}
+
+	public List<ExecutionVertexID> getDeployedExecutionVertices(int num, long timeoutMs) {
+		final List<ExecutionVertexID> deployedVertices = new ArrayList<>();
+		for (int i = 0; i < num; i++) {
+			try {
+				final TaskDeploymentDescriptor taskDeploymentDescriptor = taskDeploymentDescriptors.poll(timeoutMs, TimeUnit.MILLISECONDS);
+				checkState(taskDeploymentDescriptor != null, "Expected %s tasks to be submitted within %s ms, got %s", num, timeoutMs, i);
+				deployedVertices.add(getExecutionVertexId(taskDeploymentDescriptor));
+			} catch (InterruptedException e) {
+				Thread.currentThread().interrupt();
+			}
+		}
+		return deployedVertices;
+	}
+
+	private ExecutionVertexID getExecutionVertexId(final TaskDeploymentDescriptor deploymentDescriptor) {
+		final TaskInformation taskInformation = deserializeTaskInformation(deploymentDescriptor);
+		final JobVertexID jobVertexId = taskInformation.getJobVertexId();
+		final int subtaskIndex = deploymentDescriptor.getSubtaskIndex();
+		return new ExecutionVertexID(jobVertexId, subtaskIndex);
+	}
+
+	private TaskInformation deserializeTaskInformation(final TaskDeploymentDescriptor taskDeploymentDescriptor) {
+		try {
+			return taskDeploymentDescriptor
+				.getSerializedTaskInformation()
+				.deserializeValue(ClassLoader.getSystemClassLoader());
+		} catch (IOException | ClassNotFoundException e) {
+			throw new RuntimeException(e);
+		}
+	}
+}