You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ga...@apache.org on 2019/09/27 12:39:32 UTC
[flink] 05/06: [FLINK-12433][runtime] Introduce SchedulerBase
This is an automated email from the ASF dual-hosted git repository.
gary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit e865c260da57ed67b111744d427825002e1a47e2
Author: Gary Yao <ga...@apache.org>
AuthorDate: Thu Sep 5 09:34:50 2019 +0200
[FLINK-12433][runtime] Introduce SchedulerBase
Introduce common super class SchedulerBase for DefaultScheduler and
LegacyScheduler, which contains code that is shared between the scheduler
implementations. Previously, DefaultScheduler inherited from LegacyScheduler to
avoid re-implementing features such as queryable state, taking savepoints, etc.
---
.../flink/runtime/scheduler/DefaultScheduler.java | 2 +-
.../flink/runtime/scheduler/LegacyScheduler.java | 524 +--------------------
.../{LegacyScheduler.java => SchedulerBase.java} | 61 ++-
3 files changed, 39 insertions(+), 548 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
index 05429ac..759a56a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
@@ -39,7 +39,7 @@ import java.util.concurrent.ScheduledExecutorService;
/**
* Stub implementation of the future default scheduler.
*/
-public class DefaultScheduler extends LegacyScheduler {
+public class DefaultScheduler extends SchedulerBase {
public DefaultScheduler(
final Logger log,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LegacyScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LegacyScheduler.java
index e33b6b6..bc6ed69 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LegacyScheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LegacyScheduler.java
@@ -96,39 +96,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
*
* @see ExecutionGraph#scheduleForExecution()
*/
-public class LegacyScheduler implements SchedulerNG {
-
- private final Logger log;
-
- private final JobGraph jobGraph;
-
- private final ExecutionGraph executionGraph;
-
- private final BackPressureStatsTracker backPressureStatsTracker;
-
- private final Executor ioExecutor;
-
- private final Configuration jobMasterConfiguration;
-
- private final SlotProvider slotProvider;
-
- private final ScheduledExecutorService futureExecutor;
-
- private final ClassLoader userCodeLoader;
-
- private final CheckpointRecoveryFactory checkpointRecoveryFactory;
-
- private final Time rpcTimeout;
-
- private final RestartStrategy restartStrategy;
-
- private final BlobWriter blobWriter;
-
- private final Time slotRequestTimeout;
-
- private ComponentMainThreadExecutor mainThreadExecutor = new ComponentMainThreadExecutor.DummyComponentMainThreadExecutor(
- "LegacyScheduler is not initialized with proper main thread executor. " +
- "Call to LegacyScheduler.setMainThreadExecutor(...) required.");
+public class LegacyScheduler extends SchedulerBase {
public LegacyScheduler(
final Logger log,
@@ -148,498 +116,22 @@ public class LegacyScheduler implements SchedulerNG {
final ShuffleMaster<?> shuffleMaster,
final PartitionTracker partitionTracker) throws Exception {
- this.log = checkNotNull(log);
- this.jobGraph = checkNotNull(jobGraph);
- this.backPressureStatsTracker = checkNotNull(backPressureStatsTracker);
- this.ioExecutor = checkNotNull(ioExecutor);
- this.jobMasterConfiguration = checkNotNull(jobMasterConfiguration);
- this.slotProvider = checkNotNull(slotProvider);
- this.futureExecutor = checkNotNull(futureExecutor);
- this.userCodeLoader = checkNotNull(userCodeLoader);
- this.checkpointRecoveryFactory = checkNotNull(checkpointRecoveryFactory);
- this.rpcTimeout = checkNotNull(rpcTimeout);
-
- final RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration =
- jobGraph.getSerializedExecutionConfig()
- .deserializeValue(userCodeLoader)
- .getRestartStrategy();
-
- this.restartStrategy = RestartStrategyResolving.resolve(restartStrategyConfiguration,
- restartStrategyFactory,
- jobGraph.isCheckpointingEnabled());
-
- log.info("Using restart strategy {} for {} ({}).", this.restartStrategy, jobGraph.getName(), jobGraph.getJobID());
-
- this.blobWriter = checkNotNull(blobWriter);
- this.slotRequestTimeout = checkNotNull(slotRequestTimeout);
-
- this.executionGraph = createAndRestoreExecutionGraph(jobManagerJobMetricGroup, checkNotNull(shuffleMaster), checkNotNull(partitionTracker));
- }
-
- private ExecutionGraph createAndRestoreExecutionGraph(
- JobManagerJobMetricGroup currentJobManagerJobMetricGroup,
- ShuffleMaster<?> shuffleMaster,
- PartitionTracker partitionTracker) throws Exception {
-
- ExecutionGraph newExecutionGraph = createExecutionGraph(currentJobManagerJobMetricGroup, shuffleMaster, partitionTracker);
-
- final CheckpointCoordinator checkpointCoordinator = newExecutionGraph.getCheckpointCoordinator();
-
- if (checkpointCoordinator != null) {
- // check whether we find a valid checkpoint
- if (!checkpointCoordinator.restoreLatestCheckpointedState(
- newExecutionGraph.getAllVertices(),
- false,
- false)) {
-
- // check whether we can restore from a savepoint
- tryRestoreExecutionGraphFromSavepoint(newExecutionGraph, jobGraph.getSavepointRestoreSettings());
- }
- }
-
- return newExecutionGraph;
- }
-
- private ExecutionGraph createExecutionGraph(
- JobManagerJobMetricGroup currentJobManagerJobMetricGroup,
- ShuffleMaster<?> shuffleMaster,
- final PartitionTracker partitionTracker) throws JobExecutionException, JobException {
- return ExecutionGraphBuilder.buildGraph(
- null,
+ super(
+ log,
jobGraph,
- jobMasterConfiguration,
- futureExecutor,
+ backPressureStatsTracker,
ioExecutor,
+ jobMasterConfiguration,
slotProvider,
+ futureExecutor,
userCodeLoader,
checkpointRecoveryFactory,
rpcTimeout,
- restartStrategy,
- currentJobManagerJobMetricGroup,
+ restartStrategyFactory,
blobWriter,
+ jobManagerJobMetricGroup,
slotRequestTimeout,
- log,
shuffleMaster,
partitionTracker);
}
-
- /**
- * Tries to restore the given {@link ExecutionGraph} from the provided {@link SavepointRestoreSettings}.
- *
- * @param executionGraphToRestore {@link ExecutionGraph} which is supposed to be restored
- * @param savepointRestoreSettings {@link SavepointRestoreSettings} containing information about the savepoint to restore from
- * @throws Exception if the {@link ExecutionGraph} could not be restored
- */
- private void tryRestoreExecutionGraphFromSavepoint(ExecutionGraph executionGraphToRestore, SavepointRestoreSettings savepointRestoreSettings) throws Exception {
- if (savepointRestoreSettings.restoreSavepoint()) {
- final CheckpointCoordinator checkpointCoordinator = executionGraphToRestore.getCheckpointCoordinator();
- if (checkpointCoordinator != null) {
- checkpointCoordinator.restoreSavepoint(
- savepointRestoreSettings.getRestorePath(),
- savepointRestoreSettings.allowNonRestoredState(),
- executionGraphToRestore.getAllVertices(),
- userCodeLoader);
- }
- }
- }
-
- @Override
- public void setMainThreadExecutor(final ComponentMainThreadExecutor mainThreadExecutor) {
- this.mainThreadExecutor = checkNotNull(mainThreadExecutor);
- executionGraph.start(mainThreadExecutor);
- }
-
- @Override
- public void registerJobStatusListener(final JobStatusListener jobStatusListener) {
- executionGraph.registerJobStatusListener(jobStatusListener);
- }
-
- @Override
- public void startScheduling() {
- mainThreadExecutor.assertRunningInMainThread();
-
- try {
- executionGraph.scheduleForExecution();
- }
- catch (Throwable t) {
- executionGraph.failGlobal(t);
- }
- }
-
- @Override
- public void suspend(Throwable cause) {
- mainThreadExecutor.assertRunningInMainThread();
- executionGraph.suspend(cause);
- }
-
- @Override
- public void cancel() {
- mainThreadExecutor.assertRunningInMainThread();
- executionGraph.cancel();
- }
-
- @Override
- public CompletableFuture<Void> getTerminationFuture() {
- return executionGraph.getTerminationFuture().thenApply(FunctionUtils.nullFn());
- }
-
- @Override
- public boolean updateTaskExecutionState(final TaskExecutionState taskExecutionState) {
- mainThreadExecutor.assertRunningInMainThread();
- return executionGraph.updateState(taskExecutionState);
- }
-
- @Override
- public SerializedInputSplit requestNextInputSplit(JobVertexID vertexID, ExecutionAttemptID executionAttempt) throws IOException {
- mainThreadExecutor.assertRunningInMainThread();
-
- final Execution execution = executionGraph.getRegisteredExecutions().get(executionAttempt);
- if (execution == null) {
- // can happen when JobManager had already unregistered this execution upon on task failure,
- // but TaskManager get some delay to aware of that situation
- if (log.isDebugEnabled()) {
- log.debug("Can not find Execution for attempt {}.", executionAttempt);
- }
- // but we should TaskManager be aware of this
- throw new IllegalArgumentException("Can not find Execution for attempt " + executionAttempt);
- }
-
- final ExecutionJobVertex vertex = executionGraph.getJobVertex(vertexID);
- if (vertex == null) {
- throw new IllegalArgumentException("Cannot find execution vertex for vertex ID " + vertexID);
- }
-
- if (vertex.getSplitAssigner() == null) {
- throw new IllegalStateException("No InputSplitAssigner for vertex ID " + vertexID);
- }
-
- final InputSplit nextInputSplit = execution.getNextInputSplit();
-
- if (log.isDebugEnabled()) {
- log.debug("Send next input split {}.", nextInputSplit);
- }
-
- try {
- final byte[] serializedInputSplit = InstantiationUtil.serializeObject(nextInputSplit);
- return new SerializedInputSplit(serializedInputSplit);
- } catch (Exception ex) {
- IOException reason = new IOException("Could not serialize the next input split of class " +
- nextInputSplit.getClass() + ".", ex);
- vertex.fail(reason);
- throw reason;
- }
- }
-
- @Override
- public ExecutionState requestPartitionState(
- final IntermediateDataSetID intermediateResultId,
- final ResultPartitionID resultPartitionId) throws PartitionProducerDisposedException {
-
- mainThreadExecutor.assertRunningInMainThread();
-
- final Execution execution = executionGraph.getRegisteredExecutions().get(resultPartitionId.getProducerId());
- if (execution != null) {
- return execution.getState();
- }
- else {
- final IntermediateResult intermediateResult =
- executionGraph.getAllIntermediateResults().get(intermediateResultId);
-
- if (intermediateResult != null) {
- // Try to find the producing execution
- Execution producerExecution = intermediateResult
- .getPartitionById(resultPartitionId.getPartitionId())
- .getProducer()
- .getCurrentExecutionAttempt();
-
- if (producerExecution.getAttemptId().equals(resultPartitionId.getProducerId())) {
- return producerExecution.getState();
- } else {
- throw new PartitionProducerDisposedException(resultPartitionId);
- }
- } else {
- throw new IllegalArgumentException("Intermediate data set with ID "
- + intermediateResultId + " not found.");
- }
- }
- }
-
- @Override
- public void scheduleOrUpdateConsumers(final ResultPartitionID partitionID) {
- mainThreadExecutor.assertRunningInMainThread();
-
- try {
- executionGraph.scheduleOrUpdateConsumers(partitionID);
- } catch (ExecutionGraphException e) {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public ArchivedExecutionGraph requestJob() {
- mainThreadExecutor.assertRunningInMainThread();
- return ArchivedExecutionGraph.createFrom(executionGraph);
- }
-
- @Override
- public JobStatus requestJobStatus() {
- return executionGraph.getState();
- }
-
- @Override
- public JobDetails requestJobDetails() {
- mainThreadExecutor.assertRunningInMainThread();
- return WebMonitorUtils.createDetailsForJob(executionGraph);
- }
-
- @Override
- public KvStateLocation requestKvStateLocation(final JobID jobId, final String registrationName) throws UnknownKvStateLocation, FlinkJobNotFoundException {
- mainThreadExecutor.assertRunningInMainThread();
-
- // sanity check for the correct JobID
- if (jobGraph.getJobID().equals(jobId)) {
- if (log.isDebugEnabled()) {
- log.debug("Lookup key-value state for job {} with registration " +
- "name {}.", jobGraph.getJobID(), registrationName);
- }
-
- final KvStateLocationRegistry registry = executionGraph.getKvStateLocationRegistry();
- final KvStateLocation location = registry.getKvStateLocation(registrationName);
- if (location != null) {
- return location;
- } else {
- throw new UnknownKvStateLocation(registrationName);
- }
- } else {
- if (log.isDebugEnabled()) {
- log.debug("Request of key-value state location for unknown job {} received.", jobId);
- }
- throw new FlinkJobNotFoundException(jobId);
- }
- }
-
- @Override
- public void notifyKvStateRegistered(final JobID jobId, final JobVertexID jobVertexId, final KeyGroupRange keyGroupRange, final String registrationName, final KvStateID kvStateId, final InetSocketAddress kvStateServerAddress) throws FlinkJobNotFoundException {
- mainThreadExecutor.assertRunningInMainThread();
-
- if (jobGraph.getJobID().equals(jobId)) {
- if (log.isDebugEnabled()) {
- log.debug("Key value state registered for job {} under name {}.",
- jobGraph.getJobID(), registrationName);
- }
-
- try {
- executionGraph.getKvStateLocationRegistry().notifyKvStateRegistered(
- jobVertexId, keyGroupRange, registrationName, kvStateId, kvStateServerAddress);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- } else {
- throw new FlinkJobNotFoundException(jobId);
- }
- }
-
- @Override
- public void notifyKvStateUnregistered(final JobID jobId, final JobVertexID jobVertexId, final KeyGroupRange keyGroupRange, final String registrationName) throws FlinkJobNotFoundException {
- mainThreadExecutor.assertRunningInMainThread();
-
- if (jobGraph.getJobID().equals(jobId)) {
- if (log.isDebugEnabled()) {
- log.debug("Key value state unregistered for job {} under name {}.",
- jobGraph.getJobID(), registrationName);
- }
-
- try {
- executionGraph.getKvStateLocationRegistry().notifyKvStateUnregistered(
- jobVertexId, keyGroupRange, registrationName);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- } else {
- throw new FlinkJobNotFoundException(jobId);
- }
- }
-
- @Override
- public void updateAccumulators(final AccumulatorSnapshot accumulatorSnapshot) {
- mainThreadExecutor.assertRunningInMainThread();
-
- executionGraph.updateAccumulators(accumulatorSnapshot);
- }
-
- @Override
- public Optional<OperatorBackPressureStats> requestOperatorBackPressureStats(final JobVertexID jobVertexId) throws FlinkException {
- final ExecutionJobVertex jobVertex = executionGraph.getJobVertex(jobVertexId);
- if (jobVertex == null) {
- throw new FlinkException("JobVertexID not found " +
- jobVertexId);
- }
-
- return backPressureStatsTracker.getOperatorBackPressureStats(jobVertex);
- }
-
- @Override
- public CompletableFuture<String> triggerSavepoint(final String targetDirectory, final boolean cancelJob) {
- mainThreadExecutor.assertRunningInMainThread();
-
- final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator();
- if (checkpointCoordinator == null) {
- throw new IllegalStateException(
- String.format("Job %s is not a streaming job.", jobGraph.getJobID()));
- } else if (targetDirectory == null && !checkpointCoordinator.getCheckpointStorage().hasDefaultSavepointLocation()) {
- log.info("Trying to cancel job {} with savepoint, but no savepoint directory configured.", jobGraph.getJobID());
-
- throw new IllegalStateException(
- "No savepoint directory configured. You can either specify a directory " +
- "while cancelling via -s :targetDirectory or configure a cluster-wide " +
- "default via key '" + CheckpointingOptions.SAVEPOINT_DIRECTORY.key() + "'.");
- }
-
- if (cancelJob) {
- checkpointCoordinator.stopCheckpointScheduler();
- }
-
- return checkpointCoordinator
- .triggerSavepoint(System.currentTimeMillis(), targetDirectory)
- .thenApply(CompletedCheckpoint::getExternalPointer)
- .handleAsync((path, throwable) -> {
- if (throwable != null) {
- if (cancelJob) {
- startCheckpointScheduler(checkpointCoordinator);
- }
- throw new CompletionException(throwable);
- } else if (cancelJob) {
- log.info("Savepoint stored in {}. Now cancelling {}.", path, jobGraph.getJobID());
- cancel();
- }
- return path;
- }, mainThreadExecutor);
- }
-
- private void startCheckpointScheduler(final CheckpointCoordinator checkpointCoordinator) {
- mainThreadExecutor.assertRunningInMainThread();
-
- if (checkpointCoordinator.isPeriodicCheckpointingConfigured()) {
- try {
- checkpointCoordinator.startCheckpointScheduler();
- } catch (IllegalStateException ignored) {
- // Concurrent shut down of the coordinator
- }
- }
- }
-
- @Override
- public void acknowledgeCheckpoint(final JobID jobID, final ExecutionAttemptID executionAttemptID, final long checkpointId, final CheckpointMetrics checkpointMetrics, final TaskStateSnapshot checkpointState) {
- mainThreadExecutor.assertRunningInMainThread();
-
- final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator();
- final AcknowledgeCheckpoint ackMessage = new AcknowledgeCheckpoint(
- jobID,
- executionAttemptID,
- checkpointId,
- checkpointMetrics,
- checkpointState);
-
- final String taskManagerLocationInfo = retrieveTaskManagerLocation(executionAttemptID);
-
- if (checkpointCoordinator != null) {
- ioExecutor.execute(() -> {
- try {
- checkpointCoordinator.receiveAcknowledgeMessage(ackMessage, taskManagerLocationInfo);
- } catch (Throwable t) {
- log.warn("Error while processing checkpoint acknowledgement message", t);
- }
- });
- } else {
- String errorMessage = "Received AcknowledgeCheckpoint message for job {} with no CheckpointCoordinator";
- if (executionGraph.getState() == JobStatus.RUNNING) {
- log.error(errorMessage, jobGraph.getJobID());
- } else {
- log.debug(errorMessage, jobGraph.getJobID());
- }
- }
- }
-
- @Override
- public void declineCheckpoint(final DeclineCheckpoint decline) {
- mainThreadExecutor.assertRunningInMainThread();
-
- final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator();
- final String taskManagerLocationInfo = retrieveTaskManagerLocation(decline.getTaskExecutionId());
-
- if (checkpointCoordinator != null) {
- ioExecutor.execute(() -> {
- try {
- checkpointCoordinator.receiveDeclineMessage(decline, taskManagerLocationInfo);
- } catch (Exception e) {
- log.error("Error in CheckpointCoordinator while processing {}", decline, e);
- }
- });
- } else {
- String errorMessage = "Received DeclineCheckpoint message for job {} with no CheckpointCoordinator";
- if (executionGraph.getState() == JobStatus.RUNNING) {
- log.error(errorMessage, jobGraph.getJobID());
- } else {
- log.debug(errorMessage, jobGraph.getJobID());
- }
- }
- }
-
- @Override
- public CompletableFuture<String> stopWithSavepoint(final String targetDirectory, final boolean advanceToEndOfEventTime) {
- mainThreadExecutor.assertRunningInMainThread();
-
- final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator();
-
- if (checkpointCoordinator == null) {
- return FutureUtils.completedExceptionally(new IllegalStateException(
- String.format("Job %s is not a streaming job.", jobGraph.getJobID())));
- }
-
- if (targetDirectory == null && !checkpointCoordinator.getCheckpointStorage().hasDefaultSavepointLocation()) {
- log.info("Trying to cancel job {} with savepoint, but no savepoint directory configured.", jobGraph.getJobID());
-
- return FutureUtils.completedExceptionally(new IllegalStateException(
- "No savepoint directory configured. You can either specify a directory " +
- "while cancelling via -s :targetDirectory or configure a cluster-wide " +
- "default via key '" + CheckpointingOptions.SAVEPOINT_DIRECTORY.key() + "'."));
- }
-
- // we stop the checkpoint coordinator so that we are guaranteed
- // to have only the data of the synchronous savepoint committed.
- // in case of failure, and if the job restarts, the coordinator
- // will be restarted by the CheckpointCoordinatorDeActivator.
- checkpointCoordinator.stopCheckpointScheduler();
-
- final long now = System.currentTimeMillis();
- final CompletableFuture<String> savepointFuture = checkpointCoordinator
- .triggerSynchronousSavepoint(now, advanceToEndOfEventTime, targetDirectory)
- .thenApply(CompletedCheckpoint::getExternalPointer);
-
- final CompletableFuture<JobStatus> terminationFuture = executionGraph
- .getTerminationFuture()
- .handle((jobstatus, throwable) -> {
-
- if (throwable != null) {
- log.info("Failed during stopping job {} with a savepoint. Reason: {}", jobGraph.getJobID(), throwable.getMessage());
- throw new CompletionException(throwable);
- } else if (jobstatus != JobStatus.FINISHED) {
- log.info("Failed during stopping job {} with a savepoint. Reason: Reached state {} instead of FINISHED.", jobGraph.getJobID(), jobstatus);
- throw new CompletionException(new FlinkException("Reached state " + jobstatus + " instead of FINISHED."));
- }
- return jobstatus;
- });
-
- return savepointFuture.thenCompose((path) ->
- terminationFuture.thenApply((jobStatus -> path)));
- }
-
- private String retrieveTaskManagerLocation(ExecutionAttemptID executionAttemptID) {
- final Optional<Execution> currentExecution = Optional.ofNullable(executionGraph.getRegisteredExecutions().get(executionAttemptID));
-
- return currentExecution
- .map(Execution::getAssignedResourceLocation)
- .map(TaskManagerLocation::toString)
- .orElse("Unknown location");
- }
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LegacyScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
similarity index 93%
copy from flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LegacyScheduler.java
copy to flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
index e33b6b6..6c2ed51 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LegacyScheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
@@ -92,11 +92,9 @@ import java.util.concurrent.ScheduledExecutorService;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
- * A scheduler that delegates to the scheduling logic in the {@link ExecutionGraph}.
- *
- * @see ExecutionGraph#scheduleForExecution()
+ * Base class which can be used to implement {@link SchedulerNG}.
*/
-public class LegacyScheduler implements SchedulerNG {
+public abstract class SchedulerBase implements SchedulerNG {
private final Logger log;
@@ -130,23 +128,23 @@ public class LegacyScheduler implements SchedulerNG {
"LegacyScheduler is not initialized with proper main thread executor. " +
"Call to LegacyScheduler.setMainThreadExecutor(...) required.");
- public LegacyScheduler(
- final Logger log,
- final JobGraph jobGraph,
- final BackPressureStatsTracker backPressureStatsTracker,
- final Executor ioExecutor,
- final Configuration jobMasterConfiguration,
- final SlotProvider slotProvider,
- final ScheduledExecutorService futureExecutor,
- final ClassLoader userCodeLoader,
- final CheckpointRecoveryFactory checkpointRecoveryFactory,
- final Time rpcTimeout,
- final RestartStrategyFactory restartStrategyFactory,
- final BlobWriter blobWriter,
- final JobManagerJobMetricGroup jobManagerJobMetricGroup,
- final Time slotRequestTimeout,
- final ShuffleMaster<?> shuffleMaster,
- final PartitionTracker partitionTracker) throws Exception {
+ public SchedulerBase(
+ final Logger log,
+ final JobGraph jobGraph,
+ final BackPressureStatsTracker backPressureStatsTracker,
+ final Executor ioExecutor,
+ final Configuration jobMasterConfiguration,
+ final SlotProvider slotProvider,
+ final ScheduledExecutorService futureExecutor,
+ final ClassLoader userCodeLoader,
+ final CheckpointRecoveryFactory checkpointRecoveryFactory,
+ final Time rpcTimeout,
+ final RestartStrategyFactory restartStrategyFactory,
+ final BlobWriter blobWriter,
+ final JobManagerJobMetricGroup jobManagerJobMetricGroup,
+ final Time slotRequestTimeout,
+ final ShuffleMaster<?> shuffleMaster,
+ final PartitionTracker partitionTracker) throws Exception {
this.log = checkNotNull(log);
this.jobGraph = checkNotNull(jobGraph);
@@ -177,9 +175,9 @@ public class LegacyScheduler implements SchedulerNG {
}
private ExecutionGraph createAndRestoreExecutionGraph(
- JobManagerJobMetricGroup currentJobManagerJobMetricGroup,
- ShuffleMaster<?> shuffleMaster,
- PartitionTracker partitionTracker) throws Exception {
+ JobManagerJobMetricGroup currentJobManagerJobMetricGroup,
+ ShuffleMaster<?> shuffleMaster,
+ PartitionTracker partitionTracker) throws Exception {
ExecutionGraph newExecutionGraph = createExecutionGraph(currentJobManagerJobMetricGroup, shuffleMaster, partitionTracker);
@@ -201,9 +199,9 @@ public class LegacyScheduler implements SchedulerNG {
}
private ExecutionGraph createExecutionGraph(
- JobManagerJobMetricGroup currentJobManagerJobMetricGroup,
- ShuffleMaster<?> shuffleMaster,
- final PartitionTracker partitionTracker) throws JobExecutionException, JobException {
+ JobManagerJobMetricGroup currentJobManagerJobMetricGroup,
+ ShuffleMaster<?> shuffleMaster,
+ final PartitionTracker partitionTracker) throws JobExecutionException, JobException {
return ExecutionGraphBuilder.buildGraph(
null,
jobGraph,
@@ -332,8 +330,8 @@ public class LegacyScheduler implements SchedulerNG {
@Override
public ExecutionState requestPartitionState(
- final IntermediateDataSetID intermediateResultId,
- final ResultPartitionID resultPartitionId) throws PartitionProducerDisposedException {
+ final IntermediateDataSetID intermediateResultId,
+ final ResultPartitionID resultPartitionId) throws PartitionProducerDisposedException {
mainThreadExecutor.assertRunningInMainThread();
@@ -613,8 +611,8 @@ public class LegacyScheduler implements SchedulerNG {
final long now = System.currentTimeMillis();
final CompletableFuture<String> savepointFuture = checkpointCoordinator
- .triggerSynchronousSavepoint(now, advanceToEndOfEventTime, targetDirectory)
- .thenApply(CompletedCheckpoint::getExternalPointer);
+ .triggerSynchronousSavepoint(now, advanceToEndOfEventTime, targetDirectory)
+ .thenApply(CompletedCheckpoint::getExternalPointer);
final CompletableFuture<JobStatus> terminationFuture = executionGraph
.getTerminationFuture()
@@ -642,4 +640,5 @@ public class LegacyScheduler implements SchedulerNG {
.map(TaskManagerLocation::toString)
.orElse("Unknown location");
}
+
}