Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/08/24 10:38:28 UTC

[GitHub] [flink] tillrohrmann commented on a change in pull request #13217: [FLINK-16866] Make job submission non-blocking

tillrohrmann commented on a change in pull request #13217:
URL: https://github.com/apache/flink/pull/13217#discussion_r475437759



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
##########
@@ -802,23 +769,28 @@ private void jobMasterFailed(JobID jobId, Throwable cause) {
 		onFatalError(new FlinkException(String.format("JobMaster for job %s failed.", jobId), cause));
 	}
 
-	private CompletableFuture<JobMasterGateway> getJobMasterGatewayFuture(JobID jobId) {
-		final CompletableFuture<JobManagerRunner> jobManagerRunnerFuture = jobManagerRunnerFutures.get(jobId);
+	/**
+	 * Ensures that the JobMasterGateway is available.
+	 */
+	private CompletableFuture<JobMasterGateway> getJobMasterGateway(JobID jobId) throws
+		FlinkJobNotFoundException,
+		UnavailableDispatcherOperationException {

Review comment:
       If this method did not throw exceptions but only returned exceptionally completed futures, then `performOperationOnJobMasterGateway` could be simplified.
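To illustrate the simplification, here is a minimal, self-contained sketch using only the JDK. `GatewayLookupSketch`, `getGateway`, `performOperation` and the two nested exception classes are hypothetical stand-ins for the `Dispatcher` methods and Flink exceptions discussed above, not Flink code. Because lookup failures are encoded in the returned future, the caller can simply compose on it instead of wrapping the call in a try/catch.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Function;

class GatewayLookupSketch {
    // Hypothetical stand-ins for FlinkJobNotFoundException and
    // UnavailableDispatcherOperationException (not the real Flink classes).
    static class JobNotFoundException extends Exception {
        JobNotFoundException(String jobId) {
            super("Could not find job " + jobId + '.');
        }
    }

    static class OperationUnavailableException extends Exception {
        OperationUnavailableException(String message) {
            super(message);
        }
    }

    // Hypothetical registry: job id -> whether its JobMaster is running.
    private final Map<String, Boolean> runningJobs = new HashMap<>();

    void register(String jobId, boolean running) {
        runningJobs.put(jobId, running);
    }

    // Never throws: every failure is encoded in the returned future.
    CompletableFuture<String> getGateway(String jobId) {
        Boolean running = runningJobs.get(jobId);
        if (running == null) {
            return failedFuture(new JobNotFoundException(jobId));
        }
        if (!running) {
            return failedFuture(new OperationUnavailableException(
                    "The requested operation is not available while the JobManager is initializing."));
        }
        return CompletableFuture.completedFuture("gateway-" + jobId);
    }

    // No try/catch needed here: lookup failures simply propagate through thenCompose.
    <T> CompletableFuture<T> performOperation(
            String jobId, Function<String, CompletableFuture<T>> operation) {
        return getGateway(jobId).thenCompose(operation);
    }

    // Java 8 compatible equivalent of CompletableFuture.failedFuture (Java 9+).
    private static <T> CompletableFuture<T> failedFuture(Throwable cause) {
        CompletableFuture<T> future = new CompletableFuture<>();
        future.completeExceptionally(cause);
        return future;
    }

    public static void main(String[] args) {
        GatewayLookupSketch dispatcher = new GatewayLookupSketch();
        dispatcher.register("a", true);
        System.out.println(dispatcher.performOperation(
                "a", gateway -> CompletableFuture.completedFuture(gateway + ":ok")).join());
        try {
            dispatcher.performOperation("missing", gateway -> CompletableFuture.completedFuture(gateway)).join();
        } catch (CompletionException e) {
            System.out.println("failed with " + e.getCause().getClass().getSimpleName());
        }
    }
}
```

With this shape, the caller's error handling collapses into the normal future pipeline (`exceptionally`, `handle`, etc.).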

##########
File path: flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
##########
@@ -111,4 +122,41 @@ public static void executeProgram(
 			Thread.currentThread().setContextClassLoader(contextClassLoader);
 		}
 	}
+
+	/**
+	 * This method blocks until the job status is not INITIALIZING anymore.
+	 * If the job is FAILED, it throws an CompletionException with the failure cause.
+	 * @param jobStatusSupplier supplier returning the job status.
+	 */
+	public static void waitUntilJobInitializationFinished(
+		SupplierWithException<JobStatus, Exception> jobStatusSupplier,
+		SupplierWithException<JobResult, Exception> jobResultSupplier
+		) throws CompletionException {

Review comment:
       Why are we throwing a `CompletionException`?
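One alternative to an unchecked `CompletionException` is a dedicated checked exception, so that callers are forced to handle initialization failures. The sketch below is a hedged illustration using only the JDK: `InitializationWaitSketch`, `Status`, `JobInitializationException` and the backoff helper are hypothetical, and the backoff is only assumed to have the same general shape as `ExponentialWaitStrategy(50, 2000)` in the PR (doubling from 50 ms, capped at 2000 ms).

```java
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;

class InitializationWaitSketch {
    // Hypothetical subset of job states, for illustration only.
    enum Status { INITIALIZING, RUNNING, FAILED }

    // Hypothetical checked exception replacing the unchecked CompletionException.
    static class JobInitializationException extends Exception {
        JobInitializationException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    // Exponential backoff: initialMillis * 2^attempt, capped at maxMillis.
    static long sleepTime(long attempt, long initialMillis, long maxMillis) {
        int exponent = (int) Math.min(attempt, 30); // bound the shift to avoid overflow
        return Math.min(maxMillis, initialMillis << exponent);
    }

    // Polls until the status leaves INITIALIZING; a FAILED job surfaces as a checked exception.
    static void waitUntilInitialized(Callable<Status> statusSupplier, Callable<Throwable> failureSupplier)
            throws JobInitializationException, InterruptedException {
        Status status = query(statusSupplier);
        long attempt = 0;
        while (status == Status.INITIALIZING) {
            Thread.sleep(sleepTime(attempt++, 50, 2000));
            status = query(statusSupplier);
        }
        if (status == Status.FAILED) {
            throw new JobInitializationException("Job failed.", query(failureSupplier));
        }
    }

    // Wraps supplier failures so that callers only deal with one checked exception type.
    private static <T> T query(Callable<T> supplier) throws JobInitializationException {
        try {
            return supplier.call();
        } catch (Exception e) {
            throw new JobInitializationException("Could not query the job.", e);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicInteger polls = new AtomicInteger();
        try {
            waitUntilInitialized(
                    () -> polls.incrementAndGet() < 3 ? Status.INITIALIZING : Status.FAILED,
                    () -> new IllegalStateException("boom"));
        } catch (JobInitializationException e) {
            System.out.println(e.getMessage() + " Cause: " + e.getCause().getMessage());
        }
    }
}
```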

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherJob.java
##########
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.jobmaster.JobManagerRunner;
+import org.apache.flink.runtime.jobmaster.JobMasterGateway;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.util.AutoCloseableAsync;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Abstraction used by the {@link Dispatcher} to manage JobManagers, in
+ * particular during initialization.
+ * While a job is initializing, the JobMasterGateway is not available. A small subset
+ * of the methods of the JobMasterGateway necessary during initialization are provided
+ * by this class (job details, cancel).
+ */
+public final class DispatcherJob implements AutoCloseableAsync {
+
+	private final Logger log = LoggerFactory.getLogger(DispatcherJob.class);
+
+	private final CompletableFuture<JobManagerRunner> jobManagerRunnerFuture;
+	private final CompletableFuture<ArchivedExecutionGraph> jobResultFuture;
+
+	private final long initializationTimestamp;
+	private final JobID jobId;
+	private final String jobName;
+
+	private final Object lock = new Object();
+
+	// internal field to track job status during initialization. Is not updated anymore after
+	// job is initialized, cancelled or failed.
+	@GuardedBy("lock")
+	private JobStatus jobStatus = JobStatus.INITIALIZING;
+
+	private final CompletableFuture<Void> terminationFuture = new CompletableFuture<>();
+
+	private enum SubmissionType {

Review comment:
       Maybe
   
   ```suggestion
   	private enum ExecutionType {
   ```

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
##########
@@ -682,15 +650,14 @@ private void registerJobManagerRunnerTerminationFuture(JobID jobId, CompletableF
 	}
 
 	private CompletableFuture<Void> removeJob(JobID jobId, boolean cleanupHA) {
-		CompletableFuture<JobManagerRunner> jobManagerRunnerFuture = jobManagerRunnerFutures.remove(jobId);
+		DispatcherJob job = runningJobs.remove(jobId);
 
 		final CompletableFuture<Void> jobManagerRunnerTerminationFuture;

Review comment:
       Please update the variable names to reflect the new abstraction. I think it should now be `jobTerminationFuture` or `dispatcherJobTerminationFuture`.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherJob.java
##########
@@ -0,0 +1,242 @@
+/**
+ * Abstraction used by the {@link Dispatcher} to manage JobManagers, in
+ * particular during initialization.
+ * While a job is initializing, the JobMasterGateway is not available. A small subset
+ * of the methods of the JobMasterGateway necessary during initialization are provided
+ * by this class (job details, cancel).
+ */

Review comment:
       It is not mainly about initialization. The `DispatcherJob` is the basic abstraction of a Flink job as seen from the `Dispatcher`. It has its own lifecycle and offers a set of operations.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherJob.java
##########
@@ -0,0 +1,242 @@
+/**
+ * Abstraction used by the {@link Dispatcher} to manage JobManagers, in
+ * particular during initialization.
+ * While a job is initializing, the JobMasterGateway is not available. A small subset
+ * of the methods of the JobMasterGateway necessary during initialization are provided
+ * by this class (job details, cancel).
+ */
+public final class DispatcherJob implements AutoCloseableAsync {
+
+	private final Logger log = LoggerFactory.getLogger(DispatcherJob.class);
+
+	private final CompletableFuture<JobManagerRunner> jobManagerRunnerFuture;
+	private final CompletableFuture<ArchivedExecutionGraph> jobResultFuture;
+
+	private final long initializationTimestamp;
+	private final JobID jobId;
+	private final String jobName;
+
+	private final Object lock = new Object();
+
+	// internal field to track job status during initialization. Is not updated anymore after
+	// job is initialized, cancelled or failed.
+	@GuardedBy("lock")
+	private JobStatus jobStatus = JobStatus.INITIALIZING;
+
+	private final CompletableFuture<Void> terminationFuture = new CompletableFuture<>();

Review comment:
       I would suggest not mixing final and non-final fields.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
##########
@@ -351,4 +352,52 @@ public static ArchivedExecutionGraph createFrom(ExecutionGraph executionGraph) {
 			executionGraph.getCheckpointStatsSnapshot(),
 			executionGraph.getStateBackendName().orElse(null));
 	}
+
+	/**
+	 * Create an ArchivedExecutionGraph from an initializing job.
+	 */
+	public static ArchivedExecutionGraph createFromInitializingJob(
+		JobID jobId,
+		String jobName,
+		@Nullable Throwable throwable,
+		JobStatus finalJobStatus,
+		long initializationTimestamp) {
+
+		long failureTime = System.currentTimeMillis();
+		Map<JobVertexID, ArchivedExecutionJobVertex> archivedTasks = Collections.emptyMap();
+		List<ArchivedExecutionJobVertex> archivedVerticesInCreationOrder = Collections.emptyList();
+		final Map<String, SerializedValue<OptionalFailure<Object>>> serializedUserAccumulators = Collections.emptyMap();
+		StringifiedAccumulatorResult[] archivedUserAccumulators = new StringifiedAccumulatorResult[]{};
+
+		final long[] timestamps = new long[JobStatus.values().length];
+		timestamps[JobStatus.CREATED.ordinal()] = initializationTimestamp;
+		timestamps[JobStatus.INITIALIZING.ordinal()] = initializationTimestamp;
+
+		String jsonPlan = "{}";
+
+		ErrorInfo failureInfo = null;
+		if (throwable != null) {
+			failureInfo = new ErrorInfo(throwable, failureTime);
+			timestamps[JobStatus.FAILED.ordinal()] = failureTime;

Review comment:
       I would suggest asserting that `finalJobStatus` is `FAILED`.
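A minimal sketch of such an assertion, assuming a `checkArgument`-style guard like the ones in Flink's `Preconditions` class (imported elsewhere in this PR). To stay self-contained, the sketch defines its own hypothetical `checkArgument` and a reduced `JobStatus` enum; neither is the Flink type.

```java
class ArchiveArgumentCheckSketch {
    // Hypothetical subset of job states, for illustration only.
    enum JobStatus { INITIALIZING, CANCELED, FAILED }

    // Mirrors the shape of a checkArgument-style precondition.
    static void checkArgument(boolean condition, String message) {
        if (!condition) {
            throw new IllegalArgumentException(message);
        }
    }

    // Rejects inconsistent inputs before any timestamps or failure info are derived from them.
    static void validateFinalStatus(Throwable throwable, JobStatus finalJobStatus) {
        if (throwable != null) {
            checkArgument(
                    finalJobStatus == JobStatus.FAILED,
                    "A failure cause was given, so finalJobStatus must be FAILED but was " + finalJobStatus);
        }
    }

    public static void main(String[] args) {
        validateFinalStatus(new RuntimeException("init failed"), JobStatus.FAILED); // accepted
        try {
            validateFinalStatus(new RuntimeException("init failed"), JobStatus.CANCELED);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Failing fast here prevents an archived graph whose `FAILED` timestamp is set while its reported state is something else.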

##########
File path: flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
##########
@@ -111,4 +122,41 @@ public static void executeProgram(
 			Thread.currentThread().setContextClassLoader(contextClassLoader);
 		}
 	}
+
+	/**
+	 * This method blocks until the job status is not INITIALIZING anymore.
+	 * If the job is FAILED, it throws an CompletionException with the failure cause.
+	 * @param jobStatusSupplier supplier returning the job status.
+	 */
+	public static void waitUntilJobInitializationFinished(
+		SupplierWithException<JobStatus, Exception> jobStatusSupplier,
+		SupplierWithException<JobResult, Exception> jobResultSupplier
+		) throws CompletionException {
+		LOG.debug("Wait until job initialization is finished");
+		WaitStrategy waitStrategy = new ExponentialWaitStrategy(50, 2000);
+		try {
+			JobStatus status = jobStatusSupplier.get();
+			long attempt = 0;
+			while (status == JobStatus.INITIALIZING) {
+				Thread.sleep(waitStrategy.sleepTime(attempt++));
+				status = jobStatusSupplier.get();
+			}
+			if (status == JobStatus.FAILED) {
+				// note: we can not distinguish between initialization failures and failures once
+				// execution has started. Execution errors are potentially reported here.
+				JobResult result = jobResultSupplier.get();
+				Optional<SerializedThrowable> throwable = result.getSerializedThrowable();
+				if (throwable.isPresent()) {
+					throw new CompletionException(throwable.get().deserializeError(Thread.currentThread().getContextClassLoader()));

Review comment:
       Is the context class loader strictly the user code class loader?
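Why the choice of loader matters: Java deserialization resolves every class in the stream through some class loader, and a user-defined exception class that is only visible to the user code class loader cannot be resolved through a different one (`ClassNotFoundException`). The sketch below is a JDK-only illustration, not Flink's implementation: `ClassLoaderDeserializationSketch` and `LoaderAwareObjectInputStream` are hypothetical, and the assumption is that the caller passes the intended loader explicitly rather than relying on the thread's context class loader.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;

class ClassLoaderDeserializationSketch {
    // Resolves classes against an explicit loader instead of whatever loader happens
    // to be the thread's context class loader at call time.
    static final class LoaderAwareObjectInputStream extends ObjectInputStream {
        private final ClassLoader loader;

        LoaderAwareObjectInputStream(InputStream in, ClassLoader loader) throws IOException {
            super(in);
            this.loader = loader;
        }

        @Override
        protected Class<?> resolveClass(ObjectStreamClass desc) throws IOException, ClassNotFoundException {
            try {
                return Class.forName(desc.getName(), false, loader);
            } catch (ClassNotFoundException e) {
                return super.resolveClass(desc); // fallback, e.g. for primitive types
            }
        }
    }

    static byte[] serialize(Serializable value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        return bytes.toByteArray();
    }

    static Object deserialize(byte[] data, ClassLoader userCodeClassLoader)
            throws IOException, ClassNotFoundException {
        try (ObjectInputStream in =
                new LoaderAwareObjectInputStream(new ByteArrayInputStream(data), userCodeClassLoader)) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        byte[] data = serialize(new IllegalStateException("initialization failed"));
        // The loader is passed explicitly; in the Flink case it would need to be the user code class loader.
        ClassLoader loader = ClassLoaderDeserializationSketch.class.getClassLoader();
        Throwable restored = (Throwable) deserialize(data, loader);
        System.out.println(restored);
    }
}
```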

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
##########
@@ -350,71 +341,42 @@ private boolean isPartialResourceConfigured(JobGraph jobGraph) {
 		}, getRpcService().getExecutor());
 	}
 
-	private CompletableFuture<Void> persistAndRunJob(JobGraph jobGraph) throws Exception {
+	private Void persistAndRunJob(JobGraph jobGraph) throws Exception {
 		jobGraphWriter.putJobGraph(jobGraph);
-
-		final CompletableFuture<Void> runJobFuture = runJob(jobGraph);
-
-		return runJobFuture.whenComplete(BiConsumerWithException.unchecked((Object ignored, Throwable throwable) -> {
-			if (throwable != null) {
-				jobGraphWriter.removeJobGraph(jobGraph.getJobID());
-			}
-		}));
+		runJob(jobGraph, false);
+		return null;

Review comment:
       Please remove this `return null;`, which is no longer needed once `persistAndRunJob` returns `void`.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
##########
@@ -350,71 +341,42 @@ private boolean isPartialResourceConfigured(JobGraph jobGraph) {
 		}, getRpcService().getExecutor());
 	}
 
-	private CompletableFuture<Void> persistAndRunJob(JobGraph jobGraph) throws Exception {
+	private Void persistAndRunJob(JobGraph jobGraph) throws Exception {

Review comment:
       ```suggestion
   	private void persistAndRunJob(JobGraph jobGraph) throws Exception {
   ```

##########
File path: flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
##########
@@ -111,4 +122,41 @@ public static void executeProgram(
 			Thread.currentThread().setContextClassLoader(contextClassLoader);
 		}
 	}
+
+	/**
+	 * This method blocks until the job status is not INITIALIZING anymore.
+	 * If the job is FAILED, it throws an CompletionException with the failure cause.
+	 * @param jobStatusSupplier supplier returning the job status.
+	 */
+	public static void waitUntilJobInitializationFinished(
+		SupplierWithException<JobStatus, Exception> jobStatusSupplier,
+		SupplierWithException<JobResult, Exception> jobResultSupplier
+		) throws CompletionException {

Review comment:
       Nit: this formatting is a bit uncommon in the Flink code base.
   
   ```suggestion
   	public static void waitUntilJobInitializationFinished(
   		    SupplierWithException<JobStatus, Exception> jobStatusSupplier,
   		    SupplierWithException<JobResult, Exception> jobResultSupplier) 
   		throws CompletionException {
   ```

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherJob.java
##########
@@ -0,0 +1,242 @@
+/**
+ * Abstraction used by the {@link Dispatcher} to manage JobManagers, in
+ * particular during initialization.
+ * While a job is initializing, the JobMasterGateway is not available. A small subset
+ * of the methods of the JobMasterGateway necessary during initialization are provided
+ * by this class (job details, cancel).
+ */
+public final class DispatcherJob implements AutoCloseableAsync {
+
+	private final Logger log = LoggerFactory.getLogger(DispatcherJob.class);
+
+	private final CompletableFuture<JobManagerRunner> jobManagerRunnerFuture;
+	private final CompletableFuture<ArchivedExecutionGraph> jobResultFuture;
+
+	private final long initializationTimestamp;
+	private final JobID jobId;
+	private final String jobName;
+
+	private final Object lock = new Object();
+
+	// internal field to track job status during initialization. Is not updated anymore after
+	// job is initialized, cancelled or failed.
+	@GuardedBy("lock")
+	private JobStatus jobStatus = JobStatus.INITIALIZING;

Review comment:
       I'd suggest using a different type here because, if I am not mistaken, the implementation effectively limits itself to `INITIALIZING`, `CANCELLING` and `RUNNING`.
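A minimal sketch of what such a dedicated type could look like. The enum name `DispatcherJobStatus`, its constants and the transition rule are hypothetical choices for illustration; `JOB_MANAGER_RUNNER_INITIALIZED` is used instead of `RUNNING` so the name does not suggest that the job itself has reached `JobStatus.RUNNING`.

```java
class DispatcherJobStatusSketch {
    // Hypothetical dedicated type covering only the states DispatcherJob tracks itself,
    // instead of reusing the much larger JobStatus enum.
    enum DispatcherJobStatus {
        INITIALIZING,
        JOB_MANAGER_RUNNER_INITIALIZED,
        CANCELLING;

        // In this simplified sketch, transitions are only allowed out of INITIALIZING.
        boolean canTransitionTo(DispatcherJobStatus target) {
            return this == INITIALIZING && target != INITIALIZING;
        }
    }

    public static void main(String[] args) {
        for (DispatcherJobStatus from : DispatcherJobStatus.values()) {
            for (DispatcherJobStatus to : DispatcherJobStatus.values()) {
                if (from.canTransitionTo(to)) {
                    System.out.println(from + " -> " + to);
                }
            }
        }
    }
}
```

A narrow enum makes illegal states (e.g. `FINISHED` inside `DispatcherJob`) unrepresentable, and a `switch` over it stays small.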

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
##########
@@ -350,71 +341,42 @@ private boolean isPartialResourceConfigured(JobGraph jobGraph) {
 		}, getRpcService().getExecutor());
 	}
 
-	private CompletableFuture<Void> persistAndRunJob(JobGraph jobGraph) throws Exception {
+	private Void persistAndRunJob(JobGraph jobGraph) throws Exception {
 		jobGraphWriter.putJobGraph(jobGraph);
-
-		final CompletableFuture<Void> runJobFuture = runJob(jobGraph);
-
-		return runJobFuture.whenComplete(BiConsumerWithException.unchecked((Object ignored, Throwable throwable) -> {
-			if (throwable != null) {
-				jobGraphWriter.removeJobGraph(jobGraph.getJobID());
-			}
-		}));
+		runJob(jobGraph, false);
+		return null;
 	}
 
-	private CompletableFuture<Void> runJob(JobGraph jobGraph) {
-		Preconditions.checkState(!jobManagerRunnerFutures.containsKey(jobGraph.getJobID()));
-
-		final CompletableFuture<JobManagerRunner> jobManagerRunnerFuture = createJobManagerRunner(jobGraph);
+	private void runJob(JobGraph jobGraph, boolean isRecovery) {

Review comment:
       For maintainability, it is better to introduce an enum instead of a boolean parameter, or to split the method into separately named methods (e.g. `runRecoveredJob` and `runSubmittedJob`), so that call sites carry more meaning and context.
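Both options can be sketched side by side. This is a hypothetical, JDK-only illustration: `RunJobSketch` and its `String` return values are placeholders, and `ExecutionType` with `SUBMISSION`/`RECOVERY` borrows the names suggested elsewhere in this review. Compare a call like `runJob(jobGraph, false)` with `runJob(jobGraph, ExecutionType.SUBMISSION)` or `runSubmittedJob(jobGraph)`.

```java
class RunJobSketch {
    // Enum named after the review suggestions (ExecutionType with SUBMISSION / RECOVERY).
    enum ExecutionType { SUBMISSION, RECOVERY }

    // Option 1: an enum parameter is self-describing at the call site.
    static String runJob(String jobName, ExecutionType executionType) {
        switch (executionType) {
            case SUBMISSION:
                return "running submitted job " + jobName;
            case RECOVERY:
                return "running recovered job " + jobName;
            default:
                throw new IllegalArgumentException("Unknown execution type " + executionType);
        }
    }

    // Option 2: intention-revealing method names hide the flag from callers entirely.
    static String runSubmittedJob(String jobName) {
        return runJob(jobName, ExecutionType.SUBMISSION);
    }

    static String runRecoveredJob(String jobName) {
        return runJob(jobName, ExecutionType.RECOVERY);
    }

    public static void main(String[] args) {
        System.out.println(runSubmittedJob("wordcount"));
        System.out.println(runRecoveredJob("wordcount"));
    }
}
```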

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
##########
@@ -732,7 +699,7 @@ private void cleanUpJobData(JobID jobId, boolean cleanupHA) {
 	private void terminateJobManagerRunners() {

Review comment:
       Please update the method names that refer to the `JobManagerRunner` throughout the `Dispatcher` class.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
##########
@@ -802,23 +769,28 @@ private void jobMasterFailed(JobID jobId, Throwable cause) {
 		onFatalError(new FlinkException(String.format("JobMaster for job %s failed.", jobId), cause));
 	}
 
-	private CompletableFuture<JobMasterGateway> getJobMasterGatewayFuture(JobID jobId) {
-		final CompletableFuture<JobManagerRunner> jobManagerRunnerFuture = jobManagerRunnerFutures.get(jobId);
+	/**
+	 * Ensures that the JobMasterGateway is available.
+	 */
+	private CompletableFuture<JobMasterGateway> getJobMasterGateway(JobID jobId) throws
+		FlinkJobNotFoundException,
+		UnavailableDispatcherOperationException {
+		DispatcherJob job = runningJobs.get(jobId);
+		if (job == null) {
+			throw new FlinkJobNotFoundException(jobId);
+		}
+		if (!job.isRunning()) {
+			throw new UnavailableDispatcherOperationException("Unable to get JobMasterGateway for initializing job. "
+				+ "The requested operation is not available while the JobManager is still initializing.");

Review comment:
       ```suggestion
   				+ "The requested operation is not available while the JobManager is initializing.");
   ```

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
##########
@@ -427,21 +389,39 @@ private JobManagerRunner startJobManagerRunner(JobManagerRunner jobManagerRunner
 							}
 						}
 					} else {
-						log.debug("There is a newer JobManagerRunner for the job {}.", jobId);
+						log.debug("Job {} is not registered anymore at dispatcher", jobId);
 					}
-
 					return null;
 				}, getMainThreadExecutor()));
+	}
 
-		jobManagerRunner.start();
-
-		return jobManagerRunner;
+	CompletableFuture<JobManagerRunner> createJobManagerRunner(JobGraph jobGraph) {
+		final RpcService rpcService = getRpcService();
+		return CompletableFuture.supplyAsync(
+			() -> {
+				try {
+					JobManagerRunner runner = jobManagerRunnerFactory.createJobManagerRunner(
+						jobGraph,
+						configuration,
+						rpcService,
+						highAvailabilityServices,
+						heartbeatServices,
+						jobManagerSharedServices,
+						new DefaultJobManagerJobMetricGroupFactory(jobManagerMetricGroup),
+						fatalErrorHandler);
+					runner.start();
+					return runner;
+				} catch (Exception e) {
+					throw new CompletionException(new JobExecutionException(jobGraph.getJobID(), "Could not instantiate JobManager.", e));
+				}
+			},
+			rpcService.getExecutor()); // do not use main thread executor. Otherwise, Dispatcher is blocked on JobManager creation

Review comment:
       Let's file a follow-up issue to replace `rpcService.getExecutor` with a proper `ioExecutor`.
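The idea behind a dedicated `ioExecutor` is that potentially blocking work (such as creating and starting the `JobManagerRunner`) runs on its own thread pool, leaving both the main thread and the RPC service's executor free. The sketch below uses a plain JDK `ExecutorService` as a stand-in for that `ioExecutor`; `IoExecutorSketch` and `createRunner` are hypothetical and the returned `String` only stands in for a runner.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class IoExecutorSketch {
    // Hypothetical stand-in for createJobManagerRunner: the blocking work runs on ioExecutor.
    static CompletableFuture<String> createRunner(String jobName, Executor ioExecutor) {
        return CompletableFuture.supplyAsync(
                () -> "runner-for-" + jobName + " on " + Thread.currentThread().getName(),
                ioExecutor);
    }

    public static void main(String[] args) {
        // Daemon threads so the pool never keeps the JVM alive on its own.
        ExecutorService ioExecutor = Executors.newFixedThreadPool(2, runnable -> {
            Thread thread = new Thread(runnable, "io-thread");
            thread.setDaemon(true);
            return thread;
        });
        try {
            System.out.println(createRunner("job", ioExecutor).join());
        } finally {
            ioExecutor.shutdown();
        }
    }
}
```

Results that must touch `Dispatcher` state would then be handed back to the main thread executor, as the PR already does with `getMainThreadExecutor()`.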

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherJob.java
##########
@@ -0,0 +1,242 @@
+/**
+ * Abstraction used by the {@link Dispatcher} to manage JobManagers, in
+ * particular during initialization.
+ * While a job is initializing, the JobMasterGateway is not available. A small subset
+ * of the methods of the JobMasterGateway necessary during initialization are provided
+ * by this class (job details, cancel).
+ */
+public final class DispatcherJob implements AutoCloseableAsync {
+
+	private final Logger log = LoggerFactory.getLogger(DispatcherJob.class);
+
+	private final CompletableFuture<JobManagerRunner> jobManagerRunnerFuture;
+	private final CompletableFuture<ArchivedExecutionGraph> jobResultFuture;
+
+	private final long initializationTimestamp;
+	private final JobID jobId;
+	private final String jobName;
+
+	private final Object lock = new Object();
+
+	// internal field to track job status during initialization. Is not updated anymore after
+	// job is initialized, cancelled or failed.
+	@GuardedBy("lock")
+	private JobStatus jobStatus = JobStatus.INITIALIZING;
+
+	private final CompletableFuture<Void> terminationFuture = new CompletableFuture<>();
+
+	private enum SubmissionType {
+		INITIAL, RECOVERY

Review comment:
       ```suggestion
   		SUBMISSION, RECOVERY
   ```

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherJob.java
##########
@@ -0,0 +1,242 @@
+/**
+ * Abstraction used by the {@link Dispatcher} to manage JobManagers, in
+ * particular during initialization.
+ * While a job is initializing, the JobMasterGateway is not available. A small subset
+ * of the methods of the JobMasterGateway necessary during initialization are provided
+ * by this class (job details, cancel).
+ */
+public final class DispatcherJob implements AutoCloseableAsync {
+
+	private final Logger log = LoggerFactory.getLogger(DispatcherJob.class);
+
+	private final CompletableFuture<JobManagerRunner> jobManagerRunnerFuture;
+	private final CompletableFuture<ArchivedExecutionGraph> jobResultFuture;
+
+	private final long initializationTimestamp;
+	private final JobID jobId;
+	private final String jobName;
+
+	private final Object lock = new Object();
+
+	// internal field to track job status during initialization. Is not updated anymore after
+	// job is initialized, cancelled or failed.
+	@GuardedBy("lock")
+	private JobStatus jobStatus = JobStatus.INITIALIZING;

Review comment:
       `jobStatus` is accessed outside of `lock` in the method `requestJob`.
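A minimal sketch of honoring the `@GuardedBy("lock")` contract: every read and write of the status happens inside `synchronized (lock)`, and `requestJob` copies the value under the lock before using it. `GuardedStatusSketch`, its `Status` enum and the `String` result are hypothetical simplifications of `DispatcherJob`, not its actual code.

```java
class GuardedStatusSketch {
    // Hypothetical subset of states, for illustration only.
    enum Status { INITIALIZING, RUNNING, CANCELLING }

    private final Object lock = new Object();

    // Read and written only while holding lock (the @GuardedBy("lock") contract).
    private Status status = Status.INITIALIZING;

    void markRunning() {
        synchronized (lock) {
            if (status == Status.INITIALIZING) {
                status = Status.RUNNING;
            }
        }
    }

    void cancel() {
        synchronized (lock) {
            if (status == Status.INITIALIZING) {
                status = Status.CANCELLING;
            }
        }
    }

    // Take a snapshot under the lock, then build the (possibly expensive) result outside it.
    String requestJob() {
        final Status snapshot;
        synchronized (lock) {
            snapshot = status;
        }
        return "job status: " + snapshot;
    }

    public static void main(String[] args) {
        GuardedStatusSketch job = new GuardedStatusSketch();
        System.out.println(job.requestJob());
        job.markRunning();
        System.out.println(job.requestJob());
    }
}
```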

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherJob.java
##########
@@ -0,0 +1,242 @@
+/**
+ * Abstraction used by the {@link Dispatcher} to manage JobManagers, in
+ * particular during initialization.
+ * While a job is initializing, the JobMasterGateway is not available. A small subset
+ * of the methods of the JobMasterGateway necessary during initialization are provided
+ * by this class (job details, cancel).
+ */
+public final class DispatcherJob implements AutoCloseableAsync {
+
+	private final Logger log = LoggerFactory.getLogger(DispatcherJob.class);
+
+	private final CompletableFuture<JobManagerRunner> jobManagerRunnerFuture;
+	private final CompletableFuture<ArchivedExecutionGraph> jobResultFuture;
+
+	private final long initializationTimestamp;
+	private final JobID jobId;
+	private final String jobName;
+
+	private final Object lock = new Object();
+
+	// internal field to track job status during initialization. Is not updated anymore after
+	// job is initialized, cancelled or failed.
+	@GuardedBy("lock")
+	private JobStatus jobStatus = JobStatus.INITIALIZING;
+
+	private final CompletableFuture<Void> terminationFuture = new CompletableFuture<>();
+
+	private enum SubmissionType {
+		INITIAL, RECOVERY
+	}
+
+	static DispatcherJob createForSubmission(
+		CompletableFuture<JobManagerRunner> jobManagerRunnerFuture,
+		JobID jobId,
+		String jobName,
+		long initializationTimestamp) {
+		return new DispatcherJob(jobManagerRunnerFuture, jobId, jobName, initializationTimestamp, SubmissionType.INITIAL);
+	}
+
+	static DispatcherJob createForRecovery(
+		CompletableFuture<JobManagerRunner> jobManagerRunnerFuture,
+		JobID jobId,
+		String jobName,
+		long initializationTimestamp) {
+		return new DispatcherJob(jobManagerRunnerFuture, jobId, jobName, initializationTimestamp, SubmissionType.RECOVERY);
+	}
+
+	private DispatcherJob(
+		CompletableFuture<JobManagerRunner> jobManagerRunnerFuture,
+		JobID jobId,
+		String jobName,
+		long initializationTimestamp,
+		SubmissionType submissionType) {
+		this.jobManagerRunnerFuture = jobManagerRunnerFuture;
+		this.jobId = jobId;
+		this.jobName = jobName;
+		this.initializationTimestamp = initializationTimestamp;
+		this.jobResultFuture = new CompletableFuture<>();
+
+		FutureUtils.assertNoException(this.jobManagerRunnerFuture.handle((jobManagerRunner, throwable) -> {
+			// JM has been initialized, or the initialization failed
+			synchronized (lock) {
+				if (throwable == null) {
+					// Forward result future
+					FutureUtils.forward(jobManagerRunner.getResultFuture(), jobResultFuture);
+
+					if (jobStatus == JobStatus.CANCELLING) {
+						log.info("Cancellation during initialization has been requested for job {}. Initialization completed, cancelling job.", jobId);
+
+						// cancel job
+						jobManagerRunner
+							.getJobMasterGateway()
+							.thenCompose(gw -> gw.cancel(RpcUtils.INF_TIMEOUT))
+							.whenComplete((ignored, cancelThrowable) -> {
+							if (cancelThrowable != null) {
+								log.warn("Cancellation of job {} failed", jobId, cancelThrowable);
+							}
+						});
+
+						// cancellation will eventually complete the jobResultFuture
+						jobResultFuture.whenComplete((archivedExecutionGraph, resultThrowable) -> {
+							synchronized (lock) {
+								if (resultThrowable == null) {
+									jobStatus = archivedExecutionGraph.getState();
+								} else {
+									jobStatus = JobStatus.FAILED;
+								}
+							}
+						});

Review comment:
       What happens if I call `dispatcherJob.requestJob` after the job has been cancelled in state `INITIALIZING`? Ideally, I would like to obtain the final `CANCELED` `ArchivedExecutionGraph`.
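       One possible behaviour, sketched with hypothetical stand-ins (`Status`, `GraphSnapshot` and `CancellableInitializingJob` are not Flink classes; `GraphSnapshot` plays the role of `ArchivedExecutionGraph`): once the result future is done, `requestJob` returns it, so callers see the terminal `CANCELED` graph instead of a synthetic one built from the intermediate status.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for JobStatus.
enum Status { INITIALIZING, CANCELLING, CANCELED }

// Hypothetical stand-in for ArchivedExecutionGraph: only carries the job status.
final class GraphSnapshot {
    final Status status;

    GraphSnapshot(Status status) {
        this.status = status;
    }
}

final class CancellableInitializingJob {
    private final Object lock = new Object();
    private final CompletableFuture<GraphSnapshot> resultFuture = new CompletableFuture<>();

    // guarded by lock
    private Status status = Status.INITIALIZING;

    void cancel() {
        synchronized (lock) {
            status = Status.CANCELLING;
        }
    }

    // Simulates the JobManager finishing the deferred cancellation.
    void completeWith(GraphSnapshot finalGraph) {
        resultFuture.complete(finalGraph);
    }

    CompletableFuture<GraphSnapshot> requestJob() {
        synchronized (lock) {
            // Prefer the terminal graph as soon as it exists.
            if (resultFuture.isDone()) {
                return resultFuture;
            }
            // Otherwise describe the job from the status tracked during initialization.
            return CompletableFuture.completedFuture(new GraphSnapshot(status));
        }
    }
}
```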

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherJob.java
##########
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.jobmaster.JobManagerRunner;
+import org.apache.flink.runtime.jobmaster.JobMasterGateway;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.util.AutoCloseableAsync;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Abstraction used by the {@link Dispatcher} to manage JobManagers, in
+ * particular during initialization.
+ * While a job is initializing, the JobMasterGateway is not available. A small subset
+ * of the methods of the JobMasterGateway necessary during initialization are provided
+ * by this class (job details, cancel).
+ */
+public final class DispatcherJob implements AutoCloseableAsync {
+
+	private final Logger log = LoggerFactory.getLogger(DispatcherJob.class);
+
+	private final CompletableFuture<JobManagerRunner> jobManagerRunnerFuture;
+	private final CompletableFuture<ArchivedExecutionGraph> jobResultFuture;
+
+	private final long initializationTimestamp;
+	private final JobID jobId;
+	private final String jobName;
+
+	private final Object lock = new Object();
+
+	// internal field to track job status during initialization. Is not updated anymore after
+	// job is initialized, cancelled or failed.
+	@GuardedBy("lock")
+	private JobStatus jobStatus = JobStatus.INITIALIZING;
+
+	private final CompletableFuture<Void> terminationFuture = new CompletableFuture<>();
+
+	private enum SubmissionType {
+		INITIAL, RECOVERY
+	}
+
+	static DispatcherJob createForSubmission(
+		CompletableFuture<JobManagerRunner> jobManagerRunnerFuture,
+		JobID jobId,
+		String jobName,
+		long initializationTimestamp) {
+		return new DispatcherJob(jobManagerRunnerFuture, jobId, jobName, initializationTimestamp, SubmissionType.INITIAL);
+	}
+
+	static DispatcherJob createForRecovery(
+		CompletableFuture<JobManagerRunner> jobManagerRunnerFuture,
+		JobID jobId,
+		String jobName,
+		long initializationTimestamp) {
+		return new DispatcherJob(jobManagerRunnerFuture, jobId, jobName, initializationTimestamp, SubmissionType.RECOVERY);
+	}
+
+	private DispatcherJob(
+		CompletableFuture<JobManagerRunner> jobManagerRunnerFuture,
+		JobID jobId,
+		String jobName,
+		long initializationTimestamp,
+		SubmissionType submissionType) {
+		this.jobManagerRunnerFuture = jobManagerRunnerFuture;
+		this.jobId = jobId;
+		this.jobName = jobName;
+		this.initializationTimestamp = initializationTimestamp;
+		this.jobResultFuture = new CompletableFuture<>();
+
+		FutureUtils.assertNoException(this.jobManagerRunnerFuture.handle((jobManagerRunner, throwable) -> {
+			// JM has been initialized, or the initialization failed
+			synchronized (lock) {
+				if (throwable == null) {
+					// Forward result future
+					FutureUtils.forward(jobManagerRunner.getResultFuture(), jobResultFuture);
+
+					if (jobStatus == JobStatus.CANCELLING) {
+						log.info("Cancellation during initialization has been requested for job {}. Initialization completed, cancelling job.", jobId);
+
+						// cancel job
+						jobManagerRunner
+							.getJobMasterGateway()
+							.thenCompose(gw -> gw.cancel(RpcUtils.INF_TIMEOUT))
+							.whenComplete((ignored, cancelThrowable) -> {
+							if (cancelThrowable != null) {
+								log.warn("Cancellation of job {} failed", jobId, cancelThrowable);
+							}
+						});
+
+						// cancellation will eventually complete the jobResultFuture
+						jobResultFuture.whenComplete((archivedExecutionGraph, resultThrowable) -> {
+							synchronized (lock) {
+								if (resultThrowable == null) {
+									jobStatus = archivedExecutionGraph.getState();
+								} else {
+									jobStatus = JobStatus.FAILED;
+								}
+							}
+						});
+					} else {
+						jobStatus = JobStatus.RUNNING; // this status should never be exposed from the DispatcherJob. Only used internally for tracking running state
+					}
+				} else { // failure during initialization
+					if (submissionType == SubmissionType.RECOVERY) {
+						jobResultFuture.completeExceptionally(throwable);
+					} else {
+						jobResultFuture.complete(ArchivedExecutionGraph.createFromInitializingJob(
+							jobId,
+							jobName,
+							throwable,
+							JobStatus.FAILED,
+							initializationTimestamp));
+					}
+				}
+			}
+			return null;
+		}));
+	}
+
+	public CompletableFuture<ArchivedExecutionGraph> getResultFuture() {
+		return jobResultFuture;
+	}
+
+	public CompletableFuture<JobDetails> requestJobDetails(Time timeout) {
+		synchronized (lock) {
+			if (isRunning()) {
+				return getJobMasterGateway().thenCompose(jobMasterGateway -> jobMasterGateway.requestJobDetails(
+					timeout));
+			} else {
+				int[] tasksPerState = new int[ExecutionState.values().length];
+				return CompletableFuture.completedFuture(new JobDetails(
+					jobId,
+					jobName,
+					initializationTimestamp,
+					0,
+					0,
+					jobStatus,
+					0,
+					tasksPerState,
+					0));

Review comment:
       factor out into `createInitializingJobDetails`
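       A sketch of what such a helper could look like. `InitializingDetails`, `JobPhase`, `TaskPhase` and `DetailsHelper` are hypothetical, trimmed stand-ins for `JobDetails`, `JobStatus` and `ExecutionState`; the argument order mirrors the constructor call quoted above (start time, end time, duration, status, last update time, tasks per state, number of tasks).

```java
// Hypothetical stand-in for JobStatus.
enum JobPhase { INITIALIZING, CANCELLING }

// Hypothetical subset of ExecutionState, only used to size tasksPerState.
enum TaskPhase { CREATED, SCHEDULED, RUNNING, FINISHED }

// Hypothetical, trimmed stand-in for JobDetails.
final class InitializingDetails {
    final String jobName;
    final long startTime;
    final long endTime;
    final long duration;
    final JobPhase status;
    final long lastUpdateTime;
    final int[] tasksPerState;
    final int numTasks;

    InitializingDetails(String jobName, long startTime, long endTime, long duration,
            JobPhase status, long lastUpdateTime, int[] tasksPerState, int numTasks) {
        this.jobName = jobName;
        this.startTime = startTime;
        this.endTime = endTime;
        this.duration = duration;
        this.status = status;
        this.lastUpdateTime = lastUpdateTime;
        this.tasksPerState = tasksPerState;
        this.numTasks = numTasks;
    }
}

final class DetailsHelper {
    // A job without a JobMaster yet has no end time, duration, updates or tasks,
    // so everything except name, start time and status is zero.
    static InitializingDetails createInitializingJobDetails(
            String jobName, long initializationTimestamp, JobPhase status) {
        int[] tasksPerState = new int[TaskPhase.values().length];
        return new InitializingDetails(
                jobName, initializationTimestamp, 0L, 0L, status, 0L, tasksPerState, 0);
    }
}
```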

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherJob.java
##########
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.jobmaster.JobManagerRunner;
+import org.apache.flink.runtime.jobmaster.JobMasterGateway;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.util.AutoCloseableAsync;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Abstraction used by the {@link Dispatcher} to manage JobManagers, in
+ * particular during initialization.
+ * While a job is initializing, the JobMasterGateway is not available. A small subset
+ * of the methods of the JobMasterGateway necessary during initialization are provided
+ * by this class (job details, cancel).
+ */
+public final class DispatcherJob implements AutoCloseableAsync {
+
+	private final Logger log = LoggerFactory.getLogger(DispatcherJob.class);
+
+	private final CompletableFuture<JobManagerRunner> jobManagerRunnerFuture;
+	private final CompletableFuture<ArchivedExecutionGraph> jobResultFuture;
+
+	private final long initializationTimestamp;
+	private final JobID jobId;
+	private final String jobName;
+
+	private final Object lock = new Object();
+
+	// internal field to track job status during initialization. Is not updated anymore after
+	// job is initialized, cancelled or failed.
+	@GuardedBy("lock")
+	private JobStatus jobStatus = JobStatus.INITIALIZING;
+
+	private final CompletableFuture<Void> terminationFuture = new CompletableFuture<>();
+
+	private enum SubmissionType {
+		INITIAL, RECOVERY
+	}
+
+	static DispatcherJob createForSubmission(
+		CompletableFuture<JobManagerRunner> jobManagerRunnerFuture,
+		JobID jobId,
+		String jobName,
+		long initializationTimestamp) {
+		return new DispatcherJob(jobManagerRunnerFuture, jobId, jobName, initializationTimestamp, SubmissionType.INITIAL);
+	}
+
+	static DispatcherJob createForRecovery(
+		CompletableFuture<JobManagerRunner> jobManagerRunnerFuture,
+		JobID jobId,
+		String jobName,
+		long initializationTimestamp) {
+		return new DispatcherJob(jobManagerRunnerFuture, jobId, jobName, initializationTimestamp, SubmissionType.RECOVERY);
+	}
+
+	private DispatcherJob(
+		CompletableFuture<JobManagerRunner> jobManagerRunnerFuture,
+		JobID jobId,
+		String jobName,
+		long initializationTimestamp,
+		SubmissionType submissionType) {
+		this.jobManagerRunnerFuture = jobManagerRunnerFuture;
+		this.jobId = jobId;
+		this.jobName = jobName;
+		this.initializationTimestamp = initializationTimestamp;
+		this.jobResultFuture = new CompletableFuture<>();
+
+		FutureUtils.assertNoException(this.jobManagerRunnerFuture.handle((jobManagerRunner, throwable) -> {
+			// JM has been initialized, or the initialization failed
+			synchronized (lock) {
+				if (throwable == null) {
+					// Forward result future
+					FutureUtils.forward(jobManagerRunner.getResultFuture(), jobResultFuture);
+
+					if (jobStatus == JobStatus.CANCELLING) {
+						log.info("Cancellation during initialization has been requested for job {}. Initialization completed, cancelling job.", jobId);
+
+						// cancel job
+						jobManagerRunner
+							.getJobMasterGateway()
+							.thenCompose(gw -> gw.cancel(RpcUtils.INF_TIMEOUT))
+							.whenComplete((ignored, cancelThrowable) -> {
+							if (cancelThrowable != null) {
+								log.warn("Cancellation of job {} failed", jobId, cancelThrowable);
+							}
+						});
+
+						// cancellation will eventually complete the jobResultFuture
+						jobResultFuture.whenComplete((archivedExecutionGraph, resultThrowable) -> {
+							synchronized (lock) {
+								if (resultThrowable == null) {
+									jobStatus = archivedExecutionGraph.getState();
+								} else {
+									jobStatus = JobStatus.FAILED;
+								}
+							}
+						});
+					} else {
+						jobStatus = JobStatus.RUNNING; // this status should never be exposed from the DispatcherJob. Only used internally for tracking running state
+					}
+				} else { // failure during initialization
+					if (submissionType == SubmissionType.RECOVERY) {
+						jobResultFuture.completeExceptionally(throwable);
+					} else {
+						jobResultFuture.complete(ArchivedExecutionGraph.createFromInitializingJob(
+							jobId,
+							jobName,
+							throwable,
+							JobStatus.FAILED,
+							initializationTimestamp));
+					}
+				}
+			}
+			return null;
+		}));
+	}
+
+	public CompletableFuture<ArchivedExecutionGraph> getResultFuture() {
+		return jobResultFuture;
+	}
+
+	public CompletableFuture<JobDetails> requestJobDetails(Time timeout) {
+		synchronized (lock) {
+			if (isRunning()) {
+				return getJobMasterGateway().thenCompose(jobMasterGateway -> jobMasterGateway.requestJobDetails(
+					timeout));
+			} else {
+				int[] tasksPerState = new int[ExecutionState.values().length];
+				return CompletableFuture.completedFuture(new JobDetails(
+					jobId,
+					jobName,
+					initializationTimestamp,
+					0,
+					0,
+					jobStatus,
+					0,
+					tasksPerState,
+					0));
+			}
+		}
+	}
+
+	public CompletableFuture<Acknowledge> cancel(Time timeout) {
+		synchronized (lock) {
+			if (isRunning()) {
+				return getJobMasterGateway().thenCompose(jobMasterGateway -> jobMasterGateway.cancel(timeout));
+			} else {
+				log.info("Cancellation during initialization requested for job {}. Job will be cancelled once JobManager has been initialized.", jobId);
+				jobStatus = JobStatus.CANCELLING;
+				return jobResultFuture.thenApply(ignored -> Acknowledge.get());
+			}
+		}
+	}
+
+	public CompletableFuture<JobStatus> requestJobStatus(Time timeout) {
+		synchronized (lock) {
+			if (isRunning()) {
+				return getJobMasterGateway().thenCompose(jobMasterGateway -> jobMasterGateway.requestJobStatus(timeout));
+			} else {
+				return CompletableFuture.completedFuture(jobStatus);
+			}
+		}
+	}
+
+	public CompletableFuture<ArchivedExecutionGraph> requestJob(Time timeout) {
+		synchronized (lock) {
+			if (isRunning()) {
+				return getJobMasterGateway().thenCompose(jobMasterGateway -> jobMasterGateway.requestJob(timeout));
+			} else {
+				return CompletableFuture.supplyAsync(() -> ArchivedExecutionGraph.createFromInitializingJob(jobId, jobName, null, jobStatus, initializationTimestamp));
+			}
+		}
+	}
+
+	public boolean isRunning() {
+		synchronized (lock) {
+			return jobStatus == JobStatus.RUNNING;
+		}
+	}
+
+	public CompletableFuture<JobMasterGateway> getJobMasterGateway() {
+		Preconditions.checkState(
+			isRunning(),
+			"JobMaster Gateway is not available during initialization");
+		try {
+			return jobManagerRunnerFuture.thenCompose(JobManagerRunner::getJobMasterGateway);
+		} catch (Throwable e) {
+			throw new IllegalStateException("JobMaster gateway is not available", e);
+		}

Review comment:
       Why is this catch block necessary?
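       For context: `CompletableFuture#thenCompose` does not rethrow failures at the call site. If the source future failed, or the composing function throws, the returned future completes exceptionally instead. The self-contained sketch below (`ThenComposeDemo` is a hypothetical name) shows this with plain JDK futures, which suggests the `catch` would at most see programming errors such as a `null` function, not an unavailable gateway.

```java
import java.util.concurrent.CompletableFuture;

final class ThenComposeDemo {
    // Never throws at the call site, even though the composing function always throws:
    // the exception is captured in the returned future.
    static CompletableFuture<String> composeFailing(CompletableFuture<Integer> source) {
        return source.thenCompose(value -> {
            throw new IllegalStateException("gateway unavailable for " + value);
        });
    }
}
```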

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherJob.java
##########
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.jobmaster.JobManagerRunner;
+import org.apache.flink.runtime.jobmaster.JobMasterGateway;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.util.AutoCloseableAsync;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Abstraction used by the {@link Dispatcher} to manage JobManagers, in
+ * particular during initialization.
+ * While a job is initializing, the JobMasterGateway is not available. A small subset
+ * of the methods of the JobMasterGateway necessary during initialization are provided
+ * by this class (job details, cancel).
+ */
+public final class DispatcherJob implements AutoCloseableAsync {
+
+	private final Logger log = LoggerFactory.getLogger(DispatcherJob.class);
+
+	private final CompletableFuture<JobManagerRunner> jobManagerRunnerFuture;
+	private final CompletableFuture<ArchivedExecutionGraph> jobResultFuture;
+
+	private final long initializationTimestamp;
+	private final JobID jobId;
+	private final String jobName;
+
+	private final Object lock = new Object();
+
+	// internal field to track job status during initialization. Is not updated anymore after
+	// job is initialized, cancelled or failed.
+	@GuardedBy("lock")
+	private JobStatus jobStatus = JobStatus.INITIALIZING;
+
+	private final CompletableFuture<Void> terminationFuture = new CompletableFuture<>();
+
+	private enum SubmissionType {
+		INITIAL, RECOVERY
+	}
+
+	static DispatcherJob createForSubmission(
+		CompletableFuture<JobManagerRunner> jobManagerRunnerFuture,
+		JobID jobId,
+		String jobName,
+		long initializationTimestamp) {
+		return new DispatcherJob(jobManagerRunnerFuture, jobId, jobName, initializationTimestamp, SubmissionType.INITIAL);
+	}
+
+	static DispatcherJob createForRecovery(
+		CompletableFuture<JobManagerRunner> jobManagerRunnerFuture,
+		JobID jobId,
+		String jobName,
+		long initializationTimestamp) {
+		return new DispatcherJob(jobManagerRunnerFuture, jobId, jobName, initializationTimestamp, SubmissionType.RECOVERY);
+	}
+
+	private DispatcherJob(
+		CompletableFuture<JobManagerRunner> jobManagerRunnerFuture,
+		JobID jobId,
+		String jobName,
+		long initializationTimestamp,
+		SubmissionType submissionType) {
+		this.jobManagerRunnerFuture = jobManagerRunnerFuture;
+		this.jobId = jobId;
+		this.jobName = jobName;
+		this.initializationTimestamp = initializationTimestamp;
+		this.jobResultFuture = new CompletableFuture<>();
+
+		FutureUtils.assertNoException(this.jobManagerRunnerFuture.handle((jobManagerRunner, throwable) -> {
+			// JM has been initialized, or the initialization failed
+			synchronized (lock) {
+				if (throwable == null) {
+					// Forward result future
+					FutureUtils.forward(jobManagerRunner.getResultFuture(), jobResultFuture);
+
+					if (jobStatus == JobStatus.CANCELLING) {
+						log.info("Cancellation during initialization has been requested for job {}. Initialization completed, cancelling job.", jobId);
+
+						// cancel job
+						jobManagerRunner
+							.getJobMasterGateway()
+							.thenCompose(gw -> gw.cancel(RpcUtils.INF_TIMEOUT))
+							.whenComplete((ignored, cancelThrowable) -> {
+							if (cancelThrowable != null) {
+								log.warn("Cancellation of job {} failed", jobId, cancelThrowable);
+							}
+						});
+
+						// cancellation will eventually complete the jobResultFuture
+						jobResultFuture.whenComplete((archivedExecutionGraph, resultThrowable) -> {
+							synchronized (lock) {
+								if (resultThrowable == null) {
+									jobStatus = archivedExecutionGraph.getState();
+								} else {
+									jobStatus = JobStatus.FAILED;
+								}
+							}
+						});
+					} else {
+						jobStatus = JobStatus.RUNNING; // this status should never be exposed from the DispatcherJob. Only used internally for tracking running state
+					}
+				} else { // failure during initialization
+					if (submissionType == SubmissionType.RECOVERY) {
+						jobResultFuture.completeExceptionally(throwable);
+					} else {
+						jobResultFuture.complete(ArchivedExecutionGraph.createFromInitializingJob(
+							jobId,
+							jobName,
+							throwable,
+							JobStatus.FAILED,
+							initializationTimestamp));
+					}
+				}
+			}
+			return null;
+		}));
+	}
+
+	public CompletableFuture<ArchivedExecutionGraph> getResultFuture() {
+		return jobResultFuture;
+	}
+
+	public CompletableFuture<JobDetails> requestJobDetails(Time timeout) {
+		synchronized (lock) {
+			if (isRunning()) {
+				return getJobMasterGateway().thenCompose(jobMasterGateway -> jobMasterGateway.requestJobDetails(
+					timeout));
+			} else {
+				int[] tasksPerState = new int[ExecutionState.values().length];
+				return CompletableFuture.completedFuture(new JobDetails(
+					jobId,
+					jobName,
+					initializationTimestamp,
+					0,
+					0,
+					jobStatus,
+					0,
+					tasksPerState,
+					0));
+			}
+		}
+	}
+
+	public CompletableFuture<Acknowledge> cancel(Time timeout) {
+		synchronized (lock) {
+			if (isRunning()) {
+				return getJobMasterGateway().thenCompose(jobMasterGateway -> jobMasterGateway.cancel(timeout));
+			} else {
+				log.info("Cancellation during initialization requested for job {}. Job will be cancelled once JobManager has been initialized.", jobId);
+				jobStatus = JobStatus.CANCELLING;
+				return jobResultFuture.thenApply(ignored -> Acknowledge.get());
+			}
+		}
+	}
+
+	public CompletableFuture<JobStatus> requestJobStatus(Time timeout) {
+		synchronized (lock) {
+			if (isRunning()) {
+				return getJobMasterGateway().thenCompose(jobMasterGateway -> jobMasterGateway.requestJobStatus(timeout));
+			} else {
+				return CompletableFuture.completedFuture(jobStatus);
+			}
+		}
+	}
+
+	public CompletableFuture<ArchivedExecutionGraph> requestJob(Time timeout) {
+		synchronized (lock) {
+			if (isRunning()) {
+				return getJobMasterGateway().thenCompose(jobMasterGateway -> jobMasterGateway.requestJob(timeout));
+			} else {
+				return CompletableFuture.supplyAsync(() -> ArchivedExecutionGraph.createFromInitializingJob(jobId, jobName, null, jobStatus, initializationTimestamp));
+			}
+		}
+	}
+
+	public boolean isRunning() {
+		synchronized (lock) {
+			return jobStatus == JobStatus.RUNNING;
+		}
+	}
+
+	public CompletableFuture<JobMasterGateway> getJobMasterGateway() {
+		Preconditions.checkState(
+			isRunning(),
+			"JobMaster Gateway is not available during initialization");
+		try {
+			return jobManagerRunnerFuture.thenCompose(JobManagerRunner::getJobMasterGateway);
+		} catch (Throwable e) {
+			throw new IllegalStateException("JobMaster gateway is not available", e);
+		}
+	}
+
+	@Override
+	public CompletableFuture<Void> closeAsync() {

Review comment:
       What is the status of a `DispatcherJob` after `closeAsync` has been called? More concretely, what happens if `jobStatus` is `INITIALIZING`?
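       One possible answer, sketched with hypothetical stand-ins (`Phase`, `FakeRunner` and `ClosableJob` are not Flink classes, and this is an illustrative design choice rather than what the PR implements): leave `INITIALIZING` as soon as `closeAsync` is called, and defer closing the runner until initialization has finished, completing the termination future only after that.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for JobStatus.
enum Phase { INITIALIZING, RUNNING, SUSPENDED }

// Hypothetical stand-in for JobManagerRunner: only records whether it was closed.
final class FakeRunner {
    final AtomicBoolean closed = new AtomicBoolean(false);

    CompletableFuture<Void> closeAsync() {
        closed.set(true);
        return CompletableFuture.completedFuture(null);
    }
}

final class ClosableJob {
    private final Object lock = new Object();
    private final CompletableFuture<FakeRunner> runnerFuture;
    private final CompletableFuture<Void> terminationFuture = new CompletableFuture<>();

    // guarded by lock
    private Phase phase = Phase.INITIALIZING;

    ClosableJob(CompletableFuture<FakeRunner> runnerFuture) {
        this.runnerFuture = runnerFuture;
    }

    Phase phase() {
        synchronized (lock) {
            return phase;
        }
    }

    CompletableFuture<Void> closeAsync() {
        synchronized (lock) {
            // No caller should observe a closed job as still initializing.
            phase = Phase.SUSPENDED;
        }
        // Close the runner once initialization is done; a failed initialization
        // produces no runner, so there is nothing to close.
        runnerFuture
            .handle((runner, failure) -> runner == null
                ? CompletableFuture.<Void>completedFuture(null)
                : runner.closeAsync())
            .thenCompose(closeFuture -> closeFuture)
            .whenComplete((ignored, closeFailure) -> {
                if (closeFailure != null) {
                    terminationFuture.completeExceptionally(closeFailure);
                } else {
                    terminationFuture.complete(null);
                }
            });
        return terminationFuture;
    }
}
```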

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherJob.java
##########
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.jobmaster.JobManagerRunner;
+import org.apache.flink.runtime.jobmaster.JobMasterGateway;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.util.AutoCloseableAsync;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Abstraction used by the {@link Dispatcher} to manage JobManagers, in
+ * particular during initialization.
+ * While a job is initializing, the JobMasterGateway is not available. A small subset
+ * of the methods of the JobMasterGateway necessary during initialization are provided
+ * by this class (job details, cancel).
+ */
+public final class DispatcherJob implements AutoCloseableAsync {
+
+	private final Logger log = LoggerFactory.getLogger(DispatcherJob.class);
+
+	private final CompletableFuture<JobManagerRunner> jobManagerRunnerFuture;
+	private final CompletableFuture<ArchivedExecutionGraph> jobResultFuture;
+
+	private final long initializationTimestamp;
+	private final JobID jobId;
+	private final String jobName;
+
+	private final Object lock = new Object();
+
+	// internal field to track job status during initialization. Is not updated anymore after
+	// job is initialized, cancelled or failed.
+	@GuardedBy("lock")
+	private JobStatus jobStatus = JobStatus.INITIALIZING;
+
+	private final CompletableFuture<Void> terminationFuture = new CompletableFuture<>();
+
+	private enum SubmissionType {
+		INITIAL, RECOVERY
+	}
+
+	static DispatcherJob createForSubmission(
+		CompletableFuture<JobManagerRunner> jobManagerRunnerFuture,
+		JobID jobId,
+		String jobName,
+		long initializationTimestamp) {
+		return new DispatcherJob(jobManagerRunnerFuture, jobId, jobName, initializationTimestamp, SubmissionType.INITIAL);
+	}
+
+	static DispatcherJob createForRecovery(
+		CompletableFuture<JobManagerRunner> jobManagerRunnerFuture,
+		JobID jobId,
+		String jobName,
+		long initializationTimestamp) {
+		return new DispatcherJob(jobManagerRunnerFuture, jobId, jobName, initializationTimestamp, SubmissionType.RECOVERY);
+	}
+
+	private DispatcherJob(
+		CompletableFuture<JobManagerRunner> jobManagerRunnerFuture,
+		JobID jobId,
+		String jobName,
+		long initializationTimestamp,
+		SubmissionType submissionType) {
+		this.jobManagerRunnerFuture = jobManagerRunnerFuture;
+		this.jobId = jobId;
+		this.jobName = jobName;
+		this.initializationTimestamp = initializationTimestamp;
+		this.jobResultFuture = new CompletableFuture<>();
+
+		FutureUtils.assertNoException(this.jobManagerRunnerFuture.handle((jobManagerRunner, throwable) -> {
+			// JM has been initialized, or the initialization failed
+			synchronized (lock) {
+				if (throwable == null) {
+					// Forward result future
+					FutureUtils.forward(jobManagerRunner.getResultFuture(), jobResultFuture);
+
+					if (jobStatus == JobStatus.CANCELLING) {
+						log.info("Cancellation during initialization has been requested for job {}. Initialization completed, cancelling job.", jobId);
+
+						// cancel job
+						jobManagerRunner
+							.getJobMasterGateway()
+							.thenCompose(gw -> gw.cancel(RpcUtils.INF_TIMEOUT))
+							.whenComplete((ignored, cancelThrowable) -> {
+							if (cancelThrowable != null) {
+								log.warn("Cancellation of job {} failed", jobId, cancelThrowable);
+							}
+						});
+
+						// cancellation will eventually complete the jobResultFuture
+						jobResultFuture.whenComplete((archivedExecutionGraph, resultThrowable) -> {
+							synchronized (lock) {
+								if (resultThrowable == null) {
+									jobStatus = archivedExecutionGraph.getState();
+								} else {
+									jobStatus = JobStatus.FAILED;
+								}
+							}
+						});
+					} else {
+						jobStatus = JobStatus.RUNNING; // this status should never be exposed from the DispatcherJob. Only used internally for tracking running state
+					}
+				} else { // failure during initialization
+					if (submissionType == SubmissionType.RECOVERY) {
+						jobResultFuture.completeExceptionally(throwable);
+					} else {
+						jobResultFuture.complete(ArchivedExecutionGraph.createFromInitializingJob(
+							jobId,
+							jobName,
+							throwable,
+							JobStatus.FAILED,
+							initializationTimestamp));
+					}
+				}
+			}
+			return null;
+		}));
+	}
+
+	public CompletableFuture<ArchivedExecutionGraph> getResultFuture() {
+		return jobResultFuture;
+	}
+
+	public CompletableFuture<JobDetails> requestJobDetails(Time timeout) {
+		synchronized (lock) {
+			if (isRunning()) {
+				return getJobMasterGateway().thenCompose(jobMasterGateway -> jobMasterGateway.requestJobDetails(
+					timeout));
+			} else {
+				int[] tasksPerState = new int[ExecutionState.values().length];
+				return CompletableFuture.completedFuture(new JobDetails(
+					jobId,
+					jobName,
+					initializationTimestamp,
+					0,
+					0,
+					jobStatus,
+					0,
+					tasksPerState,
+					0));
+			}
+		}
+	}
+
+	public CompletableFuture<Acknowledge> cancel(Time timeout) {
+		synchronized (lock) {
+			if (isRunning()) {
+				return getJobMasterGateway().thenCompose(jobMasterGateway -> jobMasterGateway.cancel(timeout));
+			} else {
+				log.info("Cancellation during initialization requested for job {}. Job will be cancelled once JobManager has been initialized.", jobId);
+				jobStatus = JobStatus.CANCELLING;
+				return jobResultFuture.thenApply(ignored -> Acknowledge.get());
+			}
+		}
+	}
+
+	public CompletableFuture<JobStatus> requestJobStatus(Time timeout) {
+		synchronized (lock) {
+			if (isRunning()) {
+				return getJobMasterGateway().thenCompose(jobMasterGateway -> jobMasterGateway.requestJobStatus(timeout));
+			} else {
+				return CompletableFuture.completedFuture(jobStatus);
+			}
+		}
+	}
+
+	public CompletableFuture<ArchivedExecutionGraph> requestJob(Time timeout) {
+		synchronized (lock) {
+			if (isRunning()) {
+				return getJobMasterGateway().thenCompose(jobMasterGateway -> jobMasterGateway.requestJob(timeout));
+			} else {
+				return CompletableFuture.supplyAsync(() -> ArchivedExecutionGraph.createFromInitializingJob(jobId, jobName, null, jobStatus, initializationTimestamp));
+			}
+		}
+	}
+
+	public boolean isRunning() {
+		synchronized (lock) {
+			return jobStatus == JobStatus.RUNNING;
+		}
+	}
+
+	public CompletableFuture<JobMasterGateway> getJobMasterGateway() {
+		Preconditions.checkState(
+			isRunning(),
+			"JobMaster Gateway is not available during initialization");
+		try {
+			return jobManagerRunnerFuture.thenCompose(JobManagerRunner::getJobMasterGateway);
+		} catch (Throwable e) {
+			throw new IllegalStateException("JobMaster gateway is not available", e);
+		}
+	}
+
+	@Override
+	public CompletableFuture<Void> closeAsync() {
+		jobManagerRunnerFuture.handle((runner, throwable) -> {

Review comment:
       `FutureUtils.assertNoException` is missing around the `handle(...)` call.
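
Below is a minimal sketch of what the comment asks for, using plain `CompletableFuture`. The future returned by `handle(...)` is passed to an assertion helper instead of being dropped, so an exception thrown inside the handler cannot disappear silently. `FutureAssertions` and `CloseAsyncSketch` are hypothetical names. The helper reports failures to a caller-supplied `Consumer` so the behaviour can be observed. As far as we know, Flink's own `FutureUtils.assertNoException` (used in the constructor above) instead treats such a failure as fatal. The runner is reduced to a `String` for brevity.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

// Hypothetical stand-in for Flink's FutureUtils.assertNoException. Instead of treating
// the failure as fatal, it hands the unexpected throwable to a caller-supplied handler.
final class FutureAssertions {
    private FutureAssertions() {}

    static void assertNoException(CompletableFuture<?> future, Consumer<Throwable> onUnexpectedFailure) {
        future.whenComplete((ignored, throwable) -> {
            if (throwable != null) {
                onUnexpectedFailure.accept(throwable);
            }
        });
    }
}

// Simplified closeAsync: the runner is modelled as a String and closing it as a Consumer.
final class CloseAsyncSketch {
    private CloseAsyncSketch() {}

    static CompletableFuture<Void> closeAsync(
            CompletableFuture<String> runnerFuture,
            Consumer<String> closeRunner,
            Consumer<Throwable> onUnexpectedFailure) {
        CompletableFuture<Void> terminationFuture = new CompletableFuture<>();
        // Without the wrapper, an exception thrown in this handler would only complete
        // the otherwise discarded future returned by handle(...) exceptionally.
        FutureAssertions.assertNoException(
            runnerFuture.handle((runner, throwable) -> {
                if (throwable == null) {
                    closeRunner.accept(runner);
                }
                terminationFuture.complete(null);
                return null;
            }),
            onUnexpectedFailure);
        return terminationFuture;
    }
}
```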

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherJob.java
##########
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.jobmaster.JobManagerRunner;
+import org.apache.flink.runtime.jobmaster.JobMasterGateway;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.util.AutoCloseableAsync;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Abstraction used by the {@link Dispatcher} to manage JobManagers, in
+ * particular during initialization.
+ * While a job is initializing, the JobMasterGateway is not available. A small subset
+ * of the methods of the JobMasterGateway necessary during initialization are provided
+ * by this class (job details, cancel).
+ */
+public final class DispatcherJob implements AutoCloseableAsync {
+
+	private final Logger log = LoggerFactory.getLogger(DispatcherJob.class);
+
+	private final CompletableFuture<JobManagerRunner> jobManagerRunnerFuture;
+	private final CompletableFuture<ArchivedExecutionGraph> jobResultFuture;
+
+	private final long initializationTimestamp;
+	private final JobID jobId;
+	private final String jobName;
+
+	private final Object lock = new Object();
+
+	// internal field to track job status during initialization. Is not updated anymore after
+	// job is initialized, cancelled or failed.
+	@GuardedBy("lock")
+	private JobStatus jobStatus = JobStatus.INITIALIZING;
+
+	private final CompletableFuture<Void> terminationFuture = new CompletableFuture<>();
+
+	private enum SubmissionType {
+		INITIAL, RECOVERY
+	}
+
+	static DispatcherJob createForSubmission(
+		CompletableFuture<JobManagerRunner> jobManagerRunnerFuture,
+		JobID jobId,
+		String jobName,
+		long initializationTimestamp) {
+		return new DispatcherJob(jobManagerRunnerFuture, jobId, jobName, initializationTimestamp, SubmissionType.INITIAL);
+	}
+
+	static DispatcherJob createForRecovery(
+		CompletableFuture<JobManagerRunner> jobManagerRunnerFuture,
+		JobID jobId,
+		String jobName,
+		long initializationTimestamp) {
+		return new DispatcherJob(jobManagerRunnerFuture, jobId, jobName, initializationTimestamp, SubmissionType.RECOVERY);
+	}
+
+	private DispatcherJob(
+		CompletableFuture<JobManagerRunner> jobManagerRunnerFuture,
+		JobID jobId,
+		String jobName,
+		long initializationTimestamp,
+		SubmissionType submissionType) {
+		this.jobManagerRunnerFuture = jobManagerRunnerFuture;
+		this.jobId = jobId;
+		this.jobName = jobName;
+		this.initializationTimestamp = initializationTimestamp;
+		this.jobResultFuture = new CompletableFuture<>();
+
+		FutureUtils.assertNoException(this.jobManagerRunnerFuture.handle((jobManagerRunner, throwable) -> {
+			// JM has been initialized, or the initialization failed
+			synchronized (lock) {
+				if (throwable == null) {
+					// Forward result future
+					FutureUtils.forward(jobManagerRunner.getResultFuture(), jobResultFuture);
+
+					if (jobStatus == JobStatus.CANCELLING) {
+						log.info("Cancellation during initialization has been requested for job {}. Initialization completed, cancelling job.", jobId);
+
+						// cancel job
+						jobManagerRunner
+							.getJobMasterGateway()
+							.thenCompose(gw -> gw.cancel(RpcUtils.INF_TIMEOUT))
+							.whenComplete((ignored, cancelThrowable) -> {
+							if (cancelThrowable != null) {
+								log.warn("Cancellation of job {} failed", jobId, cancelThrowable);
+							}
+						});
+
+						// cancellation will eventually complete the jobResultFuture
+						jobResultFuture.whenComplete((archivedExecutionGraph, resultThrowable) -> {
+							synchronized (lock) {
+								if (resultThrowable == null) {
+									jobStatus = archivedExecutionGraph.getState();
+								} else {
+									jobStatus = JobStatus.FAILED;
+								}
+							}
+						});
+					} else {
+						jobStatus = JobStatus.RUNNING; // this status should never be exposed from the DispatcherJob. Only used internally for tracking running state
+					}
+				} else { // failure during initialization
+					if (submissionType == SubmissionType.RECOVERY) {
+						jobResultFuture.completeExceptionally(throwable);
+					} else {
+						jobResultFuture.complete(ArchivedExecutionGraph.createFromInitializingJob(
+							jobId,
+							jobName,
+							throwable,
+							JobStatus.FAILED,
+							initializationTimestamp));
+					}
+				}
+			}
+			return null;
+		}));
+	}
+
+	public CompletableFuture<ArchivedExecutionGraph> getResultFuture() {
+		return jobResultFuture;
+	}
+
+	public CompletableFuture<JobDetails> requestJobDetails(Time timeout) {
+		synchronized (lock) {
+			if (isRunning()) {
+				return getJobMasterGateway().thenCompose(jobMasterGateway -> jobMasterGateway.requestJobDetails(
+					timeout));
+			} else {
+				int[] tasksPerState = new int[ExecutionState.values().length];
+				return CompletableFuture.completedFuture(new JobDetails(
+					jobId,
+					jobName,
+					initializationTimestamp,
+					0,
+					0,
+					jobStatus,
+					0,
+					tasksPerState,
+					0));
+			}
+		}
+	}
+
+	public CompletableFuture<Acknowledge> cancel(Time timeout) {
+		synchronized (lock) {
+			if (isRunning()) {
+				return getJobMasterGateway().thenCompose(jobMasterGateway -> jobMasterGateway.cancel(timeout));
+			} else {
+				log.info("Cancellation during initialization requested for job {}. Job will be cancelled once JobManager has been initialized.", jobId);
+				jobStatus = JobStatus.CANCELLING;
+				return jobResultFuture.thenApply(ignored -> Acknowledge.get());
+			}
+		}
+	}
+
+	public CompletableFuture<JobStatus> requestJobStatus(Time timeout) {
+		synchronized (lock) {
+			if (isRunning()) {
+				return getJobMasterGateway().thenCompose(jobMasterGateway -> jobMasterGateway.requestJobStatus(timeout));
+			} else {
+				return CompletableFuture.completedFuture(jobStatus);
+			}
+		}
+	}
+
+	public CompletableFuture<ArchivedExecutionGraph> requestJob(Time timeout) {
+		synchronized (lock) {
+			if (isRunning()) {
+				return getJobMasterGateway().thenCompose(jobMasterGateway -> jobMasterGateway.requestJob(timeout));
+			} else {
+				return CompletableFuture.supplyAsync(() -> ArchivedExecutionGraph.createFromInitializingJob(jobId, jobName, null, jobStatus, initializationTimestamp));
+			}

Review comment:
       I think this will return the wrong `ArchivedExecutionGraph` after the job has reached a globally terminal state.
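
Here is one possible reading of the comment, sketched under simplifying assumptions. Once `jobResultFuture` is complete, the job is globally terminal, and `requestJob` could return that future directly. The alternative is synthesising a placeholder via `createFromInitializingJob`, which passes `null` as the failure cause and so loses whatever the real result contained. `GraphSnapshot` and `RequestJobSketch` are hypothetical stand-ins, and statuses are plain strings. The branch that delegates to the `JobMasterGateway` while the job is running is omitted.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical, heavily simplified stand-in for ArchivedExecutionGraph.
final class GraphSnapshot {
    final String jobStatus;
    final String source;

    GraphSnapshot(String jobStatus, String source) {
        this.jobStatus = jobStatus;
        this.source = source;
    }
}

// Sketch of the non-running branch of requestJob only.
final class RequestJobSketch {
    private final Object lock = new Object();
    private final CompletableFuture<GraphSnapshot> jobResultFuture = new CompletableFuture<>();
    private String jobStatus = "INITIALIZING";

    CompletableFuture<GraphSnapshot> getResultFuture() {
        return jobResultFuture;
    }

    // Mirrors the diff's cancel() during initialization, which only records CANCELLING.
    void markCancelling() {
        synchronized (lock) {
            jobStatus = "CANCELLING";
        }
    }

    CompletableFuture<GraphSnapshot> requestJob() {
        synchronized (lock) {
            if (jobResultFuture.isDone()) {
                // Globally terminal: hand out the actual result rather than a placeholder.
                return jobResultFuture;
            }
            // Still initializing: build the placeholder eagerly while holding the lock.
            return CompletableFuture.completedFuture(new GraphSnapshot(jobStatus, "placeholder"));
        }
    }
}
```

Building the placeholder with `completedFuture` while holding the lock also avoids the `supplyAsync` call in the diff. That call runs the lambda later on the default asynchronous executor (normally the common pool) and reads `jobStatus` outside the `synchronized` block.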

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherJob.java
##########
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.jobmaster.JobManagerRunner;
+import org.apache.flink.runtime.jobmaster.JobMasterGateway;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.util.AutoCloseableAsync;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Abstraction used by the {@link Dispatcher} to manage JobManagers, in
+ * particular during initialization.
+ * While a job is initializing, the JobMasterGateway is not available. A small subset
+ * of the methods of the JobMasterGateway necessary during initialization are provided
+ * by this class (job details, cancel).
+ */
+public final class DispatcherJob implements AutoCloseableAsync {
+
+	private final Logger log = LoggerFactory.getLogger(DispatcherJob.class);
+
+	private final CompletableFuture<JobManagerRunner> jobManagerRunnerFuture;
+	private final CompletableFuture<ArchivedExecutionGraph> jobResultFuture;
+
+	private final long initializationTimestamp;
+	private final JobID jobId;
+	private final String jobName;
+
+	private final Object lock = new Object();
+
+	// internal field to track job status during initialization. Is not updated anymore after
+	// job is initialized, cancelled or failed.
+	@GuardedBy("lock")
+	private JobStatus jobStatus = JobStatus.INITIALIZING;
+
+	private final CompletableFuture<Void> terminationFuture = new CompletableFuture<>();
+
+	private enum SubmissionType {
+		INITIAL, RECOVERY
+	}
+
+	static DispatcherJob createForSubmission(
+		CompletableFuture<JobManagerRunner> jobManagerRunnerFuture,
+		JobID jobId,
+		String jobName,
+		long initializationTimestamp) {
+		return new DispatcherJob(jobManagerRunnerFuture, jobId, jobName, initializationTimestamp, SubmissionType.INITIAL);
+	}
+
+	static DispatcherJob createForRecovery(
+		CompletableFuture<JobManagerRunner> jobManagerRunnerFuture,
+		JobID jobId,
+		String jobName,
+		long initializationTimestamp) {
+		return new DispatcherJob(jobManagerRunnerFuture, jobId, jobName, initializationTimestamp, SubmissionType.RECOVERY);
+	}
+
+	private DispatcherJob(
+		CompletableFuture<JobManagerRunner> jobManagerRunnerFuture,
+		JobID jobId,
+		String jobName,
+		long initializationTimestamp,
+		SubmissionType submissionType) {
+		this.jobManagerRunnerFuture = jobManagerRunnerFuture;
+		this.jobId = jobId;
+		this.jobName = jobName;
+		this.initializationTimestamp = initializationTimestamp;
+		this.jobResultFuture = new CompletableFuture<>();
+
+		FutureUtils.assertNoException(this.jobManagerRunnerFuture.handle((jobManagerRunner, throwable) -> {
+			// JM has been initialized, or the initialization failed
+			synchronized (lock) {
+				if (throwable == null) {
+					// Forward result future
+					FutureUtils.forward(jobManagerRunner.getResultFuture(), jobResultFuture);
+
+					if (jobStatus == JobStatus.CANCELLING) {
+						log.info("Cancellation during initialization has been requested for job {}. Initialization completed, cancelling job.", jobId);
+
+						// cancel job
+						jobManagerRunner
+							.getJobMasterGateway()
+							.thenCompose(gw -> gw.cancel(RpcUtils.INF_TIMEOUT))
+							.whenComplete((ignored, cancelThrowable) -> {
+							if (cancelThrowable != null) {
+								log.warn("Cancellation of job {} failed", jobId, cancelThrowable);
+							}
+						});
+
+						// cancellation will eventually complete the jobResultFuture
+						jobResultFuture.whenComplete((archivedExecutionGraph, resultThrowable) -> {
+							synchronized (lock) {
+								if (resultThrowable == null) {
+									jobStatus = archivedExecutionGraph.getState();
+								} else {
+									jobStatus = JobStatus.FAILED;
+								}
+							}
+						});
+					} else {
+						jobStatus = JobStatus.RUNNING; // this status should never be exposed from the DispatcherJob. Only used internally for tracking running state
+					}
+				} else { // failure during initialization
+					if (submissionType == SubmissionType.RECOVERY) {
+						jobResultFuture.completeExceptionally(throwable);
+					} else {
+						jobResultFuture.complete(ArchivedExecutionGraph.createFromInitializingJob(
+							jobId,
+							jobName,
+							throwable,
+							JobStatus.FAILED,
+							initializationTimestamp));

Review comment:
       What does `requestJobStatus` return in this case?
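
In the failure branch, the constructor completes `jobResultFuture` but never assigns `jobStatus`. The field therefore keeps its previous value: `INITIALIZING`, or `CANCELLING` if a cancellation was requested. Because `isRunning()` is then false, `requestJobStatus` would answer with that stale value. Yet for an initial submission, the result future already holds a `FAILED` graph. Below is a minimal sketch of one way to keep the two consistent, assuming the status is recorded on both paths. `SketchJobStatus` and `InitFailureSketch` are hypothetical, and the runner is modelled as a plain `Object`.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical, reduced stand-in for JobStatus.
enum SketchJobStatus { INITIALIZING, RUNNING, FAILED }

final class InitFailureSketch {
    private final Object lock = new Object();
    private SketchJobStatus jobStatus = SketchJobStatus.INITIALIZING;

    InitFailureSketch(CompletableFuture<Object> jobManagerRunnerFuture) {
        jobManagerRunnerFuture.whenComplete((runner, throwable) -> {
            synchronized (lock) {
                // Update the status on both paths, so requestJobStatus cannot keep reporting
                // INITIALIZING after initialization has already failed.
                jobStatus = throwable == null ? SketchJobStatus.RUNNING : SketchJobStatus.FAILED;
            }
        });
    }

    CompletableFuture<SketchJobStatus> requestJobStatus() {
        synchronized (lock) {
            return CompletableFuture.completedFuture(jobStatus);
        }
    }
}
```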

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
##########
@@ -351,4 +352,52 @@ public static ArchivedExecutionGraph createFrom(ExecutionGraph executionGraph) {
 			executionGraph.getCheckpointStatsSnapshot(),
 			executionGraph.getStateBackendName().orElse(null));
 	}
+
+	/**
+	 * Create an ArchivedExecutionGraph from an initializing job.
+	 */
+	public static ArchivedExecutionGraph createFromInitializingJob(
+		JobID jobId,
+		String jobName,
+		@Nullable Throwable throwable,
+		JobStatus finalJobStatus,

Review comment:
       Maybe it is just me, but I think the order of the arguments is a bit arbitrary. I would expect the `JobStatus` to come before a potential failure cause.
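
Here is a sketch of the parameter order the comment suggests: identity, then status, then the optional failure cause, then timing. `InitializingJobArchive` and `ArchivedStatus` are hypothetical stand-ins for `ArchivedExecutionGraph` and `JobStatus`. The diff uses a `@Nullable` annotation, which is replaced here by a comment to keep the sketch dependency-free.

```java
import java.util.Optional;

// Hypothetical, reduced stand-in for JobStatus.
enum ArchivedStatus { INITIALIZING, CANCELLING, FAILED }

// Hypothetical, simplified stand-in for ArchivedExecutionGraph.
final class InitializingJobArchive {
    private final String jobId;
    private final String jobName;
    private final ArchivedStatus jobStatus;
    private final Throwable failureCause; // null if the job did not fail
    private final long initializationTimestamp;

    private InitializingJobArchive(
            String jobId, String jobName, ArchivedStatus jobStatus,
            Throwable failureCause, long initializationTimestamp) {
        this.jobId = jobId;
        this.jobName = jobName;
        this.jobStatus = jobStatus;
        this.failureCause = failureCause;
        this.initializationTimestamp = initializationTimestamp;
    }

    // Status before the optional failure cause, as suggested in the review.
    static InitializingJobArchive createFromInitializingJob(
            String jobId,
            String jobName,
            ArchivedStatus jobStatus,
            Throwable failureCause, // may be null
            long initializationTimestamp) {
        return new InitializingJobArchive(jobId, jobName, jobStatus, failureCause, initializationTimestamp);
    }

    String getJobId() {
        return jobId;
    }

    ArchivedStatus getJobStatus() {
        return jobStatus;
    }

    Optional<Throwable> getFailureCause() {
        return Optional.ofNullable(failureCause);
    }

    long getInitializationTimestamp() {
        return initializationTimestamp;
    }
}
```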

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherJob.java
##########
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.jobmaster.JobManagerRunner;
+import org.apache.flink.runtime.jobmaster.JobMasterGateway;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.util.AutoCloseableAsync;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Abstraction used by the {@link Dispatcher} to manage JobManagers, in
+ * particular during initialization.
+ * While a job is initializing, the JobMasterGateway is not available. A small subset
+ * of the methods of the JobMasterGateway necessary during initialization are provided
+ * by this class (job details, cancel).
+ */
+public final class DispatcherJob implements AutoCloseableAsync {
+
+	private final Logger log = LoggerFactory.getLogger(DispatcherJob.class);
+
+	private final CompletableFuture<JobManagerRunner> jobManagerRunnerFuture;
+	private final CompletableFuture<ArchivedExecutionGraph> jobResultFuture;
+
+	private final long initializationTimestamp;
+	private final JobID jobId;
+	private final String jobName;
+
+	private final Object lock = new Object();
+
+	// internal field to track job status during initialization. Is not updated anymore after
+	// job is initialized, cancelled or failed.
+	@GuardedBy("lock")
+	private JobStatus jobStatus = JobStatus.INITIALIZING;
+
+	private final CompletableFuture<Void> terminationFuture = new CompletableFuture<>();
+
+	private enum SubmissionType {
+		INITIAL, RECOVERY
+	}
+
+	static DispatcherJob createForSubmission(
+		CompletableFuture<JobManagerRunner> jobManagerRunnerFuture,
+		JobID jobId,
+		String jobName,
+		long initializationTimestamp) {
+		return new DispatcherJob(jobManagerRunnerFuture, jobId, jobName, initializationTimestamp, SubmissionType.INITIAL);
+	}
+
+	static DispatcherJob createForRecovery(
+		CompletableFuture<JobManagerRunner> jobManagerRunnerFuture,
+		JobID jobId,
+		String jobName,
+		long initializationTimestamp) {
+		return new DispatcherJob(jobManagerRunnerFuture, jobId, jobName, initializationTimestamp, SubmissionType.RECOVERY);
+	}
+
+	private DispatcherJob(
+		CompletableFuture<JobManagerRunner> jobManagerRunnerFuture,
+		JobID jobId,
+		String jobName,
+		long initializationTimestamp,
+		SubmissionType submissionType) {
+		this.jobManagerRunnerFuture = jobManagerRunnerFuture;
+		this.jobId = jobId;
+		this.jobName = jobName;
+		this.initializationTimestamp = initializationTimestamp;
+		this.jobResultFuture = new CompletableFuture<>();
+
+		FutureUtils.assertNoException(this.jobManagerRunnerFuture.handle((jobManagerRunner, throwable) -> {
+			// JM has been initialized, or the initialization failed
+			synchronized (lock) {
+				if (throwable == null) {
+					// Forward result future
+					FutureUtils.forward(jobManagerRunner.getResultFuture(), jobResultFuture);
+
+					if (jobStatus == JobStatus.CANCELLING) {
+						log.info("Cancellation during initialization has been requested for job {}. Initialization completed, cancelling job.", jobId);
+
+						// cancel job
+						jobManagerRunner
+							.getJobMasterGateway()
+							.thenCompose(gw -> gw.cancel(RpcUtils.INF_TIMEOUT))
+							.whenComplete((ignored, cancelThrowable) -> {
+							if (cancelThrowable != null) {
+								log.warn("Cancellation of job {} failed", jobId, cancelThrowable);
+							}
+						});

Review comment:
       Why is it beneficial to wait for the `JobManagerRunner` to complete instead of enqueuing this operation when `cancel` is being called?
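
Below is a minimal sketch of the alternative the comment hints at. Here, `cancel()` chains the cancellation onto the runner future at the moment it is called. The cancellation then runs as soon as initialization completes, and no `CANCELLING` flag has to be re-checked in the initialization callback. `CancelGateway`, `GatewayProvider` and `EnqueuedCancelSketch` are hypothetical stand-ins for `JobMasterGateway`, `JobManagerRunner` and `DispatcherJob`, and the acknowledgement is a `String`.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for JobMasterGateway.
interface CancelGateway {
    CompletableFuture<String> cancel();
}

// Hypothetical stand-in for JobManagerRunner.
interface GatewayProvider {
    CompletableFuture<CancelGateway> getJobMasterGateway();
}

final class EnqueuedCancelSketch {
    private final CompletableFuture<GatewayProvider> jobManagerRunnerFuture;

    EnqueuedCancelSketch(CompletableFuture<GatewayProvider> jobManagerRunnerFuture) {
        this.jobManagerRunnerFuture = jobManagerRunnerFuture;
    }

    // The cancel call is enqueued on the runner future when cancel() is invoked, so it
    // runs as soon as initialization completes. If initialization fails, the returned
    // future fails as well.
    CompletableFuture<String> cancel() {
        return jobManagerRunnerFuture
            .thenCompose(GatewayProvider::getJobMasterGateway)
            .thenCompose(CancelGateway::cancel);
    }
}
```

One difference to weigh: here the returned future completes when the JobMaster acknowledges the cancel request. The version in the diff acknowledges only once `jobResultFuture` completes.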

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherJob.java
##########
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.jobmaster.JobManagerRunner;
+import org.apache.flink.runtime.jobmaster.JobMasterGateway;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.util.AutoCloseableAsync;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Abstraction used by the {@link Dispatcher} to manage JobManagers, in
+ * particular during initialization.
+ * While a job is initializing, the JobMasterGateway is not available. A small subset
+ * of the methods of the JobMasterGateway necessary during initialization are provided
+ * by this class (job details, cancel).
+ */
+public final class DispatcherJob implements AutoCloseableAsync {
+
+	private final Logger log = LoggerFactory.getLogger(DispatcherJob.class);
+
+	private final CompletableFuture<JobManagerRunner> jobManagerRunnerFuture;
+	private final CompletableFuture<ArchivedExecutionGraph> jobResultFuture;
+
+	private final long initializationTimestamp;
+	private final JobID jobId;
+	private final String jobName;
+
+	private final Object lock = new Object();
+
+	// internal field to track job status during initialization. Is not updated anymore after
+	// job is initialized, cancelled or failed.
+	@GuardedBy("lock")
+	private JobStatus jobStatus = JobStatus.INITIALIZING;
+
+	private final CompletableFuture<Void> terminationFuture = new CompletableFuture<>();
+
+	private enum SubmissionType {
+		INITIAL, RECOVERY
+	}
+
+	static DispatcherJob createForSubmission(
+		CompletableFuture<JobManagerRunner> jobManagerRunnerFuture,
+		JobID jobId,
+		String jobName,
+		long initializationTimestamp) {
+		return new DispatcherJob(jobManagerRunnerFuture, jobId, jobName, initializationTimestamp, SubmissionType.INITIAL);
+	}
+
+	static DispatcherJob createForRecovery(
+		CompletableFuture<JobManagerRunner> jobManagerRunnerFuture,
+		JobID jobId,
+		String jobName,
+		long initializationTimestamp) {
+		return new DispatcherJob(jobManagerRunnerFuture, jobId, jobName, initializationTimestamp, SubmissionType.RECOVERY);
+	}
+
+	private DispatcherJob(
+		CompletableFuture<JobManagerRunner> jobManagerRunnerFuture,
+		JobID jobId,
+		String jobName,
+		long initializationTimestamp,
+		SubmissionType submissionType) {
+		this.jobManagerRunnerFuture = jobManagerRunnerFuture;
+		this.jobId = jobId;
+		this.jobName = jobName;
+		this.initializationTimestamp = initializationTimestamp;
+		this.jobResultFuture = new CompletableFuture<>();
+
+		FutureUtils.assertNoException(this.jobManagerRunnerFuture.handle((jobManagerRunner, throwable) -> {
+			// JM has been initialized, or the initialization failed
+			synchronized (lock) {
+				if (throwable == null) {
+					// Forward result future
+					FutureUtils.forward(jobManagerRunner.getResultFuture(), jobResultFuture);
+
+					if (jobStatus == JobStatus.CANCELLING) {
+						log.info("Cancellation during initialization has been requested for job {}. Initialization completed, cancelling job.", jobId);
+
+						// cancel job
+						jobManagerRunner
+							.getJobMasterGateway()
+							.thenCompose(gw -> gw.cancel(RpcUtils.INF_TIMEOUT))
+							.whenComplete((ignored, cancelThrowable) -> {
+							if (cancelThrowable != null) {
+								log.warn("Cancellation of job {} failed", jobId, cancelThrowable);
+							}
+						});
+
+						// cancellation will eventually complete the jobResultFuture
+						jobResultFuture.whenComplete((archivedExecutionGraph, resultThrowable) -> {
+							synchronized (lock) {
+								if (resultThrowable == null) {
+									jobStatus = archivedExecutionGraph.getState();

Review comment:
       It seems a bit inconsistent to set the `jobStatus` to the terminal state only in the case where we cancel the job during the `initialization` phase.
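
Here is a sketch of a more uniform alternative. It assumes the local status should always end in the terminal state. The callback on the result future is registered once, unconditionally, rather than only on the cancel-during-initialization path. `TrackedStatus` and `StatusTrackingSketch` are hypothetical, and the result future completes with a status rather than a full `ArchivedExecutionGraph`.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical, reduced stand-in for JobStatus.
enum TrackedStatus { INITIALIZING, RUNNING, CANCELLING, CANCELED, FAILED, FINISHED }

final class StatusTrackingSketch {
    private final Object lock = new Object();
    private TrackedStatus jobStatus = TrackedStatus.INITIALIZING;

    // resultFuture stands in for jobResultFuture and completes with the terminal status.
    StatusTrackingSketch(CompletableFuture<TrackedStatus> resultFuture) {
        // Registered once for every path, not only when cancellation was requested
        // during initialization.
        resultFuture.whenComplete((terminalStatus, throwable) -> {
            synchronized (lock) {
                jobStatus = throwable == null ? terminalStatus : TrackedStatus.FAILED;
            }
        });
    }

    void onInitialized() {
        synchronized (lock) {
            if (jobStatus == TrackedStatus.INITIALIZING) {
                jobStatus = TrackedStatus.RUNNING;
            }
        }
    }

    void requestCancel() {
        synchronized (lock) {
            if (jobStatus == TrackedStatus.INITIALIZING) {
                jobStatus = TrackedStatus.CANCELLING;
            }
        }
    }

    TrackedStatus currentStatus() {
        synchronized (lock) {
            return jobStatus;
        }
    }
}
```

Note that in the diff, `isRunning()` is derived from `jobStatus`. Moving `jobStatus` to a terminal state after the job has run would therefore also change which branch the request methods take.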

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
##########
@@ -351,4 +352,52 @@ public static ArchivedExecutionGraph createFrom(ExecutionGraph executionGraph) {
 			executionGraph.getCheckpointStatsSnapshot(),
 			executionGraph.getStateBackendName().orElse(null));
 	}
+
+	/**
+	 * Create an ArchivedExecutionGraph from an initializing job.
+	 */

Review comment:
       Please add proper JavaDocs here

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherJob.java
##########
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.jobmaster.JobManagerRunner;
+import org.apache.flink.runtime.jobmaster.JobMasterGateway;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.util.AutoCloseableAsync;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Abstraction used by the {@link Dispatcher} to manage JobManagers, in
+ * particular during initialization.
+ * While a job is initializing, the JobMasterGateway is not available. A small subset
+ * of the methods of the JobMasterGateway necessary during initialization are provided
+ * by this class (job details, cancel).
+ */
+public final class DispatcherJob implements AutoCloseableAsync {
+
+	private final Logger log = LoggerFactory.getLogger(DispatcherJob.class);
+
+	private final CompletableFuture<JobManagerRunner> jobManagerRunnerFuture;
+	private final CompletableFuture<ArchivedExecutionGraph> jobResultFuture;
+
+	private final long initializationTimestamp;
+	private final JobID jobId;
+	private final String jobName;
+
+	private final Object lock = new Object();
+
+	// internal field to track job status during initialization. Is not updated anymore after
+	// job is initialized, cancelled or failed.
+	@GuardedBy("lock")
+	private JobStatus jobStatus = JobStatus.INITIALIZING;
+
+	private final CompletableFuture<Void> terminationFuture = new CompletableFuture<>();
+
+	private enum SubmissionType {
+		INITIAL, RECOVERY
+	}
+
+	static DispatcherJob createForSubmission(
+		CompletableFuture<JobManagerRunner> jobManagerRunnerFuture,
+		JobID jobId,
+		String jobName,
+		long initializationTimestamp) {
+		return new DispatcherJob(jobManagerRunnerFuture, jobId, jobName, initializationTimestamp, SubmissionType.INITIAL);
+	}
+
+	static DispatcherJob createForRecovery(
+		CompletableFuture<JobManagerRunner> jobManagerRunnerFuture,
+		JobID jobId,
+		String jobName,
+		long initializationTimestamp) {
+		return new DispatcherJob(jobManagerRunnerFuture, jobId, jobName, initializationTimestamp, SubmissionType.RECOVERY);
+	}
+
+	private DispatcherJob(
+		CompletableFuture<JobManagerRunner> jobManagerRunnerFuture,
+		JobID jobId,
+		String jobName,
+		long initializationTimestamp,
+		SubmissionType submissionType) {
+		this.jobManagerRunnerFuture = jobManagerRunnerFuture;
+		this.jobId = jobId;
+		this.jobName = jobName;
+		this.initializationTimestamp = initializationTimestamp;
+		this.jobResultFuture = new CompletableFuture<>();
+
+		FutureUtils.assertNoException(this.jobManagerRunnerFuture.handle((jobManagerRunner, throwable) -> {
+			// JM has been initialized, or the initialization failed
+			synchronized (lock) {
+				if (throwable == null) {
+					// Forward result future
+					FutureUtils.forward(jobManagerRunner.getResultFuture(), jobResultFuture);
+
+					if (jobStatus == JobStatus.CANCELLING) {
+						log.info("Cancellation during initialization has been requested for job {}. Initialization completed, cancelling job.", jobId);
+
+						// cancel job
+						jobManagerRunner
+							.getJobMasterGateway()
+							.thenCompose(gw -> gw.cancel(RpcUtils.INF_TIMEOUT))
+							.whenComplete((ignored, cancelThrowable) -> {
+							if (cancelThrowable != null) {
+								log.warn("Cancellation of job {} failed", jobId, cancelThrowable);
+							}
+						});
+
+						// cancellation will eventually complete the jobResultFuture
+						jobResultFuture.whenComplete((archivedExecutionGraph, resultThrowable) -> {
+							synchronized (lock) {
+								if (resultThrowable == null) {
+									jobStatus = archivedExecutionGraph.getState();
+								} else {
+									jobStatus = JobStatus.FAILED;
+								}
+							}
+						});
+					} else {
+						jobStatus = JobStatus.RUNNING; // this status should never be exposed from the DispatcherJob. Only used internally for tracking running state
+					}
+				} else { // failure during initialization
+					if (submissionType == SubmissionType.RECOVERY) {
+						jobResultFuture.completeExceptionally(throwable);
+					} else {
+						jobResultFuture.complete(ArchivedExecutionGraph.createFromInitializingJob(
+							jobId,
+							jobName,
+							throwable,
+							JobStatus.FAILED,
+							initializationTimestamp));
+					}
+				}
+			}
+			return null;
+		}));
+	}
+
+	public CompletableFuture<ArchivedExecutionGraph> getResultFuture() {
+		return jobResultFuture;
+	}
+
+	public CompletableFuture<JobDetails> requestJobDetails(Time timeout) {
+		synchronized (lock) {
+			if (isRunning()) {
+				return getJobMasterGateway().thenCompose(jobMasterGateway -> jobMasterGateway.requestJobDetails(
+					timeout));
+			} else {
+				int[] tasksPerState = new int[ExecutionState.values().length];
+				return CompletableFuture.completedFuture(new JobDetails(
+					jobId,
+					jobName,
+					initializationTimestamp,
+					0,
+					0,
+					jobStatus,
+					0,
+					tasksPerState,
+					0));
+			}
+		}
+	}
+
+	public CompletableFuture<Acknowledge> cancel(Time timeout) {
+		synchronized (lock) {
+			if (isRunning()) {
+				return getJobMasterGateway().thenCompose(jobMasterGateway -> jobMasterGateway.cancel(timeout));
+			} else {
+				log.info("Cancellation during initialization requested for job {}. Job will be cancelled once JobManager has been initialized.", jobId);
+				jobStatus = JobStatus.CANCELLING;
+				return jobResultFuture.thenApply(ignored -> Acknowledge.get());
+			}
+		}
+	}
+
+	public CompletableFuture<JobStatus> requestJobStatus(Time timeout) {
+		synchronized (lock) {
+			if (isRunning()) {
+				return getJobMasterGateway().thenCompose(jobMasterGateway -> jobMasterGateway.requestJobStatus(timeout));
+			} else {
+				return CompletableFuture.completedFuture(jobStatus);
+			}
+		}
+	}
+
+	public CompletableFuture<ArchivedExecutionGraph> requestJob(Time timeout) {
+		synchronized (lock) {
+			if (isRunning()) {
+				return getJobMasterGateway().thenCompose(jobMasterGateway -> jobMasterGateway.requestJob(timeout));
+			} else {
+				return CompletableFuture.supplyAsync(() -> ArchivedExecutionGraph.createFromInitializingJob(jobId, jobName, null, jobStatus, initializationTimestamp));
+			}
+		}
+	}
+
+	public boolean isRunning() {
+		synchronized (lock) {
+			return jobStatus == JobStatus.RUNNING;
+		}
+	}
+
+	public CompletableFuture<JobMasterGateway> getJobMasterGateway() {

Review comment:
       Please add a description of the contract of this method (e.g. that the job must be `running`, otherwise the call fails with an `IllegalStateException`).
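A minimal sketch of how such a contract could be documented and enforced. It uses a hypothetical, simplified stand-in class (`GatewayContractSketch`, with a `String` in place of `JobMasterGateway`) and is not the Flink implementation. The method Javadoc states the precondition. The sketch also shows a variant that reports the violation through an exceptionally completed future instead of throwing:

```java
import java.util.concurrent.CompletableFuture;

/**
 * Hypothetical, simplified stand-in for DispatcherJob (not Flink code).
 * A String plays the role of the JobMasterGateway.
 */
final class GatewayContractSketch {

    private static final String NOT_AVAILABLE =
        "JobMaster gateway is not available during initialization";

    private final Object lock = new Object();
    private boolean running; // guarded by lock
    private final CompletableFuture<String> gatewayFuture = new CompletableFuture<>();

    /** Called once initialization has finished successfully. */
    void markRunning(String gateway) {
        synchronized (lock) {
            running = true;
        }
        gatewayFuture.complete(gateway);
    }

    boolean isRunning() {
        synchronized (lock) {
            return running;
        }
    }

    /**
     * Returns the future of the JobMaster gateway.
     *
     * <p>Contract: may only be called after initialization has finished,
     * i.e. while {@link #isRunning()} returns {@code true}.
     *
     * @throws IllegalStateException if the job is still initializing
     */
    CompletableFuture<String> getJobMasterGateway() {
        if (!isRunning()) {
            throw new IllegalStateException(NOT_AVAILABLE);
        }
        return gatewayFuture;
    }

    /**
     * Variant that never throws: a contract violation is reported through an
     * exceptionally completed future instead.
     */
    CompletableFuture<String> getJobMasterGatewayOrFail() {
        if (!isRunning()) {
            CompletableFuture<String> failed = new CompletableFuture<>();
            failed.completeExceptionally(new IllegalStateException(NOT_AVAILABLE));
            return failed;
        }
        return gatewayFuture;
    }

    public static void main(String[] args) {
        GatewayContractSketch job = new GatewayContractSketch();
        try {
            job.getJobMasterGateway();
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
        System.out.println(job.getJobMasterGatewayOrFail().isCompletedExceptionally());
        job.markRunning("gateway-1");
        System.out.println(job.getJobMasterGateway().join());
    }
}
```

Returning a failed future gives callers a single error path (`handle`/`exceptionally`). Throwing surfaces a contract violation immediately at the call site.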

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
##########
@@ -351,4 +352,52 @@ public static ArchivedExecutionGraph createFrom(ExecutionGraph executionGraph) {
 			executionGraph.getCheckpointStatsSnapshot(),
 			executionGraph.getStateBackendName().orElse(null));
 	}
+
+	/**
+	 * Create an ArchivedExecutionGraph from an initializing job.
+	 */
+	public static ArchivedExecutionGraph createFromInitializingJob(
+		JobID jobId,
+		String jobName,
+		@Nullable Throwable throwable,
+		JobStatus finalJobStatus,
+		long initializationTimestamp) {
+
+		long failureTime = System.currentTimeMillis();
+		Map<JobVertexID, ArchivedExecutionJobVertex> archivedTasks = Collections.emptyMap();
+		List<ArchivedExecutionJobVertex> archivedVerticesInCreationOrder = Collections.emptyList();
+		final Map<String, SerializedValue<OptionalFailure<Object>>> serializedUserAccumulators = Collections.emptyMap();
+		StringifiedAccumulatorResult[] archivedUserAccumulators = new StringifiedAccumulatorResult[]{};
+
+		final long[] timestamps = new long[JobStatus.values().length];
+		timestamps[JobStatus.CREATED.ordinal()] = initializationTimestamp;

Review comment:
       Why is this correct?
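The concern may be that the initialization time is recorded under `CREATED`, a state the job may never have reached. If so, one alternative is to record it under `INITIALIZING` and stamp any later status separately. The sketch below assumes a hypothetical, reduced status enum (not Flink's `JobStatus`, whose values and ordinals differ) and a hypothetical helper `timestampsForInitializingJob`:

```java
import java.util.Arrays;

/** Hypothetical sketch (not Flink code). */
final class StatusTimestampsSketch {

    // Hypothetical, reduced set of job states for illustration only; Flink's
    // JobStatus has more values and its own ordinal order.
    enum Status { INITIALIZING, CREATED, RUNNING, FAILED, CANCELED }

    /**
     * Builds a per-state timestamp array (indexed by ordinal) for a job that
     * never left initialization: INITIALIZING is stamped with the initialization
     * time and, if the job already reached another status, that status is stamped
     * with endTimestamp. All other entries stay 0 ("never reached").
     */
    static long[] timestampsForInitializingJob(
            long initializationTimestamp, Status currentStatus, long endTimestamp) {
        long[] timestamps = new long[Status.values().length];
        timestamps[Status.INITIALIZING.ordinal()] = initializationTimestamp;
        if (currentStatus != Status.INITIALIZING) {
            timestamps[currentStatus.ordinal()] = endTimestamp;
        }
        return timestamps;
    }

    public static void main(String[] args) {
        long[] ts = timestampsForInitializingJob(1000L, Status.FAILED, 1500L);
        System.out.println(Arrays.toString(ts)); // prints [1000, 0, 0, 1500, 0]
    }
}
```

In this sketch, entries left at `0` mean the state was never reached. A job still initializing keeps only its `INITIALIZING` timestamp.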

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherJob.java
##########
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.jobmaster.JobManagerRunner;
+import org.apache.flink.runtime.jobmaster.JobMasterGateway;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.util.AutoCloseableAsync;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Abstraction used by the {@link Dispatcher} to manage JobManagers, in
+ * particular during initialization.
+ * While a job is initializing, the JobMasterGateway is not available. A small subset
+ * of the methods of the JobMasterGateway necessary during initialization are provided
+ * by this class (job details, cancel).
+ */
+public final class DispatcherJob implements AutoCloseableAsync {
+
+	private final Logger log = LoggerFactory.getLogger(DispatcherJob.class);
+
+	private final CompletableFuture<JobManagerRunner> jobManagerRunnerFuture;
+	private final CompletableFuture<ArchivedExecutionGraph> jobResultFuture;
+
+	private final long initializationTimestamp;
+	private final JobID jobId;
+	private final String jobName;
+
+	private final Object lock = new Object();
+
+	// internal field to track job status during initialization. Is not updated anymore after
+	// job is initialized, cancelled or failed.
+	@GuardedBy("lock")
+	private JobStatus jobStatus = JobStatus.INITIALIZING;
+
+	private final CompletableFuture<Void> terminationFuture = new CompletableFuture<>();
+
+	private enum SubmissionType {
+		INITIAL, RECOVERY
+	}
+
+	static DispatcherJob createForSubmission(
+		CompletableFuture<JobManagerRunner> jobManagerRunnerFuture,
+		JobID jobId,
+		String jobName,
+		long initializationTimestamp) {
+		return new DispatcherJob(jobManagerRunnerFuture, jobId, jobName, initializationTimestamp, SubmissionType.INITIAL);
+	}
+
+	static DispatcherJob createForRecovery(
+		CompletableFuture<JobManagerRunner> jobManagerRunnerFuture,
+		JobID jobId,
+		String jobName,
+		long initializationTimestamp) {
+		return new DispatcherJob(jobManagerRunnerFuture, jobId, jobName, initializationTimestamp, SubmissionType.RECOVERY);
+	}
+
+	private DispatcherJob(
+		CompletableFuture<JobManagerRunner> jobManagerRunnerFuture,
+		JobID jobId,
+		String jobName,
+		long initializationTimestamp,
+		SubmissionType submissionType) {
+		this.jobManagerRunnerFuture = jobManagerRunnerFuture;
+		this.jobId = jobId;
+		this.jobName = jobName;
+		this.initializationTimestamp = initializationTimestamp;
+		this.jobResultFuture = new CompletableFuture<>();
+
+		FutureUtils.assertNoException(this.jobManagerRunnerFuture.handle((jobManagerRunner, throwable) -> {
+			// JM has been initialized, or the initialization failed
+			synchronized (lock) {
+				if (throwable == null) {
+					// Forward result future
+					FutureUtils.forward(jobManagerRunner.getResultFuture(), jobResultFuture);
+
+					if (jobStatus == JobStatus.CANCELLING) {
+						log.info("Cancellation during initialization has been requested for job {}. Initialization completed, cancelling job.", jobId);
+
+						// cancel job
+						jobManagerRunner
+							.getJobMasterGateway()
+							.thenCompose(gw -> gw.cancel(RpcUtils.INF_TIMEOUT))
+							.whenComplete((ignored, cancelThrowable) -> {
+							if (cancelThrowable != null) {
+								log.warn("Cancellation of job {} failed", jobId, cancelThrowable);
+							}
+						});
+
+						// cancellation will eventually complete the jobResultFuture
+						jobResultFuture.whenComplete((archivedExecutionGraph, resultThrowable) -> {
+							synchronized (lock) {
+								if (resultThrowable == null) {
+									jobStatus = archivedExecutionGraph.getState();
+								} else {
+									jobStatus = JobStatus.FAILED;
+								}
+							}
+						});
+					} else {
+						jobStatus = JobStatus.RUNNING; // this status should never be exposed from the DispatcherJob. Only used internally for tracking running state
+					}
+				} else { // failure during initialization
+					if (submissionType == SubmissionType.RECOVERY) {
+						jobResultFuture.completeExceptionally(throwable);
+					} else {
+						jobResultFuture.complete(ArchivedExecutionGraph.createFromInitializingJob(
+							jobId,
+							jobName,
+							throwable,
+							JobStatus.FAILED,
+							initializationTimestamp));
+					}
+				}
+			}
+			return null;
+		}));
+	}
+
+	public CompletableFuture<ArchivedExecutionGraph> getResultFuture() {
+		return jobResultFuture;
+	}
+
+	public CompletableFuture<JobDetails> requestJobDetails(Time timeout) {
+		synchronized (lock) {
+			if (isRunning()) {
+				return getJobMasterGateway().thenCompose(jobMasterGateway -> jobMasterGateway.requestJobDetails(
+					timeout));
+			} else {
+				int[] tasksPerState = new int[ExecutionState.values().length];
+				return CompletableFuture.completedFuture(new JobDetails(
+					jobId,
+					jobName,
+					initializationTimestamp,
+					0,
+					0,
+					jobStatus,
+					0,
+					tasksPerState,
+					0));
+			}
+		}
+	}
+
+	public CompletableFuture<Acknowledge> cancel(Time timeout) {
+		synchronized (lock) {
+			if (isRunning()) {
+				return getJobMasterGateway().thenCompose(jobMasterGateway -> jobMasterGateway.cancel(timeout));
+			} else {
+				log.info("Cancellation during initialization requested for job {}. Job will be cancelled once JobManager has been initialized.", jobId);
+				jobStatus = JobStatus.CANCELLING;
+				return jobResultFuture.thenApply(ignored -> Acknowledge.get());
+			}
+		}
+	}
+
+	public CompletableFuture<JobStatus> requestJobStatus(Time timeout) {
+		synchronized (lock) {
+			if (isRunning()) {
+				return getJobMasterGateway().thenCompose(jobMasterGateway -> jobMasterGateway.requestJobStatus(timeout));
+			} else {
+				return CompletableFuture.completedFuture(jobStatus);
+			}
+		}
+	}
+
+	public CompletableFuture<ArchivedExecutionGraph> requestJob(Time timeout) {
+		synchronized (lock) {
+			if (isRunning()) {
+				return getJobMasterGateway().thenCompose(jobMasterGateway -> jobMasterGateway.requestJob(timeout));
+			} else {
+				return CompletableFuture.supplyAsync(() -> ArchivedExecutionGraph.createFromInitializingJob(jobId, jobName, null, jobStatus, initializationTimestamp));
+			}
+		}
+	}
+
+	public boolean isRunning() {
+		synchronized (lock) {
+			return jobStatus == JobStatus.RUNNING;
+		}
+	}
+
+	public CompletableFuture<JobMasterGateway> getJobMasterGateway() {
+		Preconditions.checkState(
+			isRunning(),
+			"JobMaster Gateway is not available during initialization");
+		try {
+			return jobManagerRunnerFuture.thenCompose(JobManagerRunner::getJobMasterGateway);
+		} catch (Throwable e) {
+			throw new IllegalStateException("JobMaster gateway is not available", e);
+		}
+	}
+
+	@Override
+	public CompletableFuture<Void> closeAsync() {
+		jobManagerRunnerFuture.handle((runner, throwable) -> {
+			if (throwable == null) {
+				// init was successful: close jobManager runner.
+				CompletableFuture<Void> jobManagerRunnerClose = jobManagerRunnerFuture.thenCompose(
+					AutoCloseableAsync::closeAsync);
+				FutureUtils.forward(jobManagerRunnerClose, terminationFuture);
+			} else {
+				// initialization has failed. Termination complete.

Review comment:
       Why are we not forwarding the failure to the `terminationFuture`?
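For comparison, a minimal sketch of a `closeAsync` variant that forwards an initialization failure to the termination future instead of completing it normally. The class and helper names are hypothetical, and the type parameter `R` stands in for `JobManagerRunner`. Whether a failed initialization should count as an unsuccessful termination is the design question raised above:

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

/** Hypothetical sketch (not Flink code). */
final class CloseForwardingSketch {

    /**
     * Closes the resource produced by initFuture and completes terminationFuture
     * with the outcome. If initialization failed, that failure is forwarded to
     * terminationFuture instead of being swallowed.
     */
    static <R> CompletableFuture<Void> closeAsync(
            CompletableFuture<R> initFuture,
            Function<R, CompletableFuture<Void>> closeResource,
            CompletableFuture<Void> terminationFuture) {
        initFuture
            .thenCompose(closeResource) // skipped if initFuture failed; the failure propagates
            .whenComplete((ignored, throwable) -> {
                if (throwable == null) {
                    terminationFuture.complete(null);
                } else {
                    terminationFuture.completeExceptionally(throwable);
                }
            });
        return terminationFuture;
    }

    public static void main(String[] args) {
        CompletableFuture<String> failedInit = new CompletableFuture<>();
        failedInit.completeExceptionally(new RuntimeException("init failed"));
        CompletableFuture<Void> t1 = closeAsync(
            failedInit, r -> CompletableFuture.completedFuture(null), new CompletableFuture<>());
        System.out.println(t1.isCompletedExceptionally());

        CompletableFuture<String> okInit = CompletableFuture.completedFuture("runner");
        CompletableFuture<Void> t2 = closeAsync(
            okInit, r -> CompletableFuture.completedFuture(null), new CompletableFuture<>());
        System.out.println(t2.isDone() && !t2.isCompletedExceptionally());
    }
}
```

Because `thenCompose` propagates the failure of `initFuture` to its dependent stage, the `Throwable` seen in `whenComplete` will generally be a `CompletionException` wrapping the original cause. Unwrap it if callers expect the raw exception.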




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org