You are viewing a plain-text version of this content; the hyperlink to the canonical version is not preserved in this rendering.
Posted to commits@flink.apache.org by ga...@apache.org on 2019/05/08 13:12:17 UTC

[flink] 03/03: [FLINK-12231][runtime] Introduce SchedulerNG interface

This is an automated email from the ASF dual-hosted git repository.

gary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b9704be0512337b3c74a26f3fb36035e6480d522
Author: Gary Yao <ga...@apache.org>
AuthorDate: Tue Apr 23 11:37:28 2019 +0200

    [FLINK-12231][runtime] Introduce SchedulerNG interface
    
    Introduce SchedulerNG interface with a LegacyScheduler implementation, which
    hides direct calls to the ExecutionGraph from the JobMaster.
    
    The LegacyScheduler is only needed so that the existing scheduling code paths
    keep functioning while work on the new scheduling abstractions is in progress.
    
    Remove JobMasterTest#testAutomaticRestartingWhenCheckpointing() because it
    requires the JobMaster to expose internal state (RestartStrategy). Add unit
    tests for JobGraph#isCheckpointingEnabled() to make up for the removed test.
---
 .../dispatcher/DefaultJobManagerRunnerFactory.java |   7 +-
 .../apache/flink/runtime/jobmaster/JobMaster.java  | 506 ++++-------------
 .../flink/runtime/jobmaster/LegacyScheduler.java   | 626 +++++++++++++++++++++
 .../runtime/jobmaster/LegacySchedulerFactory.java  |  82 +++
 .../flink/runtime/jobmaster/SchedulerNG.java       | 120 ++++
 .../runtime/jobmaster/SchedulerNGFactory.java      |  56 ++
 .../factories/DefaultJobMasterServiceFactory.java  |  16 +-
 .../flink/runtime/jobgraph/JobGraphTest.java       |  46 ++
 .../flink/runtime/jobmaster/JobMasterTest.java     |  64 +--
 9 files changed, 1069 insertions(+), 454 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerFactory.java
index c6c537f..6962b6e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerFactory.java
@@ -25,6 +25,8 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmaster.JobManagerRunner;
 import org.apache.flink.runtime.jobmaster.JobManagerSharedServices;
 import org.apache.flink.runtime.jobmaster.JobMasterConfiguration;
+import org.apache.flink.runtime.jobmaster.LegacySchedulerFactory;
+import org.apache.flink.runtime.jobmaster.SchedulerNGFactory;
 import org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory;
 import org.apache.flink.runtime.jobmaster.factories.JobManagerJobMetricGroupFactory;
 import org.apache.flink.runtime.jobmaster.factories.JobMasterServiceFactory;
