You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ga...@apache.org on 2019/09/27 12:39:32 UTC

[flink] 05/06: [FLINK-12433][runtime] Introduce SchedulerBase

This is an automated email from the ASF dual-hosted git repository.

gary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e865c260da57ed67b111744d427825002e1a47e2
Author: Gary Yao <ga...@apache.org>
AuthorDate: Thu Sep 5 09:34:50 2019 +0200

    [FLINK-12433][runtime] Introduce SchedulerBase
    
    Introduce common super class SchedulerBase for DefaultScheduler and
    LegacyScheduler, which contains code that is shared between the scheduler
    implementations. Previously, DefaultScheduler inherited from LegacyScheduler to
    avoid re-implementing features such as queryable state, taking savepoints, etc.
---
 .../flink/runtime/scheduler/DefaultScheduler.java  |   2 +-
 .../flink/runtime/scheduler/LegacyScheduler.java   | 524 +--------------------
 .../{LegacyScheduler.java => SchedulerBase.java}   |  61 ++-
 3 files changed, 39 insertions(+), 548 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
index 05429ac..759a56a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
@@ -39,7 +39,7 @@ import java.util.concurrent.ScheduledExecutorService;
 /**
  * Stub implementation of the future default scheduler.
  */
-public class DefaultScheduler extends LegacyScheduler {
+public class DefaultScheduler extends SchedulerBase {
 
 	public DefaultScheduler(
 			final Logger log,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LegacyScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LegacyScheduler.java
index e33b6b6..bc6ed69 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LegacyScheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LegacyScheduler.java
@@ -96,39 +96,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  *
  * @see ExecutionGraph#scheduleForExecution()
  */
-public class LegacyScheduler implements SchedulerNG {
-
-	private final Logger log;
-
-	private final JobGraph jobGraph;
-
-	private final ExecutionGraph executionGraph;
-
-	private final BackPressureStatsTracker backPressureStatsTracker;
-
-	private final Executor ioExecutor;
-
-	private final Configuration jobMasterConfiguration;
-
-	private final SlotProvider slotProvider;
-
-	private final ScheduledExecutorService futureExecutor;
-
-	private final ClassLoader userCodeLoader;
-
-	private final CheckpointRecoveryFactory checkpointRecoveryFactory;
-
-	private final Time rpcTimeout;
-
-	private final RestartStrategy restartStrategy;
-
-	private final BlobWriter blobWriter;
-
-	private final Time slotRequestTimeout;
-
-	private ComponentMainThreadExecutor mainThreadExecutor = new ComponentMainThreadExecutor.DummyComponentMainThreadExecutor(
-		"LegacyScheduler is not initialized with proper main thread executor. " +
-			"Call to LegacyScheduler.setMainThreadExecutor(...) required.");
+public class LegacyScheduler extends SchedulerBase {
 
 	public LegacyScheduler(
 			final Logger log,
@@ -148,498 +116,22 @@ public class LegacyScheduler implements SchedulerNG {
 			final ShuffleMaster<?> shuffleMaster,
 			final PartitionTracker partitionTracker) throws Exception {
 
-		this.log = checkNotNull(log);
-		this.jobGraph = checkNotNull(jobGraph);
-		this.backPressureStatsTracker = checkNotNull(backPressureStatsTracker);
-		this.ioExecutor = checkNotNull(ioExecutor);
-		this.jobMasterConfiguration = checkNotNull(jobMasterConfiguration);
-		this.slotProvider = checkNotNull(slotProvider);
-		this.futureExecutor = checkNotNull(futureExecutor);
-		this.userCodeLoader = checkNotNull(userCodeLoader);
-		this.checkpointRecoveryFactory = checkNotNull(checkpointRecoveryFactory);
-		this.rpcTimeout = checkNotNull(rpcTimeout);
-
-		final RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration =
-			jobGraph.getSerializedExecutionConfig()
-				.deserializeValue(userCodeLoader)
-				.getRestartStrategy();
-
-		this.restartStrategy = RestartStrategyResolving.resolve(restartStrategyConfiguration,
-			restartStrategyFactory,
-			jobGraph.isCheckpointingEnabled());
-
-		log.info("Using restart strategy {} for {} ({}).", this.restartStrategy, jobGraph.getName(), jobGraph.getJobID());
-
-		this.blobWriter = checkNotNull(blobWriter);
-		this.slotRequestTimeout = checkNotNull(slotRequestTimeout);
-
-		this.executionGraph = createAndRestoreExecutionGraph(jobManagerJobMetricGroup, checkNotNull(shuffleMaster), checkNotNull(partitionTracker));
-	}
-
-	private ExecutionGraph createAndRestoreExecutionGraph(
-			JobManagerJobMetricGroup currentJobManagerJobMetricGroup,
-			ShuffleMaster<?> shuffleMaster,
-			PartitionTracker partitionTracker) throws Exception {
-
-		ExecutionGraph newExecutionGraph = createExecutionGraph(currentJobManagerJobMetricGroup, shuffleMaster, partitionTracker);
-
-		final CheckpointCoordinator checkpointCoordinator = newExecutionGraph.getCheckpointCoordinator();
-
-		if (checkpointCoordinator != null) {
-			// check whether we find a valid checkpoint
-			if (!checkpointCoordinator.restoreLatestCheckpointedState(
-				newExecutionGraph.getAllVertices(),
-				false,
-				false)) {
-
-				// check whether we can restore from a savepoint
-				tryRestoreExecutionGraphFromSavepoint(newExecutionGraph, jobGraph.getSavepointRestoreSettings());
-			}
-		}
-
-		return newExecutionGraph;
-	}
-
-	private ExecutionGraph createExecutionGraph(
-			JobManagerJobMetricGroup currentJobManagerJobMetricGroup,
-			ShuffleMaster<?> shuffleMaster,
-			final PartitionTracker partitionTracker) throws JobExecutionException, JobException {
-		return ExecutionGraphBuilder.buildGraph(
-			null,
+		super(
+			log,
 			jobGraph,
-			jobMasterConfiguration,
-			futureExecutor,
+			backPressureStatsTracker,
 			ioExecutor,
+			jobMasterConfiguration,
 			slotProvider,
+			futureExecutor,
 			userCodeLoader,
 			checkpointRecoveryFactory,
 			rpcTimeout,
-			restartStrategy,
-			currentJobManagerJobMetricGroup,
+			restartStrategyFactory,
 			blobWriter,
+			jobManagerJobMetricGroup,
 			slotRequestTimeout,
-			log,
 			shuffleMaster,
 			partitionTracker);
 	}
-
-	/**
-	 * Tries to restore the given {@link ExecutionGraph} from the provided {@link SavepointRestoreSettings}.
-	 *
-	 * @param executionGraphToRestore {@link ExecutionGraph} which is supposed to be restored
-	 * @param savepointRestoreSettings {@link SavepointRestoreSettings} containing information about the savepoint to restore from
-	 * @throws Exception if the {@link ExecutionGraph} could not be restored
-	 */
-	private void tryRestoreExecutionGraphFromSavepoint(ExecutionGraph executionGraphToRestore, SavepointRestoreSettings savepointRestoreSettings) throws Exception {
-		if (savepointRestoreSettings.restoreSavepoint()) {
-			final CheckpointCoordinator checkpointCoordinator = executionGraphToRestore.getCheckpointCoordinator();
-			if (checkpointCoordinator != null) {
-				checkpointCoordinator.restoreSavepoint(
-					savepointRestoreSettings.getRestorePath(),
-					savepointRestoreSettings.allowNonRestoredState(),
-					executionGraphToRestore.getAllVertices(),
-					userCodeLoader);
-			}
-		}
-	}
-
-	@Override
-	public void setMainThreadExecutor(final ComponentMainThreadExecutor mainThreadExecutor) {
-		this.mainThreadExecutor = checkNotNull(mainThreadExecutor);
-		executionGraph.start(mainThreadExecutor);
-	}
-
-	@Override
-	public void registerJobStatusListener(final JobStatusListener jobStatusListener) {
-		executionGraph.registerJobStatusListener(jobStatusListener);
-	}
-
-	@Override
-	public void startScheduling() {
-		mainThreadExecutor.assertRunningInMainThread();
-
-		try {
-			executionGraph.scheduleForExecution();
-		}
-		catch (Throwable t) {
-			executionGraph.failGlobal(t);
-		}
-	}
-
-	@Override
-	public void suspend(Throwable cause) {
-		mainThreadExecutor.assertRunningInMainThread();
-		executionGraph.suspend(cause);
-	}
-
-	@Override
-	public void cancel() {
-		mainThreadExecutor.assertRunningInMainThread();
-		executionGraph.cancel();
-	}
-
-	@Override
-	public CompletableFuture<Void> getTerminationFuture() {
-		return executionGraph.getTerminationFuture().thenApply(FunctionUtils.nullFn());
-	}
-
-	@Override
-	public boolean updateTaskExecutionState(final TaskExecutionState taskExecutionState) {
-		mainThreadExecutor.assertRunningInMainThread();
-		return executionGraph.updateState(taskExecutionState);
-	}
-
-	@Override
-	public SerializedInputSplit requestNextInputSplit(JobVertexID vertexID, ExecutionAttemptID executionAttempt) throws IOException {
-		mainThreadExecutor.assertRunningInMainThread();
-
-		final Execution execution = executionGraph.getRegisteredExecutions().get(executionAttempt);
-		if (execution == null) {
-			// can happen when JobManager had already unregistered this execution upon on task failure,
-			// but TaskManager get some delay to aware of that situation
-			if (log.isDebugEnabled()) {
-				log.debug("Can not find Execution for attempt {}.", executionAttempt);
-			}
-			// but we should TaskManager be aware of this
-			throw new IllegalArgumentException("Can not find Execution for attempt " + executionAttempt);
-		}
-
-		final ExecutionJobVertex vertex = executionGraph.getJobVertex(vertexID);
-		if (vertex == null) {
-			throw new IllegalArgumentException("Cannot find execution vertex for vertex ID " + vertexID);
-		}
-
-		if (vertex.getSplitAssigner() == null) {
-			throw new IllegalStateException("No InputSplitAssigner for vertex ID " + vertexID);
-		}
-
-		final InputSplit nextInputSplit = execution.getNextInputSplit();
-
-		if (log.isDebugEnabled()) {
-			log.debug("Send next input split {}.", nextInputSplit);
-		}
-
-		try {
-			final byte[] serializedInputSplit = InstantiationUtil.serializeObject(nextInputSplit);
-			return new SerializedInputSplit(serializedInputSplit);
-		} catch (Exception ex) {
-			IOException reason = new IOException("Could not serialize the next input split of class " +
-				nextInputSplit.getClass() + ".", ex);
-			vertex.fail(reason);
-			throw reason;
-		}
-	}
-
-	@Override
-	public ExecutionState requestPartitionState(
-			final IntermediateDataSetID intermediateResultId,
-			final ResultPartitionID resultPartitionId) throws PartitionProducerDisposedException {
-
-		mainThreadExecutor.assertRunningInMainThread();
-
-		final Execution execution = executionGraph.getRegisteredExecutions().get(resultPartitionId.getProducerId());
-		if (execution != null) {
-			return execution.getState();
-		}
-		else {
-			final IntermediateResult intermediateResult =
-				executionGraph.getAllIntermediateResults().get(intermediateResultId);
-
-			if (intermediateResult != null) {
-				// Try to find the producing execution
-				Execution producerExecution = intermediateResult
-					.getPartitionById(resultPartitionId.getPartitionId())
-					.getProducer()
-					.getCurrentExecutionAttempt();
-
-				if (producerExecution.getAttemptId().equals(resultPartitionId.getProducerId())) {
-					return producerExecution.getState();
-				} else {
-					throw new PartitionProducerDisposedException(resultPartitionId);
-				}
-			} else {
-				throw new IllegalArgumentException("Intermediate data set with ID "
-					+ intermediateResultId + " not found.");
-			}
-		}
-	}
-
-	@Override
-	public void scheduleOrUpdateConsumers(final ResultPartitionID partitionID) {
-		mainThreadExecutor.assertRunningInMainThread();
-
-		try {
-			executionGraph.scheduleOrUpdateConsumers(partitionID);
-		} catch (ExecutionGraphException e) {
-			throw new RuntimeException(e);
-		}
-	}
-
-	@Override
-	public ArchivedExecutionGraph requestJob() {
-		mainThreadExecutor.assertRunningInMainThread();
-		return ArchivedExecutionGraph.createFrom(executionGraph);
-	}
-
-	@Override
-	public JobStatus requestJobStatus() {
-		return executionGraph.getState();
-	}
-
-	@Override
-	public JobDetails requestJobDetails() {
-		mainThreadExecutor.assertRunningInMainThread();
-		return WebMonitorUtils.createDetailsForJob(executionGraph);
-	}
-
-	@Override
-	public KvStateLocation requestKvStateLocation(final JobID jobId, final String registrationName) throws UnknownKvStateLocation, FlinkJobNotFoundException {
-		mainThreadExecutor.assertRunningInMainThread();
-
-		// sanity check for the correct JobID
-		if (jobGraph.getJobID().equals(jobId)) {
-			if (log.isDebugEnabled()) {
-				log.debug("Lookup key-value state for job {} with registration " +
-					"name {}.", jobGraph.getJobID(), registrationName);
-			}
-
-			final KvStateLocationRegistry registry = executionGraph.getKvStateLocationRegistry();
-			final KvStateLocation location = registry.getKvStateLocation(registrationName);
-			if (location != null) {
-				return location;
-			} else {
-				throw new UnknownKvStateLocation(registrationName);
-			}
-		} else {
-			if (log.isDebugEnabled()) {
-				log.debug("Request of key-value state location for unknown job {} received.", jobId);
-			}
-			throw new FlinkJobNotFoundException(jobId);
-		}
-	}
-
-	@Override
-	public void notifyKvStateRegistered(final JobID jobId, final JobVertexID jobVertexId, final KeyGroupRange keyGroupRange, final String registrationName, final KvStateID kvStateId, final InetSocketAddress kvStateServerAddress) throws FlinkJobNotFoundException {
-		mainThreadExecutor.assertRunningInMainThread();
-
-		if (jobGraph.getJobID().equals(jobId)) {
-			if (log.isDebugEnabled()) {
-				log.debug("Key value state registered for job {} under name {}.",
-					jobGraph.getJobID(), registrationName);
-			}
-
-			try {
-				executionGraph.getKvStateLocationRegistry().notifyKvStateRegistered(
-					jobVertexId, keyGroupRange, registrationName, kvStateId, kvStateServerAddress);
-			} catch (Exception e) {
-				throw new RuntimeException(e);
-			}
-		} else {
-			throw new FlinkJobNotFoundException(jobId);
-		}
-	}
-
-	@Override
-	public void notifyKvStateUnregistered(final JobID jobId, final JobVertexID jobVertexId, final KeyGroupRange keyGroupRange, final String registrationName) throws FlinkJobNotFoundException {
-		mainThreadExecutor.assertRunningInMainThread();
-
-		if (jobGraph.getJobID().equals(jobId)) {
-			if (log.isDebugEnabled()) {
-				log.debug("Key value state unregistered for job {} under name {}.",
-					jobGraph.getJobID(), registrationName);
-			}
-
-			try {
-				executionGraph.getKvStateLocationRegistry().notifyKvStateUnregistered(
-					jobVertexId, keyGroupRange, registrationName);
-			} catch (Exception e) {
-				throw new RuntimeException(e);
-			}
-		} else {
-			throw new FlinkJobNotFoundException(jobId);
-		}
-	}
-
-	@Override
-	public void updateAccumulators(final AccumulatorSnapshot accumulatorSnapshot) {
-		mainThreadExecutor.assertRunningInMainThread();
-
-		executionGraph.updateAccumulators(accumulatorSnapshot);
-	}
-
-	@Override
-	public Optional<OperatorBackPressureStats> requestOperatorBackPressureStats(final JobVertexID jobVertexId) throws FlinkException {
-		final ExecutionJobVertex jobVertex = executionGraph.getJobVertex(jobVertexId);
-		if (jobVertex == null) {
-			throw new FlinkException("JobVertexID not found " +
-				jobVertexId);
-		}
-
-		return backPressureStatsTracker.getOperatorBackPressureStats(jobVertex);
-	}
-
-	@Override
-	public CompletableFuture<String> triggerSavepoint(final String targetDirectory, final boolean cancelJob) {
-		mainThreadExecutor.assertRunningInMainThread();
-
-		final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator();
-		if (checkpointCoordinator == null) {
-			throw new IllegalStateException(
-				String.format("Job %s is not a streaming job.", jobGraph.getJobID()));
-		} else if (targetDirectory == null && !checkpointCoordinator.getCheckpointStorage().hasDefaultSavepointLocation()) {
-			log.info("Trying to cancel job {} with savepoint, but no savepoint directory configured.", jobGraph.getJobID());
-
-			throw new IllegalStateException(
-				"No savepoint directory configured. You can either specify a directory " +
-					"while cancelling via -s :targetDirectory or configure a cluster-wide " +
-					"default via key '" + CheckpointingOptions.SAVEPOINT_DIRECTORY.key() + "'.");
-		}
-
-		if (cancelJob) {
-			checkpointCoordinator.stopCheckpointScheduler();
-		}
-
-		return checkpointCoordinator
-			.triggerSavepoint(System.currentTimeMillis(), targetDirectory)
-			.thenApply(CompletedCheckpoint::getExternalPointer)
-			.handleAsync((path, throwable) -> {
-				if (throwable != null) {
-					if (cancelJob) {
-						startCheckpointScheduler(checkpointCoordinator);
-					}
-					throw new CompletionException(throwable);
-				} else if (cancelJob) {
-					log.info("Savepoint stored in {}. Now cancelling {}.", path, jobGraph.getJobID());
-					cancel();
-				}
-				return path;
-			}, mainThreadExecutor);
-	}
-
-	private void startCheckpointScheduler(final CheckpointCoordinator checkpointCoordinator) {
-		mainThreadExecutor.assertRunningInMainThread();
-
-		if (checkpointCoordinator.isPeriodicCheckpointingConfigured()) {
-			try {
-				checkpointCoordinator.startCheckpointScheduler();
-			} catch (IllegalStateException ignored) {
-				// Concurrent shut down of the coordinator
-			}
-		}
-	}
-
-	@Override
-	public void acknowledgeCheckpoint(final JobID jobID, final ExecutionAttemptID executionAttemptID, final long checkpointId, final CheckpointMetrics checkpointMetrics, final TaskStateSnapshot checkpointState) {
-		mainThreadExecutor.assertRunningInMainThread();
-
-		final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator();
-		final AcknowledgeCheckpoint ackMessage = new AcknowledgeCheckpoint(
-			jobID,
-			executionAttemptID,
-			checkpointId,
-			checkpointMetrics,
-			checkpointState);
-
-		final String taskManagerLocationInfo = retrieveTaskManagerLocation(executionAttemptID);
-
-		if (checkpointCoordinator != null) {
-			ioExecutor.execute(() -> {
-				try {
-					checkpointCoordinator.receiveAcknowledgeMessage(ackMessage, taskManagerLocationInfo);
-				} catch (Throwable t) {
-					log.warn("Error while processing checkpoint acknowledgement message", t);
-				}
-			});
-		} else {
-			String errorMessage = "Received AcknowledgeCheckpoint message for job {} with no CheckpointCoordinator";
-			if (executionGraph.getState() == JobStatus.RUNNING) {
-				log.error(errorMessage, jobGraph.getJobID());
-			} else {
-				log.debug(errorMessage, jobGraph.getJobID());
-			}
-		}
-	}
-
-	@Override
-	public void declineCheckpoint(final DeclineCheckpoint decline) {
-		mainThreadExecutor.assertRunningInMainThread();
-
-		final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator();
-		final String taskManagerLocationInfo = retrieveTaskManagerLocation(decline.getTaskExecutionId());
-
-		if (checkpointCoordinator != null) {
-			ioExecutor.execute(() -> {
-				try {
-					checkpointCoordinator.receiveDeclineMessage(decline, taskManagerLocationInfo);
-				} catch (Exception e) {
-					log.error("Error in CheckpointCoordinator while processing {}", decline, e);
-				}
-			});
-		} else {
-			String errorMessage = "Received DeclineCheckpoint message for job {} with no CheckpointCoordinator";
-			if (executionGraph.getState() == JobStatus.RUNNING) {
-				log.error(errorMessage, jobGraph.getJobID());
-			} else {
-				log.debug(errorMessage, jobGraph.getJobID());
-			}
-		}
-	}
-
-	@Override
-	public CompletableFuture<String> stopWithSavepoint(final String targetDirectory, final boolean advanceToEndOfEventTime) {
-		mainThreadExecutor.assertRunningInMainThread();
-
-		final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator();
-
-		if (checkpointCoordinator == null) {
-			return FutureUtils.completedExceptionally(new IllegalStateException(
-				String.format("Job %s is not a streaming job.", jobGraph.getJobID())));
-		}
-
-		if (targetDirectory == null && !checkpointCoordinator.getCheckpointStorage().hasDefaultSavepointLocation()) {
-			log.info("Trying to cancel job {} with savepoint, but no savepoint directory configured.", jobGraph.getJobID());
-
-			return FutureUtils.completedExceptionally(new IllegalStateException(
-				"No savepoint directory configured. You can either specify a directory " +
-					"while cancelling via -s :targetDirectory or configure a cluster-wide " +
-					"default via key '" + CheckpointingOptions.SAVEPOINT_DIRECTORY.key() + "'."));
-		}
-
-		// we stop the checkpoint coordinator so that we are guaranteed
-		// to have only the data of the synchronous savepoint committed.
-		// in case of failure, and if the job restarts, the coordinator
-		// will be restarted by the CheckpointCoordinatorDeActivator.
-		checkpointCoordinator.stopCheckpointScheduler();
-
-		final long now = System.currentTimeMillis();
-		final CompletableFuture<String> savepointFuture = checkpointCoordinator
-				.triggerSynchronousSavepoint(now, advanceToEndOfEventTime, targetDirectory)
-				.thenApply(CompletedCheckpoint::getExternalPointer);
-
-		final CompletableFuture<JobStatus> terminationFuture = executionGraph
-			.getTerminationFuture()
-			.handle((jobstatus, throwable) -> {
-
-				if (throwable != null) {
-					log.info("Failed during stopping job {} with a savepoint. Reason: {}", jobGraph.getJobID(), throwable.getMessage());
-					throw new CompletionException(throwable);
-				} else if (jobstatus != JobStatus.FINISHED) {
-					log.info("Failed during stopping job {} with a savepoint. Reason: Reached state {} instead of FINISHED.", jobGraph.getJobID(), jobstatus);
-					throw new CompletionException(new FlinkException("Reached state " + jobstatus + " instead of FINISHED."));
-				}
-				return jobstatus;
-			});
-
-		return savepointFuture.thenCompose((path) ->
-			terminationFuture.thenApply((jobStatus -> path)));
-	}
-
-	private String retrieveTaskManagerLocation(ExecutionAttemptID executionAttemptID) {
-		final Optional<Execution> currentExecution = Optional.ofNullable(executionGraph.getRegisteredExecutions().get(executionAttemptID));
-
-		return currentExecution
-			.map(Execution::getAssignedResourceLocation)
-			.map(TaskManagerLocation::toString)
-			.orElse("Unknown location");
-	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LegacyScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
similarity index 93%
copy from flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LegacyScheduler.java
copy to flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
index e33b6b6..6c2ed51 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LegacyScheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
@@ -92,11 +92,9 @@ import java.util.concurrent.ScheduledExecutorService;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * A scheduler that delegates to the scheduling logic in the {@link ExecutionGraph}.
- *
- * @see ExecutionGraph#scheduleForExecution()
+ * Base class which can be used to implement {@link SchedulerNG}.
  */
-public class LegacyScheduler implements SchedulerNG {
+public abstract class SchedulerBase implements SchedulerNG {
 
 	private final Logger log;
 
@@ -130,23 +128,23 @@ public class LegacyScheduler implements SchedulerNG {
 		"LegacyScheduler is not initialized with proper main thread executor. " +
 			"Call to LegacyScheduler.setMainThreadExecutor(...) required.");
 
-	public LegacyScheduler(
-			final Logger log,
-			final JobGraph jobGraph,
-			final BackPressureStatsTracker backPressureStatsTracker,
-			final Executor ioExecutor,
-			final Configuration jobMasterConfiguration,
-			final SlotProvider slotProvider,
-			final ScheduledExecutorService futureExecutor,
-			final ClassLoader userCodeLoader,
-			final CheckpointRecoveryFactory checkpointRecoveryFactory,
-			final Time rpcTimeout,
-			final RestartStrategyFactory restartStrategyFactory,
-			final BlobWriter blobWriter,
-			final JobManagerJobMetricGroup jobManagerJobMetricGroup,
-			final Time slotRequestTimeout,
-			final ShuffleMaster<?> shuffleMaster,
-			final PartitionTracker partitionTracker) throws Exception {
+	public SchedulerBase(
+		final Logger log,
+		final JobGraph jobGraph,
+		final BackPressureStatsTracker backPressureStatsTracker,
+		final Executor ioExecutor,
+		final Configuration jobMasterConfiguration,
+		final SlotProvider slotProvider,
+		final ScheduledExecutorService futureExecutor,
+		final ClassLoader userCodeLoader,
+		final CheckpointRecoveryFactory checkpointRecoveryFactory,
+		final Time rpcTimeout,
+		final RestartStrategyFactory restartStrategyFactory,
+		final BlobWriter blobWriter,
+		final JobManagerJobMetricGroup jobManagerJobMetricGroup,
+		final Time slotRequestTimeout,
+		final ShuffleMaster<?> shuffleMaster,
+		final PartitionTracker partitionTracker) throws Exception {
 
 		this.log = checkNotNull(log);
 		this.jobGraph = checkNotNull(jobGraph);
@@ -177,9 +175,9 @@ public class LegacyScheduler implements SchedulerNG {
 	}
 
 	private ExecutionGraph createAndRestoreExecutionGraph(
-			JobManagerJobMetricGroup currentJobManagerJobMetricGroup,
-			ShuffleMaster<?> shuffleMaster,
-			PartitionTracker partitionTracker) throws Exception {
+		JobManagerJobMetricGroup currentJobManagerJobMetricGroup,
+		ShuffleMaster<?> shuffleMaster,
+		PartitionTracker partitionTracker) throws Exception {
 
 		ExecutionGraph newExecutionGraph = createExecutionGraph(currentJobManagerJobMetricGroup, shuffleMaster, partitionTracker);
 
@@ -201,9 +199,9 @@ public class LegacyScheduler implements SchedulerNG {
 	}
 
 	private ExecutionGraph createExecutionGraph(
-			JobManagerJobMetricGroup currentJobManagerJobMetricGroup,
-			ShuffleMaster<?> shuffleMaster,
-			final PartitionTracker partitionTracker) throws JobExecutionException, JobException {
+		JobManagerJobMetricGroup currentJobManagerJobMetricGroup,
+		ShuffleMaster<?> shuffleMaster,
+		final PartitionTracker partitionTracker) throws JobExecutionException, JobException {
 		return ExecutionGraphBuilder.buildGraph(
 			null,
 			jobGraph,
@@ -332,8 +330,8 @@ public class LegacyScheduler implements SchedulerNG {
 
 	@Override
 	public ExecutionState requestPartitionState(
-			final IntermediateDataSetID intermediateResultId,
-			final ResultPartitionID resultPartitionId) throws PartitionProducerDisposedException {
+		final IntermediateDataSetID intermediateResultId,
+		final ResultPartitionID resultPartitionId) throws PartitionProducerDisposedException {
 
 		mainThreadExecutor.assertRunningInMainThread();
 
@@ -613,8 +611,8 @@ public class LegacyScheduler implements SchedulerNG {
 
 		final long now = System.currentTimeMillis();
 		final CompletableFuture<String> savepointFuture = checkpointCoordinator
-				.triggerSynchronousSavepoint(now, advanceToEndOfEventTime, targetDirectory)
-				.thenApply(CompletedCheckpoint::getExternalPointer);
+			.triggerSynchronousSavepoint(now, advanceToEndOfEventTime, targetDirectory)
+			.thenApply(CompletedCheckpoint::getExternalPointer);
 
 		final CompletableFuture<JobStatus> terminationFuture = executionGraph
 			.getTerminationFuture()
@@ -642,4 +640,5 @@ public class LegacyScheduler implements SchedulerNG {
 			.map(TaskManagerLocation::toString)
 			.orElse("Unknown location");
 	}
+
 }