@@ -56,6 +58,8 @@ public enum DefaultJobManagerRunnerFactory implements JobManagerRunnerFactory {
 
 		final SlotPoolFactory slotPoolFactory = DefaultSlotPoolFactory.fromConfiguration(configuration);
 		final SchedulerFactory schedulerFactory = DefaultSchedulerFactory.fromConfiguration(configuration);
+		final SchedulerNGFactory schedulerNGFactory = new LegacySchedulerFactory(
+			jobManagerServices.getRestartStrategyFactory());
 
 		final JobMasterServiceFactory jobMasterFactory = new DefaultJobMasterServiceFactory(
 			jobMasterConfiguration,
@@ -66,7 +70,8 @@ public enum DefaultJobManagerRunnerFactory implements JobManagerRunnerFactory {
 			jobManagerServices,
 			heartbeatServices,
 			jobManagerJobMetricGroupFactory,
-			fatalErrorHandler);
+			fatalErrorHandler,
+			schedulerNGFactory);
 
 		return new JobManagerRunner(
 			jobGraph,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index cae6922..627cc45 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -18,37 +18,22 @@
 
 package org.apache.flink.runtime.jobmaster;
 
-import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.functions.AggregateFunction;
-import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.CheckpointingOptions;
-import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.queryablestate.KvStateID;
-import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
 import org.apache.flink.runtime.blob.BlobWriter;
-import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
-import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
 import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
-import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
-import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.executiongraph.ExecutionGraph;
-import org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder;
-import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
-import org.apache.flink.runtime.executiongraph.IntermediateResult;
 import org.apache.flink.runtime.executiongraph.JobStatusListener;
-import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
-import org.apache.flink.runtime.executiongraph.restart.RestartStrategyResolving;
 import org.apache.flink.runtime.heartbeat.HeartbeatListener;
 import org.apache.flink.runtime.heartbeat.HeartbeatManager;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
@@ -59,7 +44,6 @@ import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.runtime.jobmanager.OnCompletionActions;
 import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
 import org.apache.flink.runtime.jobmaster.factories.JobManagerJobMetricGroupFactory;
@@ -71,12 +55,10 @@ import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
-import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
 import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
 import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
 import org.apache.flink.runtime.query.KvStateLocation;
-import org.apache.flink.runtime.query.KvStateLocationRegistry;
 import org.apache.flink.runtime.query.UnknownKvStateLocation;
 import org.apache.flink.runtime.registration.RegisteredRpcConnection;
 import org.apache.flink.runtime.registration.RegistrationResponse;
@@ -97,7 +79,6 @@ import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
-import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.InstantiationUtil;
@@ -118,7 +99,6 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeoutException;
@@ -174,7 +154,7 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 
 	private final Scheduler scheduler;
 
-	private final RestartStrategy restartStrategy;
+	private final SchedulerNGFactory schedulerNGFactory;
 
 	// --------- BackPressure --------
 
@@ -190,7 +170,7 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 
 	// -------- Mutable fields ---------
 
-	private ExecutionGraph executionGraph;
+	private SchedulerNG schedulerNG;
 
 	@Nullable
 	private JobManagerJobStatusListener jobStatusListener;
@@ -224,7 +204,8 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 			JobManagerJobMetricGroupFactory jobMetricGroupFactory,
 			OnCompletionActions jobCompletionActions,
 			FatalErrorHandler fatalErrorHandler,
-			ClassLoader userCodeLoader) throws Exception {
+			ClassLoader userCodeLoader,
+			SchedulerNGFactory schedulerNGFactory) throws Exception {
 
 		super(rpcService, AkkaRpcServiceUtils.createRandomName(JOB_MANAGER_NAME));
 
@@ -240,6 +221,7 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 		this.jobCompletionActions = checkNotNull(jobCompletionActions);
 		this.fatalErrorHandler = checkNotNull(fatalErrorHandler);
 		this.userCodeLoader = checkNotNull(userCodeLoader);
+		this.schedulerNGFactory = checkNotNull(schedulerNGFactory);
 		this.jobMetricGroupFactory = checkNotNull(jobMetricGroupFactory);
 
 		this.taskManagerHeartbeatManager = heartbeatServices.createHeartbeatManagerSender(
@@ -259,17 +241,6 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 
 		log.info("Initializing job {} ({}).", jobName, jid);
 
-		final RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration =
-				jobGraph.getSerializedExecutionConfig()
-						.deserializeValue(userCodeLoader)
-						.getRestartStrategy();
-
-		this.restartStrategy = RestartStrategyResolving.resolve(restartStrategyConfiguration,
-			jobManagerSharedServices.getRestartStrategyFactory(),
-			jobGraph.isCheckpointingEnabled());
-
-		log.info("Using restart strategy {} for {} ({}).", this.restartStrategy, jobName, jid);
-
 		resourceManagerLeaderRetriever = highAvailabilityServices.getResourceManagerLeaderRetriever();
 
 		this.slotPool = checkNotNull(slotPoolFactory).createSlotPool(jobGraph.getJobID());
@@ -281,7 +252,7 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 		this.backPressureStatsTracker = checkNotNull(jobManagerSharedServices.getBackPressureStatsTracker());
 
 		this.jobManagerJobMetricGroup = jobMetricGroupFactory.create(jobGraph);
-		this.executionGraph = createAndRestoreExecutionGraph(jobManagerJobMetricGroup);
+		this.schedulerNG = createScheduler(jobManagerJobMetricGroup);
 		this.jobStatusListener = null;
 
 		this.resourceManagerConnection = null;
@@ -290,6 +261,23 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 		this.accumulators = new HashMap<>();
 	}
 
+	private SchedulerNG createScheduler(final JobManagerJobMetricGroup jobManagerJobMetricGroup) throws Exception {
+		return schedulerNGFactory.createInstance(
+			log,
+			jobGraph,
+			backPressureStatsTracker,
+			scheduledExecutorService,
+			jobMasterConfiguration.getConfiguration(),
+			scheduler,
+			scheduledExecutorService,
+			userCodeLoader,
+			highAvailabilityServices.getCheckpointRecoveryFactory(),
+			rpcTimeout,
+			blobWriter,
+			jobManagerJobMetricGroup,
+			jobMasterConfiguration.getSlotRequestTimeout());
+	}
+
 	//----------------------------------------------------------------------------------------------
 	// Lifecycle management
 	//----------------------------------------------------------------------------------------------
@@ -361,7 +349,7 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 
 	@Override
 	public CompletableFuture<Acknowledge> cancel(Time timeout) {
-		executionGraph.cancel();
+		schedulerNG.cancel();
 
 		return CompletableFuture.completedFuture(Acknowledge.get());
 	}
@@ -377,7 +365,7 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 			final TaskExecutionState taskExecutionState) {
 		checkNotNull(taskExecutionState, "taskExecutionState");
 
-		if (executionGraph.updateState(taskExecutionState)) {
+		if (schedulerNG.updateTaskExecutionState(taskExecutionState)) {
 			return CompletableFuture.completedFuture(Acknowledge.get());
 		} else {
 			return FutureUtils.completedExceptionally(
@@ -391,43 +379,11 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 			final JobVertexID vertexID,
 			final ExecutionAttemptID executionAttempt) {
 
-		final Execution execution = executionGraph.getRegisteredExecutions().get(executionAttempt);
-		if (execution == null) {
-			// can happen when JobManager had already unregistered this execution upon on task failure,
-			// but TaskManager get some delay to aware of that situation
-			if (log.isDebugEnabled()) {
-				log.debug("Can not find Execution for attempt {}.", executionAttempt);
-			}
-			// but we should TaskManager be aware of this
-			return FutureUtils.completedExceptionally(new Exception("Can not find Execution for attempt " + executionAttempt));
-		}
-
-		final ExecutionJobVertex vertex = executionGraph.getJobVertex(vertexID);
-		if (vertex == null) {
-			log.error("Cannot find execution vertex for vertex ID {}.", vertexID);
-			return FutureUtils.completedExceptionally(new Exception("Cannot find execution vertex for vertex ID " + vertexID));
-		}
-
-		if (vertex.getSplitAssigner() == null) {
-			log.error("No InputSplitAssigner for vertex ID {}.", vertexID);
-			return FutureUtils.completedExceptionally(new Exception("No InputSplitAssigner for vertex ID " + vertexID));
-		}
-
-		final InputSplit nextInputSplit = execution.getNextInputSplit();
-
-		if (log.isDebugEnabled()) {
-			log.debug("Send next input split {}.", nextInputSplit);
-		}
-
 		try {
-			final byte[] serializedInputSplit = InstantiationUtil.serializeObject(nextInputSplit);
-			return CompletableFuture.completedFuture(new SerializedInputSplit(serializedInputSplit));
-		} catch (Exception ex) {
-			log.error("Could not serialize the next input split of class {}.", nextInputSplit.getClass(), ex);
-			IOException reason = new IOException("Could not serialize the next input split of class " +
-					nextInputSplit.getClass() + ".", ex);
-			vertex.fail(reason);
-			return FutureUtils.completedExceptionally(reason);
+			return CompletableFuture.completedFuture(schedulerNG.requestNextInputSplit(vertexID, executionAttempt));
+		} catch (IOException e) {
+			log.warn("Error while requesting next input split", e);
+			return FutureUtils.completedExceptionally(e);
 		}
 	}
 
@@ -436,30 +392,11 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 			final IntermediateDataSetID intermediateResultId,
 			final ResultPartitionID resultPartitionId) {
 
-		final Execution execution = executionGraph.getRegisteredExecutions().get(resultPartitionId.getProducerId());
-		if (execution != null) {
-			return CompletableFuture.completedFuture(execution.getState());
-		}
-		else {
-			final IntermediateResult intermediateResult =
-					executionGraph.getAllIntermediateResults().get(intermediateResultId);
-
-			if (intermediateResult != null) {
-				// Try to find the producing execution
-				Execution producerExecution = intermediateResult
-						.getPartitionById(resultPartitionId.getPartitionId())
-						.getProducer()
-						.getCurrentExecutionAttempt();
-
-				if (producerExecution.getAttemptId().equals(resultPartitionId.getProducerId())) {
-					return CompletableFuture.completedFuture(producerExecution.getState());
-				} else {
-					return FutureUtils.completedExceptionally(new PartitionProducerDisposedException(resultPartitionId));
-				}
-			} else {
-				return FutureUtils.completedExceptionally(new IllegalArgumentException("Intermediate data set with ID "
-						+ intermediateResultId + " not found."));
-			}
+		try {
+			return CompletableFuture.completedFuture(schedulerNG.requestPartitionState(intermediateResultId, resultPartitionId));
+		} catch (PartitionProducerDisposedException e) {
+			log.info("Error while requesting partition state", e);
+			return FutureUtils.completedExceptionally(e);
 		}
 	}
 
@@ -467,12 +404,9 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 	public CompletableFuture<Acknowledge> scheduleOrUpdateConsumers(
 			final ResultPartitionID partitionID,
 			final Time timeout) {
-		try {
-			executionGraph.scheduleOrUpdateConsumers(partitionID);
-			return CompletableFuture.completedFuture(Acknowledge.get());
-		} catch (Exception e) {
-			return FutureUtils.completedExceptionally(e);
-		}
+
+		schedulerNG.scheduleOrUpdateConsumers(partitionID);
+		return CompletableFuture.completedFuture(Acknowledge.get());
 	}
 
 	@Override
@@ -500,76 +434,22 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 			final CheckpointMetrics checkpointMetrics,
 			final TaskStateSnapshot checkpointState) {
 
-		final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator();
-		final AcknowledgeCheckpoint ackMessage = new AcknowledgeCheckpoint(
-			jobID,
-			executionAttemptID,
-			checkpointId,
-			checkpointMetrics,
-			checkpointState);
-
-		if (checkpointCoordinator != null) {
-			getRpcService().execute(() -> {
-				try {
-					checkpointCoordinator.receiveAcknowledgeMessage(ackMessage);
-				} catch (Throwable t) {
-					log.warn("Error while processing checkpoint acknowledgement message", t);
-				}
-			});
-		} else {
-			String errorMessage = "Received AcknowledgeCheckpoint message for job {} with no CheckpointCoordinator";
-			if (executionGraph.getState() == JobStatus.RUNNING) {
-				log.error(errorMessage, jobGraph.getJobID());
-			} else {
-				log.debug(errorMessage, jobGraph.getJobID());
-			}
-		}
+		schedulerNG.acknowledgeCheckpoint(jobID, executionAttemptID, checkpointId, checkpointMetrics, checkpointState);
 	}
 
 	// TODO: This method needs a leader session ID
 	@Override
 	public void declineCheckpoint(DeclineCheckpoint decline) {
-		final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator();
-
-		if (checkpointCoordinator != null) {
-			getRpcService().execute(() -> {
-				try {
-					checkpointCoordinator.receiveDeclineMessage(decline);
-				} catch (Exception e) {
-					log.error("Error in CheckpointCoordinator while processing {}", decline, e);
-				}
-			});
-		} else {
-			String errorMessage = "Received DeclineCheckpoint message for job {} with no CheckpointCoordinator";
-			if (executionGraph.getState() == JobStatus.RUNNING) {
-				log.error(errorMessage, jobGraph.getJobID());
-			} else {
-				log.debug(errorMessage, jobGraph.getJobID());
-			}
-		}
+		schedulerNG.declineCheckpoint(decline);
 	}
 
 	@Override
 	public CompletableFuture<KvStateLocation> requestKvStateLocation(final JobID jobId, final String registrationName) {
-		// sanity check for the correct JobID
-		if (jobGraph.getJobID().equals(jobId)) {
-			if (log.isDebugEnabled()) {
-				log.debug("Lookup key-value state for job {} with registration " +
-					"name {}.", jobGraph.getJobID(), registrationName);
-			}
-
-			final KvStateLocationRegistry registry = executionGraph.getKvStateLocationRegistry();
-			final KvStateLocation location = registry.getKvStateLocation(registrationName);
-			if (location != null) {
-				return CompletableFuture.completedFuture(location);
-			} else {
-				return FutureUtils.completedExceptionally(new UnknownKvStateLocation(registrationName));
-			}
-		} else {
-			if (log.isDebugEnabled()) {
-				log.debug("Request of key-value state location for unknown job {} received.", jobId);
-			}
-			return FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobId));
+		try {
+			return CompletableFuture.completedFuture(schedulerNG.requestKvStateLocation(jobId, registrationName));
+		} catch (UnknownKvStateLocation | FlinkJobNotFoundException e) {
+			log.info("Error while request key-value state location", e);
+			return FutureUtils.completedExceptionally(e);
 		}
 	}
 
@@ -581,26 +461,13 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 			final String registrationName,
 			final KvStateID kvStateId,
 			final InetSocketAddress kvStateServerAddress) {
-		if (jobGraph.getJobID().equals(jobId)) {
-			if (log.isDebugEnabled()) {
-				log.debug("Key value state registered for job {} under name {}.",
-					jobGraph.getJobID(), registrationName);
-			}
-
-			try {
-				executionGraph.getKvStateLocationRegistry().notifyKvStateRegistered(
-					jobVertexId, keyGroupRange, registrationName, kvStateId, kvStateServerAddress);
 
-				return CompletableFuture.completedFuture(Acknowledge.get());
-			} catch (Exception e) {
-				log.error("Failed to notify KvStateRegistry about registration {}.", registrationName, e);
-				return FutureUtils.completedExceptionally(e);
-			}
-		} else {
-			if (log.isDebugEnabled()) {
-				log.debug("Notification about key-value state registration for unknown job {} received.", jobId);
-			}
-			return FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobId));
+		try {
+			schedulerNG.notifyKvStateRegistered(jobId, jobVertexId, keyGroupRange, registrationName, kvStateId, kvStateServerAddress);
+			return CompletableFuture.completedFuture(Acknowledge.get());
+		} catch (FlinkJobNotFoundException e) {
+			log.info("Error while receiving notification about key-value state registration", e);
+			return FutureUtils.completedExceptionally(e);
 		}
 	}
 
@@ -610,26 +477,12 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 			JobVertexID jobVertexId,
 			KeyGroupRange keyGroupRange,
 			String registrationName) {
-		if (jobGraph.getJobID().equals(jobId)) {
-			if (log.isDebugEnabled()) {
-				log.debug("Key value state unregistered for job {} under name {}.",
-					jobGraph.getJobID(), registrationName);
-			}
-
-			try {
-				executionGraph.getKvStateLocationRegistry().notifyKvStateUnregistered(
-					jobVertexId, keyGroupRange, registrationName);
-
-				return CompletableFuture.completedFuture(Acknowledge.get());
-			} catch (Exception e) {
-				log.error("Failed to notify KvStateRegistry about unregistration {}.", registrationName, e);
-				return FutureUtils.completedExceptionally(e);
-			}
-		} else {
-			if (log.isDebugEnabled()) {
-				log.debug("Notification about key-value state deregistration for unknown job {} received.", jobId);
-			}
-			return FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobId));
+		try {
+			schedulerNG.notifyKvStateUnregistered(jobId, jobVertexId, keyGroupRange, registrationName);
+			return CompletableFuture.completedFuture(Acknowledge.get());
+		} catch (FlinkJobNotFoundException e) {
+			log.info("Error while receiving notification about key-value state de-registration", e);
+			return FutureUtils.completedExceptionally(e);
 		}
 	}
 
@@ -749,18 +602,17 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 
 	@Override
 	public CompletableFuture<JobDetails> requestJobDetails(Time timeout) {
-		final ExecutionGraph currentExecutionGraph = executionGraph;
-		return CompletableFuture.supplyAsync(() -> WebMonitorUtils.createDetailsForJob(currentExecutionGraph), scheduledExecutorService);
+		return CompletableFuture.completedFuture(schedulerNG.requestJobDetails());
 	}
 
 	@Override
 	public CompletableFuture<JobStatus> requestJobStatus(Time timeout) {
-		return CompletableFuture.completedFuture(executionGraph.getState());
+		return CompletableFuture.completedFuture(schedulerNG.requestJobStatus());
 	}
 
 	@Override
 	public CompletableFuture<ArchivedExecutionGraph> requestJob(Time timeout) {
-		return CompletableFuture.completedFuture(ArchivedExecutionGraph.createFrom(executionGraph));
+		return CompletableFuture.completedFuture(schedulerNG.requestJob());
 	}
 
 	@Override
@@ -769,37 +621,7 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 			final boolean cancelJob,
 			final Time timeout) {
 
-		final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator();
-		if (checkpointCoordinator == null) {
-			return FutureUtils.completedExceptionally(new IllegalStateException(
-				String.format("Job %s is not a streaming job.", jobGraph.getJobID())));
-		} else if (targetDirectory == null && !checkpointCoordinator.getCheckpointStorage().hasDefaultSavepointLocation()) {
-			log.info("Trying to cancel job {} with savepoint, but no savepoint directory configured.", jobGraph.getJobID());
-
-			return FutureUtils.completedExceptionally(new IllegalStateException(
-				"No savepoint directory configured. You can either specify a directory " +
-					"while cancelling via -s :targetDirectory or configure a cluster-wide " +
-					"default via key '" + CheckpointingOptions.SAVEPOINT_DIRECTORY.key() + "'."));
-		}
-
-		if (cancelJob) {
-			checkpointCoordinator.stopCheckpointScheduler();
-		}
-		return checkpointCoordinator
-			.triggerSavepoint(System.currentTimeMillis(), targetDirectory)
-			.thenApply(CompletedCheckpoint::getExternalPointer)
-			.handleAsync((path, throwable) -> {
-				if (throwable != null) {
-					if (cancelJob) {
-						startCheckpointScheduler(checkpointCoordinator);
-					}
-					throw new CompletionException(throwable);
-				} else if (cancelJob) {
-					log.info("Savepoint stored in {}. Now cancelling {}.", path, jobGraph.getJobID());
-					cancel(timeout);
-				}
-				return path;
-			}, getMainThreadExecutor());
+		return schedulerNG.triggerSavepoint(targetDirectory, cancelJob);
 	}
 
 	@Override
@@ -808,80 +630,19 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 			final boolean advanceToEndOfEventTime,
 			final Time timeout) {
 
-		final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator();
-
-		if (checkpointCoordinator == null) {
-			return FutureUtils.completedExceptionally(new IllegalStateException(
-					String.format("Job %s is not a streaming job.", jobGraph.getJobID())));
-		}
-
-		if (targetDirectory == null && !checkpointCoordinator.getCheckpointStorage().hasDefaultSavepointLocation()) {
-			log.info("Trying to cancel job {} with savepoint, but no savepoint directory configured.", jobGraph.getJobID());
-
-			return FutureUtils.completedExceptionally(new IllegalStateException(
-					"No savepoint directory configured. You can either specify a directory " +
-							"while cancelling via -s :targetDirectory or configure a cluster-wide " +
-							"default via key '" + CheckpointingOptions.SAVEPOINT_DIRECTORY.key() + "'."));
-		}
-
-		final long now = System.currentTimeMillis();
-
-		// we stop the checkpoint coordinator so that we are guaranteed
-		// to have only the data of the synchronous savepoint committed.
-		// in case of failure, and if the job restarts, the coordinator
-		// will be restarted by the CheckpointCoordinatorDeActivator.
-		checkpointCoordinator.stopCheckpointScheduler();
-
-		final CompletableFuture<String> savepointFuture = checkpointCoordinator
-				.triggerSynchronousSavepoint(now, advanceToEndOfEventTime, targetDirectory)
-				.handleAsync((completedCheckpoint, throwable) -> {
-					if (throwable != null) {
-						log.info("Failed during stopping job {} with a savepoint. Reason: {}", jobGraph.getJobID(), throwable.getMessage());
-						throw new CompletionException(throwable);
-					}
-					return completedCheckpoint.getExternalPointer();
-				}, getMainThreadExecutor());
-
-		final CompletableFuture<JobStatus> terminationFuture = executionGraph
-				.getTerminationFuture()
-				.handleAsync((jobstatus, throwable) -> {
-
-					if (throwable != null) {
-						log.info("Failed during stopping job {} with a savepoint. Reason: {}", jobGraph.getJobID(), throwable.getMessage());
-						throw new CompletionException(throwable);
-					} else if(jobstatus != JobStatus.FINISHED) {
-						log.info("Failed during stopping job {} with a savepoint. Reason: Reached state {} instead of FINISHED.", jobGraph.getJobID(), jobstatus);
-						throw new CompletionException(new FlinkException("Reached state " + jobstatus + " instead of FINISHED."));
-					}
-					return jobstatus;
-				}, getMainThreadExecutor());
-
-		return savepointFuture.thenCompose((path) ->
-			terminationFuture.thenApply((jobStatus -> path)));
-	}
-
-	private void startCheckpointScheduler(final CheckpointCoordinator checkpointCoordinator) {
-		if (checkpointCoordinator.isPeriodicCheckpointingConfigured()) {
-			try {
-				checkpointCoordinator.startCheckpointScheduler();
-			} catch (IllegalStateException ignored) {
-				// Concurrent shut down of the coordinator
-			}
-		}
+		return schedulerNG.stopWithSavepoint(targetDirectory, advanceToEndOfEventTime);
 	}
 
 	@Override
 	public CompletableFuture<OperatorBackPressureStatsResponse> requestOperatorBackPressureStats(final JobVertexID jobVertexId) {
-		final ExecutionJobVertex jobVertex = executionGraph.getJobVertex(jobVertexId);
-		if (jobVertex == null) {
-			return FutureUtils.completedExceptionally(new FlinkException("JobVertexID not found " +
-				jobVertexId));
+		try {
+			final Optional<OperatorBackPressureStats> operatorBackPressureStats = schedulerNG.requestOperatorBackPressureStats(jobVertexId);
+			return CompletableFuture.completedFuture(OperatorBackPressureStatsResponse.of(
+				operatorBackPressureStats.orElse(null)));
+		} catch (FlinkException e) {
+			log.info("Error while requesting operator back pressure stats", e);
+			return FutureUtils.completedExceptionally(e);
 		}
-
-		final Optional<OperatorBackPressureStats> operatorBackPressureStats =
-			backPressureStatsTracker.getOperatorBackPressureStats(jobVertex);
-		return CompletableFuture.completedFuture(OperatorBackPressureStatsResponse.of(
-			operatorBackPressureStats.orElse(null)));
 	}
 
 	@Override
@@ -933,7 +694,7 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 
 		log.info("Starting execution of job {} ({}) under job master id {}.", jobGraph.getName(), jobGraph.getJobID(), newJobMasterId);
 
-		resetAndScheduleExecutionGraph();
+		resetAndStartScheduler();
 
 		return Acknowledge.get();
 	}
@@ -994,7 +755,7 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 			log.warn("Failed to stop resource manager leader retriever when suspending.", t);
 		}
 
-		suspendAndClearExecutionGraphFields(cause);
+		suspendAndClearSchedulerFields(cause);
 
 		// the slot pool stops receiving messages and clears its pooled slots
 		slotPool.suspend();
@@ -1005,101 +766,58 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 		return Acknowledge.get();
 	}
 
-	private void assignExecutionGraph(
-			ExecutionGraph newExecutionGraph,
+	/**
+	 * Installs a newly created scheduler together with its job metric group.
+	 *
+	 * <p>Must be called from the main thread. Requires that the previous scheduler has reached a
+	 * terminal job status and that the job metric group field has already been cleared.
+	 */
+	private void assignScheduler(
+			SchedulerNG newScheduler,
 			JobManagerJobMetricGroup newJobManagerJobMetricGroup) {
 		validateRunsInMainThread();
-		checkState(executionGraph.getState().isTerminalState());
+		checkState(schedulerNG.requestJobStatus().isTerminalState());
 		checkState(jobManagerJobMetricGroup == null);
 
-		executionGraph = newExecutionGraph;
+		schedulerNG = newScheduler;
 		jobManagerJobMetricGroup = newJobManagerJobMetricGroup;
 	}
 
-	private void resetAndScheduleExecutionGraph() throws Exception {
+	/**
+	 * Prepares the scheduler for a (re)start and then starts scheduling.
+	 *
+	 * <p>If the current scheduler is still in state CREATED, it is reused and only receives the
+	 * main thread executor. Otherwise it is suspended, a new scheduler with a new job metric group
+	 * is created, and the new scheduler is assigned once the old one's termination future
+	 * completes. Scheduling is started after the scheduler has been assigned.
+	 */
+	private void resetAndStartScheduler() throws Exception {
 		validateRunsInMainThread();
 
-		final CompletableFuture<Void> executionGraphAssignedFuture;
+		final CompletableFuture<Void> schedulerAssignedFuture;
 
-		if (executionGraph.getState() == JobStatus.CREATED) {
-			executionGraphAssignedFuture = CompletableFuture.completedFuture(null);
-			executionGraph.start(getMainThreadExecutor());
+		if (schedulerNG.requestJobStatus() == JobStatus.CREATED) {
+			schedulerAssignedFuture = CompletableFuture.completedFuture(null);
+			schedulerNG.setMainThreadExecutor(getMainThreadExecutor());
 		} else {
-			suspendAndClearExecutionGraphFields(new FlinkException("ExecutionGraph is being reset in order to be rescheduled."));
+			suspendAndClearSchedulerFields(new FlinkException("ExecutionGraph is being reset in order to be rescheduled."));
 			final JobManagerJobMetricGroup newJobManagerJobMetricGroup = jobMetricGroupFactory.create(jobGraph);
-			final ExecutionGraph newExecutionGraph = createAndRestoreExecutionGraph(newJobManagerJobMetricGroup);
+			final SchedulerNG newScheduler = createScheduler(newJobManagerJobMetricGroup);
 
-			executionGraphAssignedFuture = executionGraph.getTerminationFuture().handle(
-				(JobStatus ignored, Throwable throwable) -> {
-					newExecutionGraph.start(getMainThreadExecutor());
-					assignExecutionGraph(newExecutionGraph, newJobManagerJobMetricGroup);
+			// swap in the new scheduler only after the old one has terminated; handle(...) runs
+			// whether the termination future completed normally or exceptionally
+			schedulerAssignedFuture = schedulerNG.getTerminationFuture().handle(
+				(ignored, throwable) -> {
+					newScheduler.setMainThreadExecutor(getMainThreadExecutor());
+					assignScheduler(newScheduler, newJobManagerJobMetricGroup);
 					return null;
-				});
+				}
+			);
 		}
 
-		executionGraphAssignedFuture.thenRun(this::scheduleExecutionGraph);
+		schedulerAssignedFuture.thenRun(this::startScheduling);
 	}
 
-	private void scheduleExecutionGraph() {
+	/**
+	 * Registers a fresh job status listener with the scheduler and starts scheduling.
+	 * Fails via {@code checkState} if the {@code jobStatusListener} field has not been cleared.
+	 */
+	private void startScheduling() {
 		checkState(jobStatusListener == null);
 		// register self as job status change listener
 		jobStatusListener = new JobManagerJobStatusListener();
-		executionGraph.registerJobStatusListener(jobStatusListener);
-
-		try {
-			executionGraph.scheduleForExecution();
-		}
-		catch (Throwable t) {
-			executionGraph.failGlobal(t);
-		}
-	}
-
-	private ExecutionGraph createAndRestoreExecutionGraph(JobManagerJobMetricGroup currentJobManagerJobMetricGroup) throws Exception {
-
-		ExecutionGraph newExecutionGraph = createExecutionGraph(currentJobManagerJobMetricGroup);
-
-		final CheckpointCoordinator checkpointCoordinator = newExecutionGraph.getCheckpointCoordinator();
-
-		if (checkpointCoordinator != null) {
-			// check whether we find a valid checkpoint
-			if (!checkpointCoordinator.restoreLatestCheckpointedState(
-				newExecutionGraph.getAllVertices(),
-				false,
-				false)) {
-
-				// check whether we can restore from a savepoint
-				tryRestoreExecutionGraphFromSavepoint(newExecutionGraph, jobGraph.getSavepointRestoreSettings());
-			}
-		}
+		schedulerNG.registerJobStatusListener(jobStatusListener);
 
-		return newExecutionGraph;
+		schedulerNG.startScheduling();
 	}
 
-	private ExecutionGraph createExecutionGraph(JobManagerJobMetricGroup currentJobManagerJobMetricGroup) throws JobExecutionException, JobException {
-		return ExecutionGraphBuilder.buildGraph(
-			null,
-			jobGraph,
-			jobMasterConfiguration.getConfiguration(),
-			scheduledExecutorService,
-			scheduledExecutorService,
-			scheduler,
-			userCodeLoader,
-			highAvailabilityServices.getCheckpointRecoveryFactory(),
-			rpcTimeout,
-			restartStrategy,
-			currentJobManagerJobMetricGroup,
-			blobWriter,
-			jobMasterConfiguration.getSlotRequestTimeout(),
-			log);
-	}
-
-	private void suspendAndClearExecutionGraphFields(Exception cause) {
-		suspendExecutionGraph(cause);
-		clearExecutionGraphFields();
+	/**
+	 * Suspends the current scheduler with the given cause and then clears the fields that belong
+	 * to it (job metric group and job status listener).
+	 */
+	private void suspendAndClearSchedulerFields(Exception cause) {
+		suspendScheduler(cause);
+		clearSchedulerFields();
 	}
 
-	private void suspendExecutionGraph(Exception cause) {
-		executionGraph.suspend(cause);
+	private void suspendScheduler(Exception cause) {
+		schedulerNG.suspend(cause);
 
 		if (jobManagerJobMetricGroup != null) {
 			jobManagerJobMetricGroup.close();
@@ -1110,31 +828,11 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 		}
 	}
 
-	private void clearExecutionGraphFields() {
+	/**
+	 * Resets the job metric group and job status listener references, which
+	 * {@code assignScheduler} and {@code startScheduling} require to be {@code null}.
+	 */
+	private void clearSchedulerFields() {
 		jobManagerJobMetricGroup = null;
 		jobStatusListener = null;
 	}
 
-	/**
-	 * Tries to restore the given {@link ExecutionGraph} from the provided {@link SavepointRestoreSettings}.
-	 *
-	 * @param executionGraphToRestore {@link ExecutionGraph} which is supposed to be restored
-	 * @param savepointRestoreSettings {@link SavepointRestoreSettings} containing information about the savepoint to restore from
-	 * @throws Exception if the {@link ExecutionGraph} could not be restored
-	 */
-	private void tryRestoreExecutionGraphFromSavepoint(ExecutionGraph executionGraphToRestore, SavepointRestoreSettings savepointRestoreSettings) throws Exception {
-		if (savepointRestoreSettings.restoreSavepoint()) {
-			final CheckpointCoordinator checkpointCoordinator = executionGraphToRestore.getCheckpointCoordinator();
-			if (checkpointCoordinator != null) {
-				checkpointCoordinator.restoreSavepoint(
-					savepointRestoreSettings.getRestorePath(),
-					savepointRestoreSettings.allowNonRestoredState(),
-					executionGraphToRestore.getAllVertices(),
-					userCodeLoader);
-			}
-		}
-	}
-
 	//----------------------------------------------------------------------------------------------
 
 	private void handleJobMasterError(final Throwable cause) {
@@ -1154,7 +852,7 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 		validateRunsInMainThread();
 
 		if (newJobStatus.isGloballyTerminalState()) {
-			final ArchivedExecutionGraph archivedExecutionGraph = ArchivedExecutionGraph.createFrom(executionGraph);
+			final ArchivedExecutionGraph archivedExecutionGraph = schedulerNG.requestJob();
 			scheduledExecutorService.execute(() -> jobCompletionActions.jobReachedGloballyTerminalState(archivedExecutionGraph));
 		}
 	}
@@ -1414,7 +1112,7 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 		@Override
 		public void reportPayload(ResourceID resourceID, AccumulatorReport payload) {
 			for (AccumulatorSnapshot snapshot : payload.getAccumulatorSnapshots()) {
-				executionGraph.updateAccumulators(snapshot);
+				schedulerNG.updateAccumulators(snapshot);
 			}
 		}
 
@@ -1449,9 +1147,5 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 			return CompletableFuture.completedFuture(null);
 		}
 	}
-
-	@VisibleForTesting
-	RestartStrategy getRestartStrategy() {
-		return restartStrategy;
-	}
 }
+
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/LegacyScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/LegacyScheduler.java
new file mode 100644
index 0000000..b6a256c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/LegacyScheduler.java
@@ -0,0 +1,626 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.queryablestate.KvStateID;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
+import org.apache.flink.runtime.blob.BlobWriter;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder;
+import org.apache.flink.runtime.executiongraph.ExecutionGraphException;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.IntermediateResult;
+import org.apache.flink.runtime.executiongraph.JobStatusListener;
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategyResolving;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
+import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
+import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
+import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
+import org.apache.flink.runtime.query.KvStateLocation;
+import org.apache.flink.runtime.query.KvStateLocationRegistry;
+import org.apache.flink.runtime.query.UnknownKvStateLocation;
+import org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker;
+import org.apache.flink.runtime.rest.handler.legacy.backpressure.OperatorBackPressureStats;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.taskmanager.TaskExecutionState;
+import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.function.FunctionUtils;
+
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A scheduler that delegates to the scheduling logic in the {@link ExecutionGraph}.
+ *
+ * <p>The {@link ExecutionGraph} is built in the constructor and, if possible, restored from the
+ * latest completed checkpoint or from the savepoint configured in the {@link JobGraph}. Apart from
+ * {@link #requestJobStatus()}, {@link #registerJobStatusListener(JobStatusListener)} and
+ * {@link #getTerminationFuture()}, the methods assert that they are called from the main thread
+ * executor handed over via {@link #setMainThreadExecutor(ComponentMainThreadExecutor)}.
+ *
+ * @see ExecutionGraph#scheduleForExecution()
+ */
+public class LegacyScheduler implements SchedulerNG {
+
+	private final Logger log;
+
+	private final JobGraph jobGraph;
+
+	/** The graph all calls are delegated to; built and restored once in the constructor. */
+	private final ExecutionGraph executionGraph;
+
+	private final BackPressureStatsTracker backPressureStatsTracker;
+
+	/** Executor for checkpoint acknowledge/decline processing (and ExecutionGraph I/O). */
+	private final Executor ioExecutor;
+
+	private final Configuration jobMasterConfiguration;
+
+	private final SlotProvider slotProvider;
+
+	private final ScheduledExecutorService futureExecutor;
+
+	private final ClassLoader userCodeLoader;
+
+	private final CheckpointRecoveryFactory checkpointRecoveryFactory;
+
+	private final Time rpcTimeout;
+
+	/** Resolved from the job's ExecutionConfig and the cluster-side restart strategy factory. */
+	private final RestartStrategy restartStrategy;
+
+	private final BlobWriter blobWriter;
+
+	private final Time slotRequestTimeout;
+
+	// placeholder until setMainThreadExecutor(...) is called; presumably fails with the message
+	// below when used before that
+	private ComponentMainThreadExecutor mainThreadExecutor = new ComponentMainThreadExecutor.DummyComponentMainThreadExecutor(
+		"LegacyScheduler is not initialized with proper main thread executor. " +
+			"Call to LegacyScheduler.setMainThreadExecutor(...) required.");
+
+	/**
+	 * Creates the scheduler and eagerly builds and restores its {@link ExecutionGraph}.
+	 *
+	 * <p>The restart strategy is resolved from the restart strategy configuration stored in the
+	 * job's serialized ExecutionConfig, the given factory, and whether checkpointing is enabled.
+	 *
+	 * @throws Exception if the ExecutionConfig cannot be deserialized, or if the ExecutionGraph
+	 *                   cannot be built or restored from a checkpoint/savepoint
+	 */
+	public LegacyScheduler(
+			final Logger log,
+			final JobGraph jobGraph,
+			final BackPressureStatsTracker backPressureStatsTracker,
+			final Executor ioExecutor,
+			final Configuration jobMasterConfiguration,
+			final SlotProvider slotProvider,
+			final ScheduledExecutorService futureExecutor,
+			final ClassLoader userCodeLoader,
+			final CheckpointRecoveryFactory checkpointRecoveryFactory,
+			final Time rpcTimeout,
+			final RestartStrategyFactory restartStrategyFactory,
+			final BlobWriter blobWriter,
+			final JobManagerJobMetricGroup jobManagerJobMetricGroup,
+			final Time slotRequestTimeout) throws Exception {
+
+		this.log = checkNotNull(log);
+		this.jobGraph = checkNotNull(jobGraph);
+		this.backPressureStatsTracker = checkNotNull(backPressureStatsTracker);
+		this.ioExecutor = checkNotNull(ioExecutor);
+		this.jobMasterConfiguration = checkNotNull(jobMasterConfiguration);
+		this.slotProvider = checkNotNull(slotProvider);
+		this.futureExecutor = checkNotNull(futureExecutor);
+		this.userCodeLoader = checkNotNull(userCodeLoader);
+		this.checkpointRecoveryFactory = checkNotNull(checkpointRecoveryFactory);
+		this.rpcTimeout = checkNotNull(rpcTimeout);
+
+		final RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration =
+			jobGraph.getSerializedExecutionConfig()
+				.deserializeValue(userCodeLoader)
+				.getRestartStrategy();
+
+		this.restartStrategy = RestartStrategyResolving.resolve(restartStrategyConfiguration,
+			restartStrategyFactory,
+			jobGraph.isCheckpointingEnabled());
+
+		log.info("Using restart strategy {} for {} ({}).", this.restartStrategy, jobGraph.getName(), jobGraph.getJobID());
+
+		this.blobWriter = checkNotNull(blobWriter);
+		this.slotRequestTimeout = checkNotNull(slotRequestTimeout);
+
+		// must come last: graph creation reads the fields assigned above
+		this.executionGraph = createAndRestoreExecutionGraph(jobManagerJobMetricGroup);
+	}
+
+	/**
+	 * Builds a new {@link ExecutionGraph} and restores it from the latest checkpoint or, if there
+	 * is none, from the savepoint configured in the job graph (if any).
+	 */
+	private ExecutionGraph createAndRestoreExecutionGraph(JobManagerJobMetricGroup currentJobManagerJobMetricGroup) throws Exception {
+
+		ExecutionGraph newExecutionGraph = createExecutionGraph(currentJobManagerJobMetricGroup);
+
+		final CheckpointCoordinator checkpointCoordinator = newExecutionGraph.getCheckpointCoordinator();
+
+		if (checkpointCoordinator != null) {
+			// check whether we find a valid checkpoint
+			if (!checkpointCoordinator.restoreLatestCheckpointedState(
+				newExecutionGraph.getAllVertices(),
+				false,
+				false)) {
+
+				// check whether we can restore from a savepoint
+				tryRestoreExecutionGraphFromSavepoint(newExecutionGraph, jobGraph.getSavepointRestoreSettings());
+			}
+		}
+
+		return newExecutionGraph;
+	}
+
+	/** Builds the {@link ExecutionGraph} for {@link #jobGraph} from this scheduler's services. */
+	private ExecutionGraph createExecutionGraph(JobManagerJobMetricGroup currentJobManagerJobMetricGroup) throws JobExecutionException, JobException {
+		return ExecutionGraphBuilder.buildGraph(
+			null,
+			jobGraph,
+			jobMasterConfiguration,
+			futureExecutor,
+			ioExecutor,
+			slotProvider,
+			userCodeLoader,
+			checkpointRecoveryFactory,
+			rpcTimeout,
+			restartStrategy,
+			currentJobManagerJobMetricGroup,
+			blobWriter,
+			slotRequestTimeout,
+			log);
+	}
+
+	/**
+	 * Tries to restore the given {@link ExecutionGraph} from the provided {@link SavepointRestoreSettings}.
+	 *
+	 * @param executionGraphToRestore {@link ExecutionGraph} which is supposed to be restored
+	 * @param savepointRestoreSettings {@link SavepointRestoreSettings} containing information about the savepoint to restore from
+	 * @throws Exception if the {@link ExecutionGraph} could not be restored
+	 */
+	private void tryRestoreExecutionGraphFromSavepoint(ExecutionGraph executionGraphToRestore, SavepointRestoreSettings savepointRestoreSettings) throws Exception {
+		if (savepointRestoreSettings.restoreSavepoint()) {
+			final CheckpointCoordinator checkpointCoordinator = executionGraphToRestore.getCheckpointCoordinator();
+			if (checkpointCoordinator != null) {
+				checkpointCoordinator.restoreSavepoint(
+					savepointRestoreSettings.getRestorePath(),
+					savepointRestoreSettings.allowNonRestoredState(),
+					executionGraphToRestore.getAllVertices(),
+					userCodeLoader);
+			}
+		}
+	}
+
+	@Override
+	public void setMainThreadExecutor(final ComponentMainThreadExecutor mainThreadExecutor) {
+		this.mainThreadExecutor = checkNotNull(mainThreadExecutor);
+		executionGraph.start(mainThreadExecutor);
+	}
+
+	@Override
+	public void registerJobStatusListener(final JobStatusListener jobStatusListener) {
+		executionGraph.registerJobStatusListener(jobStatusListener);
+	}
+
+	@Override
+	public void startScheduling() {
+		mainThreadExecutor.assertRunningInMainThread();
+
+		try {
+			executionGraph.scheduleForExecution();
+		}
+		catch (Throwable t) {
+			// scheduling errors are turned into a global failure of the job
+			executionGraph.failGlobal(t);
+		}
+	}
+
+	@Override
+	public void suspend(Throwable cause) {
+		mainThreadExecutor.assertRunningInMainThread();
+		executionGraph.suspend(cause);
+	}
+
+	@Override
+	public void cancel() {
+		mainThreadExecutor.assertRunningInMainThread();
+		executionGraph.cancel();
+	}
+
+	@Override
+	public CompletableFuture<Void> getTerminationFuture() {
+		// the terminal JobStatus is dropped; callers only learn that termination happened
+		return executionGraph.getTerminationFuture().thenApply(FunctionUtils.nullFn());
+	}
+
+	@Override
+	public boolean updateTaskExecutionState(final TaskExecutionState taskExecutionState) {
+		mainThreadExecutor.assertRunningInMainThread();
+		return executionGraph.updateState(taskExecutionState);
+	}
+
+	@Override
+	public SerializedInputSplit requestNextInputSplit(JobVertexID vertexID, ExecutionAttemptID executionAttempt) throws IOException {
+		mainThreadExecutor.assertRunningInMainThread();
+
+		final Execution execution = executionGraph.getRegisteredExecutions().get(executionAttempt);
+		if (execution == null) {
+			// can happen when the JobManager has already unregistered this execution upon task
+			// failure, but the TaskManager has not yet become aware of that
+			if (log.isDebugEnabled()) {
+				log.debug("Can not find Execution for attempt {}.", executionAttempt);
+			}
+			// make the TaskManager aware of this by failing the request
+			throw new IllegalArgumentException("Can not find Execution for attempt " + executionAttempt);
+		}
+
+		final ExecutionJobVertex vertex = executionGraph.getJobVertex(vertexID);
+		if (vertex == null) {
+			throw new IllegalArgumentException("Cannot find execution vertex for vertex ID " + vertexID);
+		}
+
+		if (vertex.getSplitAssigner() == null) {
+			throw new IllegalStateException("No InputSplitAssigner for vertex ID " + vertexID);
+		}
+
+		final InputSplit nextInputSplit = execution.getNextInputSplit();
+
+		if (log.isDebugEnabled()) {
+			log.debug("Send next input split {}.", nextInputSplit);
+		}
+
+		try {
+			final byte[] serializedInputSplit = InstantiationUtil.serializeObject(nextInputSplit);
+			return new SerializedInputSplit(serializedInputSplit);
+		} catch (Exception ex) {
+			// a split that cannot be serialized fails the whole job vertex
+			IOException reason = new IOException("Could not serialize the next input split of class " +
+				nextInputSplit.getClass() + ".", ex);
+			vertex.fail(reason);
+			throw reason;
+		}
+	}
+
+	@Override
+	public ExecutionState requestPartitionState(
+			final IntermediateDataSetID intermediateResultId,
+			final ResultPartitionID resultPartitionId) throws PartitionProducerDisposedException {
+
+		mainThreadExecutor.assertRunningInMainThread();
+
+		final Execution execution = executionGraph.getRegisteredExecutions().get(resultPartitionId.getProducerId());
+		if (execution != null) {
+			return execution.getState();
+		}
+		else {
+			final IntermediateResult intermediateResult =
+				executionGraph.getAllIntermediateResults().get(intermediateResultId);
+
+			if (intermediateResult != null) {
+				// Try to find the producing execution
+				Execution producerExecution = intermediateResult
+					.getPartitionById(resultPartitionId.getPartitionId())
+					.getProducer()
+					.getCurrentExecutionAttempt();
+
+				// a different current attempt means the requested producer attempt is gone
+				if (producerExecution.getAttemptId().equals(resultPartitionId.getProducerId())) {
+					return producerExecution.getState();
+				} else {
+					throw new PartitionProducerDisposedException(resultPartitionId);
+				}
+			} else {
+				throw new IllegalArgumentException("Intermediate data set with ID "
+					+ intermediateResultId + " not found.");
+			}
+		}
+	}
+
+	@Override
+	public void scheduleOrUpdateConsumers(final ResultPartitionID partitionID) {
+		mainThreadExecutor.assertRunningInMainThread();
+
+		try {
+			executionGraph.scheduleOrUpdateConsumers(partitionID);
+		} catch (ExecutionGraphException e) {
+			throw new RuntimeException(e);
+		}
+	}
+
+	@Override
+	public ArchivedExecutionGraph requestJob() {
+		mainThreadExecutor.assertRunningInMainThread();
+		return ArchivedExecutionGraph.createFrom(executionGraph);
+	}
+
+	@Override
+	public JobStatus requestJobStatus() {
+		// no main thread assertion: the JobMaster also queries the status before the main thread
+		// executor has been set on this scheduler
+		return executionGraph.getState();
+	}
+
+	@Override
+	public JobDetails requestJobDetails() {
+		mainThreadExecutor.assertRunningInMainThread();
+		return WebMonitorUtils.createDetailsForJob(executionGraph);
+	}
+
+	@Override
+	public KvStateLocation requestKvStateLocation(final JobID jobId, final String registrationName) throws UnknownKvStateLocation, FlinkJobNotFoundException {
+		mainThreadExecutor.assertRunningInMainThread();
+
+		// sanity check for the correct JobID
+		if (jobGraph.getJobID().equals(jobId)) {
+			if (log.isDebugEnabled()) {
+				log.debug("Lookup key-value state for job {} with registration " +
+					"name {}.", jobGraph.getJobID(), registrationName);
+			}
+
+			final KvStateLocationRegistry registry = executionGraph.getKvStateLocationRegistry();
+			final KvStateLocation location = registry.getKvStateLocation(registrationName);
+			if (location != null) {
+				return location;
+			} else {
+				throw new UnknownKvStateLocation(registrationName);
+			}
+		} else {
+			if (log.isDebugEnabled()) {
+				log.debug("Request of key-value state location for unknown job {} received.", jobId);
+			}
+			throw new FlinkJobNotFoundException(jobId);
+		}
+	}
+
+	@Override
+	public void notifyKvStateRegistered(final JobID jobId, final JobVertexID jobVertexId, final KeyGroupRange keyGroupRange, final String registrationName, final KvStateID kvStateId, final InetSocketAddress kvStateServerAddress) throws FlinkJobNotFoundException {
+		mainThreadExecutor.assertRunningInMainThread();
+
+		if (jobGraph.getJobID().equals(jobId)) {
+			if (log.isDebugEnabled()) {
+				log.debug("Key value state registered for job {} under name {}.",
+					jobGraph.getJobID(), registrationName);
+			}
+
+			try {
+				executionGraph.getKvStateLocationRegistry().notifyKvStateRegistered(
+					jobVertexId, keyGroupRange, registrationName, kvStateId, kvStateServerAddress);
+			} catch (Exception e) {
+				throw new RuntimeException(e);
+			}
+		} else {
+			throw new FlinkJobNotFoundException(jobId);
+		}
+	}
+
+	@Override
+	public void notifyKvStateUnregistered(final JobID jobId, final JobVertexID jobVertexId, final KeyGroupRange keyGroupRange, final String registrationName) throws FlinkJobNotFoundException {
+		mainThreadExecutor.assertRunningInMainThread();
+
+		if (jobGraph.getJobID().equals(jobId)) {
+			if (log.isDebugEnabled()) {
+				log.debug("Key value state unregistered for job {} under name {}.",
+					jobGraph.getJobID(), registrationName);
+			}
+
+			try {
+				executionGraph.getKvStateLocationRegistry().notifyKvStateUnregistered(
+					jobVertexId, keyGroupRange, registrationName);
+			} catch (Exception e) {
+				throw new RuntimeException(e);
+			}
+		} else {
+			throw new FlinkJobNotFoundException(jobId);
+		}
+	}
+
+	@Override
+	public void updateAccumulators(final AccumulatorSnapshot accumulatorSnapshot) {
+		mainThreadExecutor.assertRunningInMainThread();
+
+		executionGraph.updateAccumulators(accumulatorSnapshot);
+	}
+
+	@Override
+	public Optional<OperatorBackPressureStats> requestOperatorBackPressureStats(final JobVertexID jobVertexId) throws FlinkException {
+		// reads the main-thread-confined execution graph, like the other request methods
+		mainThreadExecutor.assertRunningInMainThread();
+
+		final ExecutionJobVertex jobVertex = executionGraph.getJobVertex(jobVertexId);
+		if (jobVertex == null) {
+			throw new FlinkException("JobVertexID not found " +
+				jobVertexId);
+		}
+
+		return backPressureStatsTracker.getOperatorBackPressureStats(jobVertex);
+	}
+
+	@Override
+	public CompletableFuture<String> triggerSavepoint(final String targetDirectory, final boolean cancelJob) {
+		mainThreadExecutor.assertRunningInMainThread();
+
+		final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator();
+		// precondition violations are reported through the returned future (as in
+		// stopWithSavepoint) instead of being thrown synchronously to the caller
+		if (checkpointCoordinator == null) {
+			return FutureUtils.completedExceptionally(new IllegalStateException(
+				String.format("Job %s is not a streaming job.", jobGraph.getJobID())));
+		} else if (targetDirectory == null && !checkpointCoordinator.getCheckpointStorage().hasDefaultSavepointLocation()) {
+			log.info("Trying to cancel job {} with savepoint, but no savepoint directory configured.", jobGraph.getJobID());
+
+			return FutureUtils.completedExceptionally(new IllegalStateException(
+				"No savepoint directory configured. You can either specify a directory " +
+					"while cancelling via -s :targetDirectory or configure a cluster-wide " +
+					"default via key '" + CheckpointingOptions.SAVEPOINT_DIRECTORY.key() + "'."));
+		}
+
+		if (cancelJob) {
+			// no periodic checkpoints may be taken between the savepoint and the cancellation
+			checkpointCoordinator.stopCheckpointScheduler();
+		}
+
+		return checkpointCoordinator
+			.triggerSavepoint(System.currentTimeMillis(), targetDirectory)
+			.thenApply(CompletedCheckpoint::getExternalPointer)
+			.handleAsync((path, throwable) -> {
+				if (throwable != null) {
+					if (cancelJob) {
+						// the job keeps running, so resume periodic checkpointing
+						startCheckpointScheduler(checkpointCoordinator);
+					}
+					throw new CompletionException(throwable);
+				} else if (cancelJob) {
+					log.info("Savepoint stored in {}. Now cancelling {}.", path, jobGraph.getJobID());
+					cancel();
+				}
+				return path;
+			}, mainThreadExecutor);
+	}
+
+	/** Restarts periodic checkpointing, if configured; must run in the main thread. */
+	private void startCheckpointScheduler(final CheckpointCoordinator checkpointCoordinator) {
+		mainThreadExecutor.assertRunningInMainThread();
+
+		if (checkpointCoordinator.isPeriodicCheckpointingConfigured()) {
+			try {
+				checkpointCoordinator.startCheckpointScheduler();
+			} catch (IllegalStateException ignored) {
+				// Concurrent shut down of the coordinator
+			}
+		}
+	}
+
+	@Override
+	public void acknowledgeCheckpoint(final JobID jobID, final ExecutionAttemptID executionAttemptID, final long checkpointId, final CheckpointMetrics checkpointMetrics, final TaskStateSnapshot checkpointState) {
+		mainThreadExecutor.assertRunningInMainThread();
+
+		final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator();
+		final AcknowledgeCheckpoint ackMessage = new AcknowledgeCheckpoint(
+			jobID,
+			executionAttemptID,
+			checkpointId,
+			checkpointMetrics,
+			checkpointState);
+
+		if (checkpointCoordinator != null) {
+			// the acknowledgement is processed off the main thread, on the io executor
+			ioExecutor.execute(() -> {
+				try {
+					checkpointCoordinator.receiveAcknowledgeMessage(ackMessage);
+				} catch (Throwable t) {
+					log.warn("Error while processing checkpoint acknowledgement message", t);
+				}
+			});
+		} else {
+			String errorMessage = "Received AcknowledgeCheckpoint message for job {} with no CheckpointCoordinator";
+			if (executionGraph.getState() == JobStatus.RUNNING) {
+				log.error(errorMessage, jobGraph.getJobID());
+			} else {
+				log.debug(errorMessage, jobGraph.getJobID());
+			}
+		}
+	}
+
+	@Override
+	public void declineCheckpoint(final DeclineCheckpoint decline) {
+		mainThreadExecutor.assertRunningInMainThread();
+
+		final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator();
+
+		if (checkpointCoordinator != null) {
+			// the decline is processed off the main thread, on the io executor
+			ioExecutor.execute(() -> {
+				try {
+					checkpointCoordinator.receiveDeclineMessage(decline);
+				} catch (Exception e) {
+					log.error("Error in CheckpointCoordinator while processing {}", decline, e);
+				}
+			});
+		} else {
+			String errorMessage = "Received DeclineCheckpoint message for job {} with no CheckpointCoordinator";
+			if (executionGraph.getState() == JobStatus.RUNNING) {
+				log.error(errorMessage, jobGraph.getJobID());
+			} else {
+				log.debug(errorMessage, jobGraph.getJobID());
+			}
+		}
+	}
+
+	@Override
+	public CompletableFuture<String> stopWithSavepoint(final String targetDirectory, final boolean advanceToEndOfEventTime) {
+		mainThreadExecutor.assertRunningInMainThread();
+
+		final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator();
+
+		if (checkpointCoordinator == null) {
+			return FutureUtils.completedExceptionally(new IllegalStateException(
+				String.format("Job %s is not a streaming job.", jobGraph.getJobID())));
+		}
+
+		if (targetDirectory == null && !checkpointCoordinator.getCheckpointStorage().hasDefaultSavepointLocation()) {
+			log.info("Trying to cancel job {} with savepoint, but no savepoint directory configured.", jobGraph.getJobID());
+
+			return FutureUtils.completedExceptionally(new IllegalStateException(
+				"No savepoint directory configured. You can either specify a directory " +
+					"while cancelling via -s :targetDirectory or configure a cluster-wide " +
+					"default via key '" + CheckpointingOptions.SAVEPOINT_DIRECTORY.key() + "'."));
+		}
+
+		final long now = System.currentTimeMillis();
+
+		// we stop the checkpoint coordinator so that we are guaranteed
+		// to have only the data of the synchronous savepoint committed.
+		// in case of failure, and if the job restarts, the coordinator
+		// will be restarted by the CheckpointCoordinatorDeActivator.
+		// NOTE(review): if the savepoint fails but the job keeps running, nothing in this method
+		// restarts the checkpoint scheduler -- confirm this is intended.
+		checkpointCoordinator.stopCheckpointScheduler();
+
+		final CompletableFuture<String> savepointFuture = checkpointCoordinator
+			.triggerSynchronousSavepoint(now, advanceToEndOfEventTime, targetDirectory)
+			.handle((completedCheckpoint, throwable) -> {
+				if (throwable != null) {
+					log.info("Failed during stopping job {} with a savepoint. Reason: {}", jobGraph.getJobID(), throwable.getMessage());
+					throw new CompletionException(throwable);
+				}
+				return completedCheckpoint.getExternalPointer();
+			});
+
+		// the operation only succeeds if the job actually reaches FINISHED after the savepoint
+		final CompletableFuture<JobStatus> terminationFuture = executionGraph
+			.getTerminationFuture()
+			.handle((jobstatus, throwable) -> {
+
+				if (throwable != null) {
+					log.info("Failed during stopping job {} with a savepoint. Reason: {}", jobGraph.getJobID(), throwable.getMessage());
+					throw new CompletionException(throwable);
+				} else if (jobstatus != JobStatus.FINISHED) {
+					log.info("Failed during stopping job {} with a savepoint. Reason: Reached state {} instead of FINISHED.", jobGraph.getJobID(), jobstatus);
+					throw new CompletionException(new FlinkException("Reached state " + jobstatus + " instead of FINISHED."));
+				}
+				return jobstatus;
+			});
+
+		return savepointFuture.thenCompose((path) ->
+			terminationFuture.thenApply((jobStatus -> path)));
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/LegacySchedulerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/LegacySchedulerFactory.java
new file mode 100644
index 0000000..33e14b6
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/LegacySchedulerFactory.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobWriter;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
+import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
+import org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker;
+
+import org.slf4j.Logger;
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@link SchedulerNGFactory} that creates {@link LegacyScheduler} instances.
+ *
+ * <p>The {@link RestartStrategyFactory} passed to the constructor is handed to every scheduler
+ * created by this factory; all other scheduler dependencies are supplied per call to
+ * {@link #createInstance}.
+ */
+public class LegacySchedulerFactory implements SchedulerNGFactory {
+
+	/** Restart strategy factory forwarded to every created {@link LegacyScheduler}. */
+	private final RestartStrategyFactory restartStrategyFactory;
+
+	/**
+	 * Creates a factory for {@link LegacyScheduler} instances.
+	 *
+	 * @param restartStrategies restart strategy factory handed to each created scheduler; must not
+	 *                          be {@code null}
+	 */
+	public LegacySchedulerFactory(final RestartStrategyFactory restartStrategies) {
+		this.restartStrategyFactory = checkNotNull(restartStrategies);
+	}
+
+	/**
+	 * Creates a {@link LegacyScheduler}, forwarding all arguments unchanged and inserting the
+	 * configured {@link RestartStrategyFactory} between {@code rpcTimeout} and {@code blobWriter}.
+	 *
+	 * @throws Exception if the {@link LegacyScheduler} constructor throws
+	 */
+	@Override
+	public SchedulerNG createInstance(
+			final Logger log,
+			final JobGraph jobGraph,
+			final BackPressureStatsTracker backPressureStatsTracker,
+			final Executor ioExecutor,
+			final Configuration jobMasterConfiguration,
+			final SlotProvider slotProvider,
+			final ScheduledExecutorService futureExecutor,
+			final ClassLoader userCodeLoader,
+			final CheckpointRecoveryFactory checkpointRecoveryFactory,
+			final Time rpcTimeout,
+			final BlobWriter blobWriter,
+			final JobManagerJobMetricGroup jobManagerJobMetricGroup,
+			final Time slotRequestTimeout) throws Exception {
+
+		return new LegacyScheduler(
+			log,
+			jobGraph,
+			backPressureStatsTracker,
+			ioExecutor,
+			jobMasterConfiguration,
+			slotProvider,
+			futureExecutor,
+			userCodeLoader,
+			checkpointRecoveryFactory,
+			rpcTimeout,
+			this.restartStrategyFactory,
+			blobWriter,
+			jobManagerJobMetricGroup,
+			slotRequestTimeout);
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SchedulerNG.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SchedulerNG.java
new file mode 100644
index 0000000..98809df
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SchedulerNG.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.queryablestate.KvStateID;
+import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.JobStatusListener;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
+import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
+import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.query.KvStateLocation;
+import org.apache.flink.runtime.query.UnknownKvStateLocation;
+import org.apache.flink.runtime.rest.handler.legacy.backpressure.OperatorBackPressureStats;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.taskmanager.TaskExecutionState;
+import org.apache.flink.util.FlinkException;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Interface for scheduling Flink jobs.
+ *
+ * <p>Instances are created via {@link SchedulerNGFactory}, and receive a {@link JobGraph} when
+ * instantiated.
+ *
+ * <p>Implementations can expect that methods will not be invoked concurrently. In fact,
+ * all invocations will originate from a thread in the {@link ComponentMainThreadExecutor}, which
+ * will be passed via {@link #setMainThreadExecutor(ComponentMainThreadExecutor)}.
+ */
+public interface SchedulerNG {
+
+	/**
+	 * Sets the {@link ComponentMainThreadExecutor} from whose thread all methods of this
+	 * scheduler are invoked (see the interface documentation).
+	 */
+	void setMainThreadExecutor(ComponentMainThreadExecutor mainThreadExecutor);
+
+	void registerJobStatusListener(JobStatusListener jobStatusListener);
+
+	void startScheduling();
+
+	void suspend(Throwable cause);
+
+	void cancel();
+
+	CompletableFuture<Void> getTerminationFuture();
+
+	boolean updateTaskExecutionState(TaskExecutionState taskExecutionState);
+
+	SerializedInputSplit requestNextInputSplit(JobVertexID vertexID, ExecutionAttemptID executionAttempt) throws IOException;
+
+	ExecutionState requestPartitionState(IntermediateDataSetID intermediateResultId, ResultPartitionID resultPartitionId) throws PartitionProducerDisposedException;
+
+	void scheduleOrUpdateConsumers(ResultPartitionID partitionID);
+
+	ArchivedExecutionGraph requestJob();
+
+	JobStatus requestJobStatus();
+
+	JobDetails requestJobDetails();
+
+	// ------------------------------------------------------------------------------------
+	// Methods below do not belong to Scheduler but are included due to historical reasons
+	// ------------------------------------------------------------------------------------
+
+	KvStateLocation requestKvStateLocation(JobID jobId, String registrationName) throws UnknownKvStateLocation, FlinkJobNotFoundException;
+
+	void notifyKvStateRegistered(JobID jobId, JobVertexID jobVertexId, KeyGroupRange keyGroupRange, String registrationName, KvStateID kvStateId, InetSocketAddress kvStateServerAddress) throws FlinkJobNotFoundException;
+
+	void notifyKvStateUnregistered(JobID jobId, JobVertexID jobVertexId, KeyGroupRange keyGroupRange, String registrationName) throws FlinkJobNotFoundException;
+
+	// ------------------------------------------------------------------------
+
+	void updateAccumulators(AccumulatorSnapshot accumulatorSnapshot);
+
+	// ------------------------------------------------------------------------
+
+	Optional<OperatorBackPressureStats> requestOperatorBackPressureStats(JobVertexID jobVertexId) throws FlinkException;
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Triggers a savepoint.
+	 *
+	 * @param targetDirectory target directory of the savepoint; may be {@code null}, in which case
+	 *                        choosing the location is left to the implementation
+	 */
+	CompletableFuture<String> triggerSavepoint(@Nullable String targetDirectory, boolean cancelJob);
+
+	void acknowledgeCheckpoint(JobID jobID, ExecutionAttemptID executionAttemptID, long checkpointId, CheckpointMetrics checkpointMetrics, TaskStateSnapshot checkpointState);
+
+	void declineCheckpoint(DeclineCheckpoint decline);
+
+	/**
+	 * Stops the job with a savepoint.
+	 *
+	 * <p>In {@link LegacyScheduler}, the returned future completes with the external pointer of the
+	 * savepoint once the job has terminated in {@link JobStatus#FINISHED}. It completes
+	 * exceptionally if the savepoint fails, if the job's termination future fails, or if the job
+	 * terminates in any other state.
+	 *
+	 * @param targetDirectory target directory of the savepoint; may be {@code null}, as for
+	 *                        {@link #triggerSavepoint(String, boolean)}
+	 */
+	CompletableFuture<String> stopWithSavepoint(@Nullable String targetDirectory, boolean advanceToEndOfEventTime);
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SchedulerNGFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SchedulerNGFactory.java
new file mode 100644
index 0000000..f335252
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SchedulerNGFactory.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobWriter;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
+import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
+import org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker;
+
+import org.slf4j.Logger;
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
+
+/**
+ * Factory for {@link SchedulerNG}.
+ *
+ * <p>An implementation creates the scheduler for the {@link JobGraph} passed to
+ * {@link #createInstance}; {@link LegacySchedulerFactory} is one such implementation.
+ */
+public interface SchedulerNGFactory {
+
+	/**
+	 * Creates a {@link SchedulerNG} for the given job.
+	 *
+	 * @param log logger handed to the created scheduler
+	 * @param jobGraph job that the created scheduler is responsible for
+	 * @return the created scheduler
+	 * @throws Exception if the scheduler could not be created
+	 */
+	SchedulerNG createInstance(
+			Logger log,
+			JobGraph jobGraph,
+			BackPressureStatsTracker backPressureStatsTracker,
+			Executor ioExecutor,
+			Configuration jobMasterConfiguration,
+			SlotProvider slotProvider,
+			ScheduledExecutorService futureExecutor,
+			ClassLoader userCodeLoader,
+			CheckpointRecoveryFactory checkpointRecoveryFactory,
+			Time rpcTimeout,
+			BlobWriter blobWriter,
+			JobManagerJobMetricGroup jobManagerJobMetricGroup,
+			Time slotRequestTimeout) throws Exception;
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/factories/DefaultJobMasterServiceFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/factories/DefaultJobMasterServiceFactory.java
index 2ad91d1..3f80042 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/factories/DefaultJobMasterServiceFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/factories/DefaultJobMasterServiceFactory.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.jobmanager.OnCompletionActions;
 import org.apache.flink.runtime.jobmaster.JobManagerSharedServices;
 import org.apache.flink.runtime.jobmaster.JobMaster;
 import org.apache.flink.runtime.jobmaster.JobMasterConfiguration;
+import org.apache.flink.runtime.jobmaster.SchedulerNGFactory;
 import org.apache.flink.runtime.jobmaster.slotpool.SchedulerFactory;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolFactory;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
@@ -54,6 +55,8 @@ public class DefaultJobMasterServiceFactory implements JobMasterServiceFactory {
 
 	private final FatalErrorHandler fatalErrorHandler;
 
+	private final SchedulerNGFactory schedulerNGFactory;
+
 	public DefaultJobMasterServiceFactory(
 			JobMasterConfiguration jobMasterConfiguration,
 			SlotPoolFactory slotPoolFactory,
@@ -63,7 +66,8 @@ public class DefaultJobMasterServiceFactory implements JobMasterServiceFactory {
 			JobManagerSharedServices jobManagerSharedServices,
 			HeartbeatServices heartbeatServices,
 			JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory,
-			FatalErrorHandler fatalErrorHandler) {
+			FatalErrorHandler fatalErrorHandler,
+			SchedulerNGFactory schedulerNGFactory) {
 		this.jobMasterConfiguration = jobMasterConfiguration;
 		this.slotPoolFactory = slotPoolFactory;
 		this.schedulerFactory = schedulerFactory;
@@ -73,10 +77,15 @@ public class DefaultJobMasterServiceFactory implements JobMasterServiceFactory {
 		this.heartbeatServices = heartbeatServices;
 		this.jobManagerJobMetricGroupFactory = jobManagerJobMetricGroupFactory;
 		this.fatalErrorHandler = fatalErrorHandler;
+		this.schedulerNGFactory = schedulerNGFactory;
 	}
 
 	@Override
-	public JobMaster createJobMasterService(JobGraph jobGraph, OnCompletionActions jobCompletionActions, ClassLoader userCodeClassloader) throws Exception {
+	public JobMaster createJobMasterService(
+			JobGraph jobGraph,
+			OnCompletionActions jobCompletionActions,
+			ClassLoader userCodeClassloader) throws Exception {
+
 		return new JobMaster(
 			rpcService,
 			jobMasterConfiguration,
@@ -90,6 +99,7 @@ public class DefaultJobMasterServiceFactory implements JobMasterServiceFactory {
 			jobManagerJobMetricGroupFactory,
 			jobCompletionActions,
 			fatalErrorHandler,
-			userCodeClassloader);
+			userCodeClassloader,
+			schedulerNGFactory);
 	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java
index 160402b..adb6f5e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java
@@ -22,17 +22,23 @@ import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.cache.DistributedCache;
 import org.apache.flink.core.testutils.CommonTestUtils;
 import org.apache.flink.runtime.blob.PermanentBlobKey;
+import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
+import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.List;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 public class JobGraphTest extends TestLogger {
@@ -309,4 +315,47 @@ public class JobGraphTest extends TestLogger {
 			assertEquals(entry.filePath, jobGraphEntry.filePath);
 		}
 	}
+
+	/** A {@link JobGraph} on which no snapshot settings were set reports checkpointing as disabled. */
+	@Test
+	public void checkpointingIsDisabledByDefault() {
+		assertFalse(new JobGraph().isCheckpointingEnabled());
+	}
+
+	/** Snapshot settings with a positive checkpoint interval enable checkpointing. */
+	@Test
+	public void checkpointingIsEnabledIfIntervalIsPositive() {
+		final JobGraph graph = new JobGraph();
+		graph.setSnapshotSettings(createCheckpointSettingsWithInterval(1));
+
+		assertTrue(graph.isCheckpointingEnabled());
+	}
+
+	/** A checkpoint interval of {@code Long.MAX_VALUE} leaves checkpointing disabled. */
+	@Test
+	public void checkpointingIsDisabledIfIntervalIsMaxValue() {
+		final JobGraph graph = new JobGraph();
+		graph.setSnapshotSettings(createCheckpointSettingsWithInterval(Long.MAX_VALUE));
+
+		assertFalse(graph.isCheckpointingEnabled());
+	}
+
+	/**
+	 * Creates {@link JobCheckpointingSettings} whose coordinator configuration uses the given
+	 * checkpoint interval; the lists are empty and every other value is a fixed placeholder.
+	 */
+	private static JobCheckpointingSettings createCheckpointSettingsWithInterval(final long checkpointInterval) {
+		return new JobCheckpointingSettings(
+			Collections.emptyList(),
+			Collections.emptyList(),
+			Collections.emptyList(),
+			new CheckpointCoordinatorConfiguration(
+				checkpointInterval,
+				Long.MAX_VALUE,
+				Long.MAX_VALUE,
+				Integer.MAX_VALUE,
+				CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
+				true),
+			null);
+	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index dcd0559..52792f8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -64,8 +64,6 @@ import org.apache.flink.runtime.executiongraph.ArchivedExecutionVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.TaskInformation;
 import org.apache.flink.runtime.executiongraph.failover.FailoverStrategyLoader;
-import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
-import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices;
@@ -188,7 +186,6 @@ import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.nullValue;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -290,6 +287,10 @@ public class JobMasterTest extends TestLogger {
 
 			final JobManagerSharedServices jobManagerSharedServices = new TestingJobManagerSharedServicesBuilder().build();
 			final JobMasterConfiguration jobMasterConfiguration = JobMasterConfiguration.fromConfiguration(configuration);
+
+			final SchedulerNGFactory schedulerNGFactory = new LegacySchedulerFactory(
+				jobManagerSharedServices.getRestartStrategyFactory());
+
 			final JobMaster jobMaster = new JobMaster(
 				rpcService1,
 				jobMasterConfiguration,
@@ -303,7 +304,8 @@ public class JobMasterTest extends TestLogger {
 				UnregisteredJobManagerJobMetricGroupFactory.INSTANCE,
 				new TestingOnCompletionActions(),
 				testingFatalErrorHandler,
-				JobMasterTest.class.getClassLoader()) {
+				JobMasterTest.class.getClassLoader(),
+				schedulerNGFactory) {
 				@Override
 				public void declineCheckpoint(DeclineCheckpoint declineCheckpoint) {
 					declineCheckpointMessageFuture.complete(declineCheckpoint.getReason());
@@ -560,42 +562,6 @@ public class JobMasterTest extends TestLogger {
 	}
 
 	/**
-	 * Tests that in a streaming use case where checkpointing is enabled, a
-	 * fixed delay with Integer.MAX_VALUE retries is instantiated if no other restart
-	 * strategy has been specified.
-	 */
-	@Test
-	public void testAutomaticRestartingWhenCheckpointing() throws Exception {
-		// create savepoint data
-		final long savepointId = 42L;
-		final File savepointFile = createSavepoint(savepointId);
-
-		// set savepoint settings
-		final SavepointRestoreSettings savepointRestoreSettings = SavepointRestoreSettings.forPath(
-			savepointFile.getAbsolutePath(),
-			true);
-		final JobGraph jobGraph = createJobGraphWithCheckpointing(savepointRestoreSettings);
-
-		final StandaloneCompletedCheckpointStore completedCheckpointStore = new StandaloneCompletedCheckpointStore(1);
-		final TestingCheckpointRecoveryFactory testingCheckpointRecoveryFactory = new TestingCheckpointRecoveryFactory(
-			completedCheckpointStore,
-			new StandaloneCheckpointIDCounter());
-		haServices.setCheckpointRecoveryFactory(testingCheckpointRecoveryFactory);
-		final JobMaster jobMaster = createJobMaster(
-			new Configuration(),
-			jobGraph,
-			haServices,
-			new TestingJobManagerSharedServicesBuilder()
-				.setRestartStrategyFactory(RestartStrategyFactory.createRestartStrategyFactory(configuration))
-				.build());
-
-		RestartStrategy restartStrategy = jobMaster.getRestartStrategy();
-
-		assertNotNull(restartStrategy);
-		assertTrue(restartStrategy instanceof FixedDelayRestartStrategy);
-	}
-
-	/**
 	 * Tests that an existing checkpoint will have precedence over an savepoint.
 	 */
 	@Test
@@ -1465,20 +1431,27 @@ public class JobMasterTest extends TestLogger {
 	 */
 	@Test
 	public void testTriggerSavepointTimeout() throws Exception {
+		final JobManagerSharedServices jobManagerSharedServices = new TestingJobManagerSharedServicesBuilder().build();
+		final JobMasterConfiguration jobMasterConfiguration = JobMasterConfiguration.fromConfiguration(configuration);
+
+		final SchedulerNGFactory schedulerNGFactory = new LegacySchedulerFactory(
+			jobManagerSharedServices.getRestartStrategyFactory());
+
 		final JobMaster jobMaster = new JobMaster(
 			rpcService,
-			JobMasterConfiguration.fromConfiguration(configuration),
+			jobMasterConfiguration,
 			jmResourceId,
 			jobGraph,
 			haServices,
 			DefaultSlotPoolFactory.fromConfiguration(configuration),
 			DefaultSchedulerFactory.fromConfiguration(configuration),
-			new TestingJobManagerSharedServicesBuilder().build(),
+			jobManagerSharedServices,
 			heartbeatServices,
 			UnregisteredJobManagerJobMetricGroupFactory.INSTANCE,
 			new TestingOnCompletionActions(),
 			testingFatalErrorHandler,
-			JobMasterTest.class.getClassLoader()) {
+			JobMasterTest.class.getClassLoader(),
+			schedulerNGFactory) {
 
 			@Override
 			public CompletableFuture<String> triggerSavepoint(
@@ -1917,6 +1890,8 @@ public class JobMasterTest extends TestLogger {
 			HeartbeatServices heartbeatServices,
 			OnCompletionActions onCompletionActions) throws Exception {
 		final JobMasterConfiguration jobMasterConfiguration = JobMasterConfiguration.fromConfiguration(configuration);
+		final SchedulerNGFactory schedulerNGFactory = new LegacySchedulerFactory(
+			jobManagerSharedServices.getRestartStrategyFactory());
 
 		return new JobMaster(
 			rpcService,
@@ -1931,7 +1906,8 @@ public class JobMasterTest extends TestLogger {
 			UnregisteredJobManagerJobMetricGroupFactory.INSTANCE,
 			onCompletionActions,
 			testingFatalErrorHandler,
-			JobMasterTest.class.getClassLoader());
+			JobMasterTest.class.getClassLoader(),
+			schedulerNGFactory);
 	}
 
 	private JobGraph createSingleVertexJobWithRestartStrategy() throws IOException {