You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2020/09/06 12:00:53 UTC

[flink] branch master updated: [FLINK-16866] Make jobsubmission non-blocking

This is an automated email from the ASF dual-hosted git repository.

rmetzger pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 65ed039  [FLINK-16866] Make jobsubmission non-blocking
65ed039 is described below

commit 65ed03936a736190f7ffcb6ba0709a99ffcb98fa
Author: Robert Metzger <rm...@apache.org>
AuthorDate: Wed Jul 29 10:42:32 2020 +0200

    [FLINK-16866] Make jobsubmission non-blocking
    
    This closes #13217
---
 .../java/org/apache/flink/client/ClientUtils.java  |  47 +++
 .../application/executors/EmbeddedExecutor.java    |  14 +-
 .../executors/AbstractJobClusterExecutor.java      |   2 +-
 .../executors/AbstractSessionClusterExecutor.java  |  11 +-
 .../client/deployment/executors/LocalExecutor.java |   4 +-
 .../client/program/PerJobMiniClusterFactory.java   |  10 +-
 .../org/apache/flink/client/ClientUtilsTest.java   | 127 +++++++
 .../apache/flink/client/program/ClientTest.java    |   2 +-
 .../program/PerJobMiniClusterFactoryTest.java      |  18 +-
 .../org/apache/flink/api/common/JobStatus.java     |   7 +-
 .../flink/core/execution/PipelineExecutor.java     |   3 +-
 .../flink/api/java/ExecutionEnvironment.java       |   2 +-
 .../java/ExecutorDiscoveryAndJobClientTest.java    |   2 +-
 .../web-dashboard/src/app/app.config.ts            |   3 +-
 .../src/app/pages/job/job.component.html           |  20 +-
 .../src/app/pages/job/job.component.ts             |  42 ++-
 .../web-dashboard/src/app/services/job.service.ts  |   3 +-
 .../customize/job-list/job-list.component.html     |   2 +-
 .../share/customize/job-list/job-list.component.ts |  10 +-
 .../runtime/client/JobInitializationException.java |  32 +-
 .../flink/runtime/dispatcher/Dispatcher.java       | 368 +++++++++------------
 .../flink/runtime/dispatcher/DispatcherJob.java    | 252 ++++++++++++++
 .../runtime/dispatcher/DispatcherJobResult.java    |  66 ++++
 .../UnavailableDispatcherOperationException.java   |  29 +-
 .../executiongraph/ArchivedExecutionGraph.java     |  49 +++
 .../runtime/dispatcher/DispatcherJobTest.java      | 337 +++++++++++++++++++
 .../dispatcher/DispatcherResourceCleanupTest.java  |  19 +-
 .../flink/runtime/dispatcher/DispatcherTest.java   | 262 +++++++++++++--
 .../dispatcher/TestingJobManagerRunnerFactory.java |   5 +-
 .../runtime/jobmanager/BlobsCleanupITCase.java     |  25 +-
 .../runtime/jobmaster/TestingJobManagerRunner.java |  61 +++-
 .../LeaderChangeClusterComponentsTest.java         |   3 +
 .../runtime/taskexecutor/TaskExecutorITCase.java   |   3 +-
 .../flink/runtime/testutils/CommonTestUtils.java   |  14 +
 .../runtime/webmonitor/TestingRestfulGateway.java  |   2 +-
 .../environment/StreamExecutionEnvironment.java    |   2 +-
 .../ExecutorDiscoveryAndJobClientTest.java         |   2 +-
 .../table/client/gateway/local/LocalExecutor.java  |   4 +-
 .../client/gateway/local/ProgramDeployer.java      |   7 +-
 .../table/api/internal/BatchTableEnvImpl.scala     |   3 +-
 .../flink/test/checkpointing/SavepointITCase.java  |   3 +
 .../test/example/client/LocalExecutorITCase.java   |   4 +-
 .../example/failing/JobSubmissionFailsITCase.java  |  24 +-
 .../environment/RemoteStreamEnvironmentTest.java   |   2 +-
 .../test/streaming/runtime/BackPressureITCase.java |   5 +
 .../test/streaming/runtime/TimestampITCase.java    |   3 +-
 .../java/org/apache/flink/test/util/TestUtils.java |   9 +
 .../flink/yarn/YARNHighAvailabilityITCase.java     |   3 +-
 48 files changed, 1533 insertions(+), 394 deletions(-)

diff --git a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
index d7b069f..1991389 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
@@ -18,15 +18,23 @@
 
 package org.apache.flink.client;
 
+import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.client.program.ContextEnvironment;
 import org.apache.flink.client.program.PackagedProgram;
 import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.client.program.StreamContextEnvironment;
+import org.apache.flink.client.program.rest.retry.ExponentialWaitStrategy;
+import org.apache.flink.client.program.rest.retry.WaitStrategy;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.DeploymentOptions;
 import org.apache.flink.core.execution.PipelineExecutorServiceLoader;
+import org.apache.flink.runtime.client.JobInitializationException;
 import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
+import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.SerializedThrowable;
+import org.apache.flink.util.function.SupplierWithException;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -34,6 +42,7 @@ import org.slf4j.LoggerFactory;
 import java.net.URL;
 import java.net.URLClassLoader;
 import java.util.List;
+import java.util.Optional;
 
 import static org.apache.flink.util.FlinkUserCodeClassLoader.NOOP_EXCEPTION_HANDLER;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -111,4 +120,42 @@ public enum ClientUtils {
 			Thread.currentThread().setContextClassLoader(contextClassLoader);
 		}
 	}
+
+	/**
+	 * This method blocks until the job status is not INITIALIZING anymore.
+	 * @param jobStatusSupplier supplier returning the job status.
+	 * @param jobResultSupplier supplier returning the job result. This will only be called if the job reaches the FAILED state.
+	 * @throws JobInitializationException If the initialization failed
+	 */
+	public static void waitUntilJobInitializationFinished(
+				SupplierWithException<JobStatus, Exception> jobStatusSupplier,
+				SupplierWithException<JobResult, Exception> jobResultSupplier,
+				ClassLoader userCodeClassloader)
+			throws JobInitializationException {
+		LOG.debug("Wait until job initialization is finished");
+		WaitStrategy waitStrategy = new ExponentialWaitStrategy(50, 2000);
+		try {
+			JobStatus status = jobStatusSupplier.get();
+			long attempt = 0;
+			while (status == JobStatus.INITIALIZING) {
+				Thread.sleep(waitStrategy.sleepTime(attempt++));
+				status = jobStatusSupplier.get();
+			}
+			if (status == JobStatus.FAILED) {
+				JobResult result = jobResultSupplier.get();
+				Optional<SerializedThrowable> throwable = result.getSerializedThrowable();
+				if (throwable.isPresent()) {
+					Throwable t = throwable.get().deserializeError(userCodeClassloader);
+					if (t instanceof JobInitializationException) {
+						throw t;
+					}
+				}
+			}
+		} catch (JobInitializationException initializationException) {
+			throw initializationException;
+		} catch (Throwable throwable) {
+			ExceptionUtils.checkInterrupted(throwable);
+			throw new RuntimeException("Error while waiting for job to be initialized", throwable);
+		}
+	}
 }
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/executors/EmbeddedExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/executors/EmbeddedExecutor.java
index aa2db40..ccb24d4 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/executors/EmbeddedExecutor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/executors/EmbeddedExecutor.java
@@ -33,6 +33,7 @@ import org.apache.flink.runtime.client.ClientUtils;
 import org.apache.flink.runtime.dispatcher.DispatcherGateway;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.function.FunctionUtils;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -82,7 +83,7 @@ public class EmbeddedExecutor implements PipelineExecutor {
 	}
 
 	@Override
-	public CompletableFuture<JobClient> execute(final Pipeline pipeline, final Configuration configuration) throws MalformedURLException {
+	public CompletableFuture<JobClient> execute(final Pipeline pipeline, final Configuration configuration, ClassLoader userCodeClassloader) throws MalformedURLException {
 		checkNotNull(pipeline);
 		checkNotNull(configuration);
 
@@ -94,7 +95,7 @@ public class EmbeddedExecutor implements PipelineExecutor {
 			return getJobClientFuture(optJobId.get());
 		}
 
-		return submitAndGetJobClientFuture(pipeline, configuration);
+		return submitAndGetJobClientFuture(pipeline, configuration, userCodeClassloader);
 	}
 
 	private CompletableFuture<JobClient> getJobClientFuture(final JobID jobId) {
@@ -102,7 +103,7 @@ public class EmbeddedExecutor implements PipelineExecutor {
 		return CompletableFuture.completedFuture(jobClientCreator.getJobClient(jobId));
 	}
 
-	private CompletableFuture<JobClient> submitAndGetJobClientFuture(final Pipeline pipeline, final Configuration configuration) throws MalformedURLException {
+	private CompletableFuture<JobClient> submitAndGetJobClientFuture(final Pipeline pipeline, final Configuration configuration, final ClassLoader userCodeClassloader) throws MalformedURLException {
 		final Time timeout = Time.milliseconds(configuration.get(ClientOptions.CLIENT_TIMEOUT).toMillis());
 
 		final JobGraph jobGraph = PipelineExecutorUtils.getJobGraph(pipeline, configuration);
@@ -122,6 +123,13 @@ public class EmbeddedExecutor implements PipelineExecutor {
 				timeout);
 
 		return jobSubmissionFuture
+				.thenApplyAsync(FunctionUtils.uncheckedFunction(jobId -> {
+					org.apache.flink.client.ClientUtils.waitUntilJobInitializationFinished(
+						() -> dispatcherGateway.requestJobStatus(jobId, timeout).get(),
+						() -> dispatcherGateway.requestJobResult(jobId, timeout).get(),
+						userCodeClassloader);
+					return jobId;
+				}))
 				.thenApplyAsync(jobID -> jobClientCreator.getJobClient(actualJobId));
 	}
 
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/AbstractJobClusterExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/AbstractJobClusterExecutor.java
index 60254f9..d4a51ac 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/AbstractJobClusterExecutor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/AbstractJobClusterExecutor.java
@@ -58,7 +58,7 @@ public class AbstractJobClusterExecutor<ClusterID, ClientFactory extends Cluster
 	}
 
 	@Override
-	public CompletableFuture<JobClient> execute(@Nonnull final Pipeline pipeline, @Nonnull final Configuration configuration) throws Exception {
+	public CompletableFuture<JobClient> execute(@Nonnull final Pipeline pipeline, @Nonnull final Configuration configuration, @Nonnull final ClassLoader userCodeClassloader) throws Exception {
 		final JobGraph jobGraph = PipelineExecutorUtils.getJobGraph(pipeline, configuration);
 
 		try (final ClusterDescriptor<ClusterID> clusterDescriptor = clusterClientFactory.createClusterDescriptor(configuration)) {
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/AbstractSessionClusterExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/AbstractSessionClusterExecutor.java
index 6445fb6..094a1b2 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/AbstractSessionClusterExecutor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/AbstractSessionClusterExecutor.java
@@ -20,6 +20,7 @@ package org.apache.flink.client.deployment.executors;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.dag.Pipeline;
+import org.apache.flink.client.ClientUtils;
 import org.apache.flink.client.deployment.ClusterClientFactory;
 import org.apache.flink.client.deployment.ClusterClientJobClientAdapter;
 import org.apache.flink.client.deployment.ClusterDescriptor;
@@ -29,6 +30,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.core.execution.PipelineExecutor;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.util.function.FunctionUtils;
 
 import javax.annotation.Nonnull;
 
@@ -53,7 +55,7 @@ public class AbstractSessionClusterExecutor<ClusterID, ClientFactory extends Clu
 	}
 
 	@Override
-	public CompletableFuture<JobClient> execute(@Nonnull final Pipeline pipeline, @Nonnull final Configuration configuration) throws Exception {
+	public CompletableFuture<JobClient> execute(@Nonnull final Pipeline pipeline, @Nonnull final Configuration configuration, @Nonnull final ClassLoader userCodeClassloader) throws Exception {
 		final JobGraph jobGraph = PipelineExecutorUtils.getJobGraph(pipeline, configuration);
 
 		try (final ClusterDescriptor<ClusterID> clusterDescriptor = clusterClientFactory.createClusterDescriptor(configuration)) {
@@ -64,6 +66,13 @@ public class AbstractSessionClusterExecutor<ClusterID, ClientFactory extends Clu
 			ClusterClient<ClusterID> clusterClient = clusterClientProvider.getClusterClient();
 			return clusterClient
 					.submitJob(jobGraph)
+					.thenApplyAsync(FunctionUtils.uncheckedFunction(jobId -> {
+						ClientUtils.waitUntilJobInitializationFinished(
+							() -> clusterClient.getJobStatus(jobId).get(),
+							() -> clusterClient.requestJobResult(jobId).get(),
+							userCodeClassloader);
+						return jobId;
+					}))
 					.thenApplyAsync(jobID -> (JobClient) new ClusterClientJobClientAdapter<>(
 							clusterClientProvider,
 							jobID))
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/LocalExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/LocalExecutor.java
index a64f030..578265d 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/LocalExecutor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/LocalExecutor.java
@@ -65,7 +65,7 @@ public class LocalExecutor implements PipelineExecutor {
 	}
 
 	@Override
-	public CompletableFuture<JobClient> execute(Pipeline pipeline, Configuration configuration) throws Exception {
+	public CompletableFuture<JobClient> execute(Pipeline pipeline, Configuration configuration, ClassLoader userCodeClassloader) throws Exception {
 		checkNotNull(pipeline);
 		checkNotNull(configuration);
 
@@ -78,7 +78,7 @@ public class LocalExecutor implements PipelineExecutor {
 
 		final JobGraph jobGraph = getJobGraph(pipeline, effectiveConfig);
 
-		return PerJobMiniClusterFactory.createWithFactory(effectiveConfig, miniClusterFactory).submitJob(jobGraph);
+		return PerJobMiniClusterFactory.createWithFactory(effectiveConfig, miniClusterFactory).submitJob(jobGraph, userCodeClassloader);
 	}
 
 	private JobGraph getJobGraph(Pipeline pipeline, Configuration configuration) throws MalformedURLException {
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/PerJobMiniClusterFactory.java b/flink-clients/src/main/java/org/apache/flink/client/program/PerJobMiniClusterFactory.java
index 16ca6b6..f5a793d 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/PerJobMiniClusterFactory.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/PerJobMiniClusterFactory.java
@@ -38,6 +38,7 @@ import org.apache.flink.runtime.operators.coordination.CoordinationRequestGatewa
 import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
 import org.apache.flink.util.MathUtils;
 import org.apache.flink.util.SerializedValue;
+import org.apache.flink.util.function.FunctionUtils;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -81,13 +82,20 @@ public final class PerJobMiniClusterFactory {
 	/**
 	 * Starts a {@link MiniCluster} and submits a job.
 	 */
-	public CompletableFuture<JobClient> submitJob(JobGraph jobGraph) throws Exception {
+	public CompletableFuture<JobClient> submitJob(JobGraph jobGraph, ClassLoader userCodeClassloader) throws Exception {
 		MiniClusterConfiguration miniClusterConfig = getMiniClusterConfig(jobGraph.getMaximumParallelism());
 		MiniCluster miniCluster = miniClusterFactory.apply(miniClusterConfig);
 		miniCluster.start();
 
 		return miniCluster
 			.submitJob(jobGraph)
+			.thenApplyAsync(FunctionUtils.uncheckedFunction(submissionResult -> {
+				org.apache.flink.client.ClientUtils.waitUntilJobInitializationFinished(
+					() -> miniCluster.getJobStatus(submissionResult.getJobID()).get(),
+					() -> miniCluster.requestJobResult(submissionResult.getJobID()).get(),
+					userCodeClassloader);
+				return submissionResult;
+			}))
 			.thenApply(result -> new PerJobMiniClusterJobClient(result.getJobID(), miniCluster))
 			.whenComplete((ignored, throwable) -> {
 				if (throwable != null) {
diff --git a/flink-clients/src/test/java/org/apache/flink/client/ClientUtilsTest.java b/flink-clients/src/test/java/org/apache/flink/client/ClientUtilsTest.java
new file mode 100644
index 0000000..056e181
--- /dev/null
+++ b/flink-clients/src/test/java/org/apache/flink/client/ClientUtilsTest.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.runtime.client.JobInitializationException;
+import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.util.SerializedThrowable;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Iterator;
+
+/**
+ * Test for the ClientUtils.
+ */
+public class ClientUtilsTest extends TestLogger {
+
+	private static final JobID TESTING_JOB_ID = new JobID();
+
+	/**
+	 * Ensure that the waitUntilJobInitializationFinished() method throws JobInitializationException.
+	 */
+	@Test
+	public void testWaitUntilJobInitializationFinished_throwsInitializationException() {
+		Iterator<JobStatus> statusSequenceIterator = Arrays.asList(
+			JobStatus.INITIALIZING,
+			JobStatus.INITIALIZING,
+			JobStatus.FAILED).iterator();
+
+		CommonTestUtils.assertThrows("Something is wrong", JobInitializationException.class, () -> {
+			ClientUtils.waitUntilJobInitializationFinished(
+				statusSequenceIterator::next,
+				() -> {
+					Throwable throwable = new JobInitializationException(
+						TESTING_JOB_ID,
+						"Something is wrong",
+						new RuntimeException("Err"));
+					return buildJobResult(throwable);
+				},
+				ClassLoader.getSystemClassLoader());
+			return null;
+		});
+	}
+
+	/**
+	 * Ensure that waitUntilJobInitializationFinished() does not throw non-initialization exceptions.
+	 */
+	@Test
+	public void testWaitUntilJobInitializationFinished_doesNotThrowRuntimeException() throws Exception {
+		Iterator<JobStatus> statusSequenceIterator = Arrays.asList(
+			JobStatus.INITIALIZING,
+			JobStatus.INITIALIZING,
+			JobStatus.FAILED).iterator();
+		ClientUtils.waitUntilJobInitializationFinished(
+			statusSequenceIterator::next,
+			() -> buildJobResult(new RuntimeException("Err")),
+			ClassLoader.getSystemClassLoader());
+	}
+
+	/**
+	 * Ensure that errors from the status supplier are rethrown wrapped in a RuntimeException.
+	 */
+	@Test
+	public void testWaitUntilJobInitializationFinished_throwsOtherErrors() {
+		CommonTestUtils.assertThrows("Error while waiting for job to be initialized", RuntimeException.class, () -> {
+			ClientUtils.waitUntilJobInitializationFinished(() -> {
+					throw new RuntimeException("other error");
+				},
+				() -> {
+					Throwable throwable = new JobInitializationException(
+						TESTING_JOB_ID,
+						"Something is wrong",
+						new RuntimeException("Err"));
+					return buildJobResult(throwable);
+				},
+				ClassLoader.getSystemClassLoader());
+			return null;
+		});
+	}
+
+	private JobResult buildJobResult(Throwable throwable) {
+		return new JobResult.Builder()
+			.jobId(TESTING_JOB_ID)
+			.serializedThrowable(new SerializedThrowable(throwable))
+			.netRuntime(1)
+			.build();
+	}
+
+	/**
+	 * Ensure that the call returns without requesting the job result once the job is RUNNING.
+	 */
+	@Test
+	public void testWaitUntilJobInitializationFinished_regular() throws Exception {
+		Iterator<JobStatus> statusSequenceIterator = Arrays.asList(
+			JobStatus.INITIALIZING,
+			JobStatus.INITIALIZING,
+			JobStatus.RUNNING).iterator();
+		ClientUtils.waitUntilJobInitializationFinished(
+			statusSequenceIterator::next, () -> {
+				Assert.fail("unexpected call");
+				return null;
+			},
+			ClassLoader.getSystemClassLoader());
+	}
+}
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
index 22b78c8..4ca9270 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
@@ -420,7 +420,7 @@ public class ClientTest extends TestLogger {
 
 				@Override
 				public PipelineExecutor getExecutor(@Nonnull Configuration configuration) {
-					return (pipeline, config) -> {
+					return (pipeline, config, classLoader) -> {
 						final int parallelism = config.getInteger(CoreOptions.DEFAULT_PARALLELISM);
 						final JobGraph jobGraph = FlinkPipelineTranslationUtil.getJobGraph(plan, config, parallelism);
 
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/PerJobMiniClusterFactoryTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/PerJobMiniClusterFactoryTest.java
index 82a4750..0a69cde 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/PerJobMiniClusterFactoryTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/PerJobMiniClusterFactoryTest.java
@@ -59,7 +59,7 @@ public class PerJobMiniClusterFactoryTest extends TestLogger {
 	public void testJobExecution() throws Exception {
 		PerJobMiniClusterFactory perJobMiniClusterFactory = initializeMiniCluster();
 
-		JobClient jobClient = perJobMiniClusterFactory.submitJob(getNoopJobGraph()).get();
+		JobClient jobClient = perJobMiniClusterFactory.submitJob(getNoopJobGraph(), ClassLoader.getSystemClassLoader()).get();
 
 		JobExecutionResult jobExecutionResult = jobClient.getJobExecutionResult(getClass().getClassLoader()).get();
 		assertThat(jobExecutionResult, is(notNullValue()));
@@ -76,7 +76,7 @@ public class PerJobMiniClusterFactoryTest extends TestLogger {
 
 		JobGraph cancellableJobGraph = getCancellableJobGraph();
 		JobClient jobClient = perJobMiniClusterFactory
-			.submitJob(cancellableJobGraph)
+			.submitJob(cancellableJobGraph, ClassLoader.getSystemClassLoader())
 			.get();
 
 		assertThat(jobClient.getJobID(), is(cancellableJobGraph.getJobID()));
@@ -96,7 +96,7 @@ public class PerJobMiniClusterFactoryTest extends TestLogger {
 	@Test
 	public void testJobClientSavepoint() throws Exception {
 		PerJobMiniClusterFactory perJobMiniClusterFactory = initializeMiniCluster();
-		JobClient jobClient = perJobMiniClusterFactory.submitJob(getCancellableJobGraph()).get();
+		JobClient jobClient = perJobMiniClusterFactory.submitJob(getCancellableJobGraph(), ClassLoader.getSystemClassLoader()).get();
 
 		assertThrows(
 			"is not a streaming job.",
@@ -117,23 +117,21 @@ public class PerJobMiniClusterFactoryTest extends TestLogger {
 		JobGraph jobGraph = new JobGraph();
 
 		assertThrows(
-			"Failed to submit job.",
+			"Could not instantiate JobManager",
 			ExecutionException.class,
-			() -> perJobMiniClusterFactory.submitJob(jobGraph).get());
-
-		assertThatMiniClusterIsShutdown();
+			() -> perJobMiniClusterFactory.submitJob(jobGraph, ClassLoader.getSystemClassLoader()).get());
 	}
 
 	@Test
 	public void testMultipleExecutions() throws Exception {
 		PerJobMiniClusterFactory perJobMiniClusterFactory = initializeMiniCluster();
 		{
-			JobClient jobClient = perJobMiniClusterFactory.submitJob(getNoopJobGraph()).get();
+			JobClient jobClient = perJobMiniClusterFactory.submitJob(getNoopJobGraph(), ClassLoader.getSystemClassLoader()).get();
 			jobClient.getJobExecutionResult(getClass().getClassLoader()).get();
 			assertThatMiniClusterIsShutdown();
 		}
 		{
-			JobClient jobClient = perJobMiniClusterFactory.submitJob(getNoopJobGraph()).get();
+			JobClient jobClient = perJobMiniClusterFactory.submitJob(getNoopJobGraph(), ClassLoader.getSystemClassLoader()).get();
 			jobClient.getJobExecutionResult(getClass().getClassLoader()).get();
 			assertThatMiniClusterIsShutdown();
 		}
@@ -142,7 +140,7 @@ public class PerJobMiniClusterFactoryTest extends TestLogger {
 	@Test
 	public void testJobClientInteractionAfterShutdown() throws Exception {
 		PerJobMiniClusterFactory perJobMiniClusterFactory = initializeMiniCluster();
-		JobClient jobClient = perJobMiniClusterFactory.submitJob(getNoopJobGraph()).get();
+		JobClient jobClient = perJobMiniClusterFactory.submitJob(getNoopJobGraph(), ClassLoader.getSystemClassLoader()).get();
 		jobClient.getJobExecutionResult(getClass().getClassLoader()).get();
 		assertThatMiniClusterIsShutdown();
 
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/JobStatus.java b/flink-core/src/main/java/org/apache/flink/api/common/JobStatus.java
index b0ddd24..5cb6d90 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/JobStatus.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/JobStatus.java
@@ -21,10 +21,15 @@ package org.apache.flink.api.common;
 import org.apache.flink.annotation.PublicEvolving;
 
 /**
- * Possible states of a job once it has been accepted by the job manager.
+ * Possible states of a job once it has been accepted by the dispatcher.
  */
 @PublicEvolving
 public enum JobStatus {
+	/**
+	 * The job has been received by the Dispatcher, and is waiting for the job manager to be
+	 * created.
+	 */
+	INITIALIZING(TerminalState.NON_TERMINAL),
 
 	/** Job is newly created, no task has started to run. */
 	CREATED(TerminalState.NON_TERMINAL),
diff --git a/flink-core/src/main/java/org/apache/flink/core/execution/PipelineExecutor.java b/flink-core/src/main/java/org/apache/flink/core/execution/PipelineExecutor.java
index afe7cad..66a9ef6 100644
--- a/flink-core/src/main/java/org/apache/flink/core/execution/PipelineExecutor.java
+++ b/flink-core/src/main/java/org/apache/flink/core/execution/PipelineExecutor.java
@@ -39,7 +39,8 @@ public interface PipelineExecutor {
 	 *
 	 * @param pipeline the {@link Pipeline} to execute
 	 * @param configuration the {@link Configuration} with the required execution parameters
+	 * @param userCodeClassloader the {@link ClassLoader} used to deserialize user code
 	 * @return a {@link CompletableFuture} with the {@link JobClient} corresponding to the pipeline.
 	 */
-	CompletableFuture<JobClient> execute(final Pipeline pipeline, final Configuration configuration) throws Exception;
+	CompletableFuture<JobClient> execute(final Pipeline pipeline, final Configuration configuration, final ClassLoader userCodeClassloader) throws Exception;
 }
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index 910c419..4eb48e9 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -970,7 +970,7 @@ public class ExecutionEnvironment {
 
 		CompletableFuture<JobClient> jobClientFuture = executorFactory
 			.getExecutor(configuration)
-			.execute(plan, configuration);
+			.execute(plan, configuration, userClassloader);
 
 		try {
 			JobClient jobClient = jobClientFuture.get();
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/ExecutorDiscoveryAndJobClientTest.java b/flink-java/src/test/java/org/apache/flink/api/java/ExecutorDiscoveryAndJobClientTest.java
index ab4a66b..cf046af 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/ExecutorDiscoveryAndJobClientTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/ExecutorDiscoveryAndJobClientTest.java
@@ -86,7 +86,7 @@ public class ExecutorDiscoveryAndJobClientTest {
 
 		@Override
 		public PipelineExecutor getExecutor(Configuration configuration) {
-			return (pipeline, executionConfig) -> CompletableFuture.completedFuture(new TestingJobClient());
+			return (pipeline, executionConfig, classLoader) -> CompletableFuture.completedFuture(new TestingJobClient());
 		}
 	}
 }
diff --git a/flink-runtime-web/web-dashboard/src/app/app.config.ts b/flink-runtime-web/web-dashboard/src/app/app.config.ts
index 6d41b67..97a5c83 100644
--- a/flink-runtime-web/web-dashboard/src/app/app.config.ts
+++ b/flink-runtime-web/web-dashboard/src/app/app.config.ts
@@ -30,6 +30,7 @@ export const COLOR_MAP = {
   IN_PROGRESS: '#faad14',
   SCHEDULED: '#722ed1',
   COMPLETED: '#1890ff',
-  RESTARTING: '#13c2c2'
+  RESTARTING: '#13c2c2',
+  INITIALIZING: '#738df8'
 };
 export const LONG_MIN_VALUE = -9223372036854776000;
diff --git a/flink-runtime-web/web-dashboard/src/app/pages/job/job.component.html b/flink-runtime-web/web-dashboard/src/app/pages/job/job.component.html
index 62aa2fd..cb1fedc 100644
--- a/flink-runtime-web/web-dashboard/src/app/pages/job/job.component.html
+++ b/flink-runtime-web/web-dashboard/src/app/pages/job/job.component.html
@@ -15,11 +15,19 @@
   ~ See the License for the specific language governing permissions and
   ~ limitations under the License.
   -->
-<flink-job-status [isLoading]="isLoading"></flink-job-status>
-<div class="content">
-  <nz-skeleton [nzActive]="true" *ngIf="isLoading"></nz-skeleton>
-  <div class="router" *ngIf="!isLoading">
-    <router-outlet></router-outlet>
+<ng-container *ngIf="!isError">
+  <flink-job-status [isLoading]="isLoading"></flink-job-status>
+  <div class="content">
+    <nz-skeleton [nzActive]="true" *ngIf="isLoading"></nz-skeleton>
+    <div class="router" *ngIf="!isLoading">
+      <router-outlet></router-outlet>
+    </div>
   </div>
-</div>
+</ng-container>
+
+
+<nz-alert *ngIf="isError" nzShowIcon nzType="warning" nzMessage="Job failed during initialization of JobManager" [nzDescription]=descriptionTemplateRef></nz-alert>
+<ng-template #descriptionTemplateRef>
+  <pre>{{errorDetails}}</pre>
+</ng-template>
 
diff --git a/flink-runtime-web/web-dashboard/src/app/pages/job/job.component.ts b/flink-runtime-web/web-dashboard/src/app/pages/job/job.component.ts
index ef48463..bf185ab 100644
--- a/flink-runtime-web/web-dashboard/src/app/pages/job/job.component.ts
+++ b/flink-runtime-web/web-dashboard/src/app/pages/job/job.component.ts
@@ -16,11 +16,11 @@
  * limitations under the License.
  */
 
-import { ChangeDetectionStrategy, ChangeDetectorRef, Component, OnDestroy, OnInit } from '@angular/core';
-import { ActivatedRoute } from '@angular/router';
-import { Subject } from 'rxjs';
-import { flatMap, takeUntil } from 'rxjs/operators';
-import { JobService, StatusService } from 'services';
+import {ChangeDetectionStrategy, ChangeDetectorRef, Component, OnDestroy, OnInit} from '@angular/core';
+import {ActivatedRoute} from '@angular/router';
+import {EMPTY, Subject} from 'rxjs';
+import {catchError, flatMap, takeUntil} from 'rxjs/operators';
+import {JobService, StatusService} from 'services';
 
 @Component({
   selector: 'flink-job',
@@ -31,6 +31,8 @@ import { JobService, StatusService } from 'services';
 export class JobComponent implements OnInit, OnDestroy {
   destroy$ = new Subject();
   isLoading = true;
+  isError = false;
+  errorDetails: string;
 
   constructor(
     private cdr: ChangeDetectorRef,
@@ -43,18 +45,26 @@ export class JobComponent implements OnInit, OnDestroy {
     this.statusService.refresh$
       .pipe(
         takeUntil(this.destroy$),
-        flatMap(() => this.jobService.loadJob(this.activatedRoute.snapshot.params.jid))
+        flatMap(() =>
+          this.jobService.loadJob(this.activatedRoute.snapshot.params.jid).pipe(
+            catchError(() => {
+              this.jobService.loadExceptions(this.activatedRoute.snapshot.params.jid, 10).subscribe(data => {
+                this.errorDetails = data['root-exception'];
+                this.cdr.markForCheck();
+              });
+              this.isError = true;
+              this.isLoading = false;
+              this.cdr.markForCheck();
+              return EMPTY;
+            })
+          )
+        )
       )
-      .subscribe(
-        () => {
-          this.isLoading = false;
-          this.cdr.markForCheck();
-        },
-        () => {
-          this.isLoading = false;
-          this.cdr.markForCheck();
-        }
-      );
+      .subscribe(() => {
+        this.isLoading = false;
+        this.isError = false;
+        this.cdr.markForCheck();
+      });
   }
 
   ngOnDestroy() {
diff --git a/flink-runtime-web/web-dashboard/src/app/services/job.service.ts b/flink-runtime-web/web-dashboard/src/app/services/job.service.ts
index e1d057e..ef013f2 100644
--- a/flink-runtime-web/web-dashboard/src/app/services/job.service.ts
+++ b/flink-runtime-web/web-dashboard/src/app/services/job.service.ts
@@ -117,8 +117,7 @@ export class JobService {
       map(job => this.convertJob(job)),
       tap(job => {
         this.jobDetail$.next(job);
-      }),
-      catchError(() => EMPTY)
+      })
     );
   }
 
diff --git a/flink-runtime-web/web-dashboard/src/app/share/customize/job-list/job-list.component.html b/flink-runtime-web/web-dashboard/src/app/share/customize/job-list/job-list.component.html
index 9553bda..51f84dd 100644
--- a/flink-runtime-web/web-dashboard/src/app/share/customize/job-list/job-list.component.html
+++ b/flink-runtime-web/web-dashboard/src/app/share/customize/job-list/job-list.component.html
@@ -33,7 +33,7 @@
       </tr>
     </thead>
     <tbody>
-      <tr *ngFor="let job of listOfJob; trackBy:trackJobBy;" (click)="navigateToJob(job.jid)" class="clickable">
+      <tr *ngFor="let job of listOfJob; trackBy:trackJobBy;" (click)="navigateToJob(job)" class="clickable">
         <td>{{ job.name }}</td>
         <td>{{ job["start-time"] | humanizeDate: 'yyyy-MM-dd HH:mm:ss' }}</td>
         <td>{{ job.duration | humanizeDuration }}</td>
diff --git a/flink-runtime-web/web-dashboard/src/app/share/customize/job-list/job-list.component.ts b/flink-runtime-web/web-dashboard/src/app/share/customize/job-list/job-list.component.ts
index 01aceef..ec43952 100644
--- a/flink-runtime-web/web-dashboard/src/app/share/customize/job-list/job-list.component.ts
+++ b/flink-runtime-web/web-dashboard/src/app/share/customize/job-list/job-list.component.ts
@@ -23,6 +23,7 @@ import { Observable, Subject } from 'rxjs';
 import { flatMap, takeUntil } from 'rxjs/operators';
 import { JobService, StatusService } from 'services';
 import { deepFind, isNil } from 'utils';
+import { NzMessageService } from 'ng-zorro-antd';
 
 @Component({
   selector: 'flink-job-list',
@@ -64,13 +65,18 @@ export class JobListComponent implements OnInit, OnDestroy {
     return node.jid;
   }
 
-  navigateToJob(jid: string) {
-    this.router.navigate(['job', jid]).then();
+  navigateToJob(job: JobsItemInterface) {
+    if (job.state !== 'INITIALIZING') { // only jobs past INITIALIZING are routed to the detail page
+      this.router.navigate(['job', job.jid]).then();
+      return;
+    }
+    this.nzMessageService.info('Job detail page is not available while it is in state INITIALIZING.');
   }
 
   constructor(
     private statusService: StatusService,
     private jobService: JobService,
+    private nzMessageService: NzMessageService,
     private activatedRoute: ActivatedRoute,
     private cdr: ChangeDetectorRef,
     private router: Router
diff --git a/flink-runtime-web/web-dashboard/src/app/app.config.ts b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobInitializationException.java
similarity index 65%
copy from flink-runtime-web/web-dashboard/src/app/app.config.ts
copy to flink-runtime/src/main/java/org/apache/flink/runtime/client/JobInitializationException.java
index 6d41b67..05752f1 100644
--- a/flink-runtime-web/web-dashboard/src/app/app.config.ts
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobInitializationException.java
@@ -16,20 +16,18 @@
  * limitations under the License.
  */
 
-export const BASE_URL = '.';
-export const COLOR_MAP = {
-  TOTAL: '#112641',
-  RUNNING: '#52c41a',
-  FAILED: '#f5222d',
-  FINISHED: '#1890ff',
-  CANCELED: '#fa8c16',
-  CANCELING: '#faad14',
-  CREATED: '#2f54eb',
-  DEPLOYING: '#13c2c2',
-  RECONCILING: '#eb2f96',
-  IN_PROGRESS: '#faad14',
-  SCHEDULED: '#722ed1',
-  COMPLETED: '#1890ff',
-  RESTARTING: '#13c2c2'
-};
-export const LONG_MIN_VALUE = -9223372036854776000;
+package org.apache.flink.runtime.client;
+
+import org.apache.flink.api.common.JobID;
+
+/**
+ * An exception indicating that the job has failed in the INITIALIZING job status, e.g. because the Dispatcher could not instantiate or start its JobManager.
+ */
+public class JobInitializationException extends JobExecutionException {
+
+	private static final long serialVersionUID = 2818087325120827526L;
+
+	public JobInitializationException(JobID jobID, String msg, Throwable cause){
+		super(jobID, msg, cause);
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index 047b80e..e21d742 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -28,7 +28,7 @@ import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.checkpoint.Checkpoints;
 import org.apache.flink.runtime.client.DuplicateJobSubmissionException;
-import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.client.JobInitializationException;
 import org.apache.flink.runtime.client.JobSubmissionException;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
@@ -44,7 +44,6 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobmanager.JobGraphWriter;
 import org.apache.flink.runtime.jobmaster.JobManagerRunner;
-import org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl;
 import org.apache.flink.runtime.jobmaster.JobManagerSharedServices;
 import org.apache.flink.runtime.jobmaster.JobMasterGateway;
 import org.apache.flink.runtime.jobmaster.JobNotFinishedException;
@@ -72,9 +71,8 @@ import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.SerializedValue;
-import org.apache.flink.util.function.BiConsumerWithException;
 import org.apache.flink.util.function.FunctionUtils;
-import org.apache.flink.util.function.FunctionWithException;
+import org.apache.flink.util.function.ThrowingConsumer;
 
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
@@ -90,7 +88,6 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
-import java.util.function.BiFunction;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
@@ -119,7 +116,7 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
 
 	private final FatalErrorHandler fatalErrorHandler;
 
-	private final Map<JobID, CompletableFuture<JobManagerRunner>> jobManagerRunnerFutures;
+	private final Map<JobID, DispatcherJob> runningJobs;
 
 	private final DispatcherBootstrap dispatcherBootstrap;
 
@@ -134,10 +131,17 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
 	@Nullable
 	private final String metricServiceQueryAddress;
 
-	private final Map<JobID, CompletableFuture<Void>> jobManagerTerminationFutures;
+	private final Map<JobID, CompletableFuture<Void>> dispatcherJobTerminationFutures;
 
 	protected final CompletableFuture<ApplicationStatus> shutDownFuture;
 
+	/**
+	 * Enum to distinguish between initial job submission and re-submission for recovery.
+	 */
+	protected enum ExecutionType {
+		SUBMISSION, RECOVERY
+	}
+
 	public Dispatcher(
 			RpcService rpcService,
 			DispatcherId fencingToken,
@@ -163,7 +167,7 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
 
 		this.runningJobsRegistry = highAvailabilityServices.getRunningJobsRegistry();
 
-		jobManagerRunnerFutures = new HashMap<>(16);
+		runningJobs = new HashMap<>(16);
 
 		this.historyServerArchivist = dispatcherServices.getHistoryServerArchivist();
 
@@ -171,7 +175,7 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
 
 		this.jobManagerRunnerFactory = dispatcherServices.getJobManagerRunnerFactory();
 
-		this.jobManagerTerminationFutures = new HashMap<>(2);
+		this.dispatcherJobTerminationFutures = new HashMap<>(2);
 
 		this.shutDownFuture = new CompletableFuture<>();
 
@@ -213,18 +217,11 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
 
 	void runRecoveredJob(final JobGraph recoveredJob) {
 		checkNotNull(recoveredJob);
-		FutureUtils.assertNoException(runJob(recoveredJob)
-			.handle(handleRecoveredJobStartError(recoveredJob.getJobID())));
-	}
-
-	private BiFunction<Void, Throwable, Void> handleRecoveredJobStartError(JobID jobId) {
-		return (ignored, throwable) -> {
-			if (throwable != null) {
-				onFatalError(new DispatcherException(String.format("Could not start recovered job %s.", jobId), throwable));
-			}
-
-			return null;
-		};
+		try {
+			runJob(recoveredJob, ExecutionType.RECOVERY);
+		} catch (Throwable throwable) {
+			onFatalError(new DispatcherException(String.format("Could not start recovered job %s.", recoveredJob.getJobID()), throwable));
+		}
 	}
 
 	private void handleStartDispatcherServicesException(Exception e) throws Exception {
@@ -241,10 +238,10 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
 	public CompletableFuture<Void> onStop() {
 		log.info("Stopping dispatcher {}.", getAddress());
 
-		final CompletableFuture<Void> allJobManagerRunnersTerminationFuture = terminateJobManagerRunnersAndGetTerminationFuture();
+		final CompletableFuture<Void> allJobsTerminationFuture = terminateRunningJobsAndGetTerminationFuture();
 
 		return FutureUtils.runAfterwards(
-			allJobManagerRunnersTerminationFuture,
+			allJobsTerminationFuture,
 			() -> {
 				dispatcherBootstrap.stop();
 
@@ -307,7 +304,7 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
 			throw new FlinkException(String.format("Failed to retrieve job scheduling status for job %s.", jobId), e);
 		}
 
-		return jobSchedulingStatus == RunningJobsRegistry.JobSchedulingStatus.DONE || jobManagerRunnerFutures.containsKey(jobId);
+		return jobSchedulingStatus == RunningJobsRegistry.JobSchedulingStatus.DONE || runningJobs.containsKey(jobId);
 	}
 
 	private boolean isPartialResourceConfigured(JobGraph jobGraph) {
@@ -332,7 +329,8 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
 	private CompletableFuture<Acknowledge> internalSubmitJob(JobGraph jobGraph) {
 		log.info("Submitting job {} ({}).", jobGraph.getJobID(), jobGraph.getName());
 
-		final CompletableFuture<Acknowledge> persistAndRunFuture = waitForTerminatingJobManager(jobGraph.getJobID(), jobGraph, this::persistAndRunJob)
+		final CompletableFuture<Acknowledge> persistAndRunFuture = waitForTerminatingJob(jobGraph.getJobID(), jobGraph,
+			this::persistAndRunJob)
 			.thenApply(ignored -> Acknowledge.get());
 
 		return persistAndRunFuture.handleAsync((acknowledge, throwable) -> {
@@ -350,44 +348,59 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
 		}, getRpcService().getExecutor());
 	}
 
-	private CompletableFuture<Void> persistAndRunJob(JobGraph jobGraph) throws Exception {
+	private void persistAndRunJob(JobGraph jobGraph) throws Exception {
 		jobGraphWriter.putJobGraph(jobGraph);
-
-		final CompletableFuture<Void> runJobFuture = runJob(jobGraph);
-
-		return runJobFuture.whenComplete(BiConsumerWithException.unchecked((Object ignored, Throwable throwable) -> {
-			if (throwable != null) {
-				jobGraphWriter.removeJobGraph(jobGraph.getJobID());
-			}
-		}));
+		runJob(jobGraph, ExecutionType.SUBMISSION);
 	}
 
-	private CompletableFuture<Void> runJob(JobGraph jobGraph) {
-		Preconditions.checkState(!jobManagerRunnerFutures.containsKey(jobGraph.getJobID()));
+	private void runJob(JobGraph jobGraph, ExecutionType executionType) {
+		Preconditions.checkState(!runningJobs.containsKey(jobGraph.getJobID()));
 
-		final CompletableFuture<JobManagerRunner> jobManagerRunnerFuture = createJobManagerRunner(jobGraph);
+		CompletableFuture<JobManagerRunner> jobManagerRunnerFuture = createJobManagerRunner(jobGraph);
 
-		jobManagerRunnerFutures.put(jobGraph.getJobID(), jobManagerRunnerFuture);
+		DispatcherJob dispatcherJob = DispatcherJob.createFor(
+				jobManagerRunnerFuture,
+				jobGraph.getJobID(),
+				jobGraph.getName());
+		runningJobs.put(jobGraph.getJobID(), dispatcherJob);
 
-		return jobManagerRunnerFuture
-			.thenApply(FunctionUtils.uncheckedFunction(this::startJobManagerRunner))
-			.thenApply(FunctionUtils.nullFn())
-			.whenCompleteAsync(
-				(ignored, throwable) -> {
-					if (throwable != null) {
-						jobManagerRunnerFutures.remove(jobGraph.getJobID());
+		final JobID jobId = jobGraph.getJobID();
+		FutureUtils.assertNoException(
+			dispatcherJob.getResultFuture().handleAsync(
+				(DispatcherJobResult dispatcherJobResult, Throwable throwable) -> {
+					// check if we are still the active DispatcherJob by checking the identity
+					DispatcherJob job = runningJobs.get(jobId);
+					if (job == dispatcherJob) {
+						if (dispatcherJobResult != null) {
+							if (dispatcherJobResult.isInitializationFailure() && executionType == ExecutionType.RECOVERY) {
+								dispatcherJobFailed(jobId, dispatcherJobResult.getInitializationFailure());
+							} else {
+								jobReachedGloballyTerminalState(dispatcherJobResult.getArchivedExecutionGraph());
+							}
+						} else {
+							dispatcherJobFailed(jobId, throwable);
+						}
+					} else {
+						log.debug("Job {} is not registered anymore at dispatcher", jobId);
 					}
-				},
-				getMainThreadExecutor());
+					return null;
+				}, getMainThreadExecutor()));
 	}
 
-	private CompletableFuture<JobManagerRunner> createJobManagerRunner(JobGraph jobGraph) {
-		final RpcService rpcService = getRpcService();
+	private void dispatcherJobFailed(JobID jobId, Throwable throwable) {
+		if (throwable instanceof JobNotFinishedException) {
+			jobNotFinished(jobId);
+		} else {
+			jobMasterFailed(jobId, throwable);
+		}
+	}
 
+	CompletableFuture<JobManagerRunner> createJobManagerRunner(JobGraph jobGraph) {
+		final RpcService rpcService = getRpcService();
 		return CompletableFuture.supplyAsync(
 			() -> {
 				try {
-					return jobManagerRunnerFactory.createJobManagerRunner(
+					JobManagerRunner runner = jobManagerRunnerFactory.createJobManagerRunner(
 						jobGraph,
 						configuration,
 						rpcService,
@@ -396,52 +409,19 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
 						jobManagerSharedServices,
 						new DefaultJobManagerJobMetricGroupFactory(jobManagerMetricGroup),
 						fatalErrorHandler);
+					runner.start();
+					return runner;
 				} catch (Exception e) {
-					throw new CompletionException(new JobExecutionException(jobGraph.getJobID(), "Could not instantiate JobManager.", e));
+					throw new CompletionException(new JobInitializationException(jobGraph.getJobID(), "Could not instantiate JobManager.", e));
 				}
 			},
-			rpcService.getExecutor());
-	}
-
-	private JobManagerRunner startJobManagerRunner(JobManagerRunner jobManagerRunner) throws Exception {
-		final JobID jobId = jobManagerRunner.getJobID();
-
-		FutureUtils.assertNoException(
-			jobManagerRunner.getResultFuture().handleAsync(
-				(ArchivedExecutionGraph archivedExecutionGraph, Throwable throwable) -> {
-					// check if we are still the active JobManagerRunner by checking the identity
-					final JobManagerRunner currentJobManagerRunner = Optional.ofNullable(jobManagerRunnerFutures.get(jobId))
-						.map(future -> future.getNow(null))
-						.orElse(null);
-					//noinspection ObjectEquality
-					if (jobManagerRunner == currentJobManagerRunner) {
-						if (archivedExecutionGraph != null) {
-							jobReachedGloballyTerminalState(archivedExecutionGraph);
-						} else {
-							final Throwable strippedThrowable = ExceptionUtils.stripCompletionException(throwable);
-
-							if (strippedThrowable instanceof JobNotFinishedException) {
-								jobNotFinished(jobId);
-							} else {
-								jobMasterFailed(jobId, strippedThrowable);
-							}
-						}
-					} else {
-						log.debug("There is a newer JobManagerRunner for the job {}.", jobId);
-					}
-
-					return null;
-				}, getMainThreadExecutor()));
-
-		jobManagerRunner.start();
-
-		return jobManagerRunner;
+			rpcService.getExecutor()); // do not use main thread executor. Otherwise, Dispatcher is blocked on JobManager creation
 	}
 
 	@Override
 	public CompletableFuture<Collection<JobID>> listJobs(Time timeout) {
 		return CompletableFuture.completedFuture(
-			Collections.unmodifiableSet(new HashSet<>(jobManagerRunnerFutures.keySet())));
+			Collections.unmodifiableSet(new HashSet<>(runningJobs.keySet())));
 	}
 
 	@Override
@@ -465,17 +445,18 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
 
 	@Override
 	public CompletableFuture<Acknowledge> cancelJob(JobID jobId, Time timeout) {
-		final CompletableFuture<JobMasterGateway> jobMasterGatewayFuture = getJobMasterGatewayFuture(jobId);
-
-		return jobMasterGatewayFuture.thenCompose((JobMasterGateway jobMasterGateway) -> jobMasterGateway.cancel(timeout));
+		Optional<DispatcherJob> maybeJob = getDispatcherJob(jobId);
+		return maybeJob.map(job -> job.cancel(timeout)).orElseGet(() -> {
+			log.debug("Dispatcher is unable to cancel job {}: not found", jobId);
+			return FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobId));
+		});
 	}
 
 	@Override
 	public CompletableFuture<ClusterOverview> requestClusterOverview(Time timeout) {
 		CompletableFuture<ResourceOverview> taskManagerOverviewFuture = runResourceManagerCommand(resourceManagerGateway -> resourceManagerGateway.requestResourceOverview(timeout));
 
-		final List<CompletableFuture<Optional<JobStatus>>> optionalJobInformation = queryJobMastersForInformation(
-			(JobMasterGateway jobMasterGateway) -> jobMasterGateway.requestJobStatus(timeout));
+		final List<CompletableFuture<Optional<JobStatus>>> optionalJobInformation = queryJobMastersForInformation(dispatcherJob -> dispatcherJob.requestJobStatus(timeout));
 
 		CompletableFuture<Collection<Optional<JobStatus>>> allOptionalJobsFuture = FutureUtils.combineAll(optionalJobInformation);
 
@@ -493,8 +474,7 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
 
 	@Override
 	public CompletableFuture<MultipleJobsDetails> requestMultipleJobDetails(Time timeout) {
-		List<CompletableFuture<Optional<JobDetails>>> individualOptionalJobDetails = queryJobMastersForInformation(
-			(JobMasterGateway jobMasterGateway) -> jobMasterGateway.requestJobDetails(timeout));
+		List<CompletableFuture<Optional<JobDetails>>> individualOptionalJobDetails = queryJobMastersForInformation(dj -> dj.requestJobDetails(timeout));
 
 		CompletableFuture<Collection<Optional<JobDetails>>> optionalCombinedJobDetails = FutureUtils.combineAll(
 			individualOptionalJobDetails);
@@ -516,59 +496,47 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
 
 	@Override
 	public CompletableFuture<JobStatus> requestJobStatus(JobID jobId, Time timeout) {
-
-		final CompletableFuture<JobMasterGateway> jobMasterGatewayFuture = getJobMasterGatewayFuture(jobId);
-
-		final CompletableFuture<JobStatus> jobStatusFuture = jobMasterGatewayFuture.thenCompose(
-			(JobMasterGateway jobMasterGateway) -> jobMasterGateway.requestJobStatus(timeout));
-
-		return jobStatusFuture.exceptionally(
-			(Throwable throwable) -> {
-				final JobDetails jobDetails = archivedExecutionGraphStore.getAvailableJobDetails(jobId);
-
-				// check whether it is a completed job
-				if (jobDetails == null) {
-					throw new CompletionException(ExceptionUtils.stripCompletionException(throwable));
-				} else {
-					return jobDetails.getStatus();
-				}
-			});
+		Optional<DispatcherJob> maybeJob = getDispatcherJob(jobId);
+		return maybeJob.map(job -> job.requestJobStatus(timeout)).orElseGet(() -> {
+			// is it a completed job?
+			final JobDetails jobDetails = archivedExecutionGraphStore.getAvailableJobDetails(jobId);
+			if (jobDetails == null) {
+				return FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobId));
+			} else {
+				return CompletableFuture.completedFuture(jobDetails.getStatus());
+			}
+		});
 	}
 
 	@Override
 	public CompletableFuture<OperatorBackPressureStatsResponse> requestOperatorBackPressureStats(
 			final JobID jobId,
 			final JobVertexID jobVertexId) {
-		final CompletableFuture<JobMasterGateway> jobMasterGatewayFuture = getJobMasterGatewayFuture(jobId);
-
-		return jobMasterGatewayFuture.thenCompose((JobMasterGateway jobMasterGateway) -> jobMasterGateway.requestOperatorBackPressureStats(jobVertexId));
+		return performOperationOnJobMasterGateway(jobId, gateway -> gateway.requestOperatorBackPressureStats(jobVertexId));
 	}
 
 	@Override
 	public CompletableFuture<ArchivedExecutionGraph> requestJob(JobID jobId, Time timeout) {
-		final CompletableFuture<JobMasterGateway> jobMasterGatewayFuture = getJobMasterGatewayFuture(jobId);
-
-		final CompletableFuture<ArchivedExecutionGraph> archivedExecutionGraphFuture = jobMasterGatewayFuture.thenCompose(
-			(JobMasterGateway jobMasterGateway) -> jobMasterGateway.requestJob(timeout));
-
-		return archivedExecutionGraphFuture.exceptionally(
-			(Throwable throwable) -> {
-				final ArchivedExecutionGraph serializableExecutionGraph = archivedExecutionGraphStore.get(jobId);
-
-				// check whether it is a completed job
-				if (serializableExecutionGraph == null) {
-					throw new CompletionException(ExceptionUtils.stripCompletionException(throwable));
-				} else {
-					return serializableExecutionGraph;
-				}
-			});
+		Function<Throwable, ArchivedExecutionGraph> checkExecutionGraphStoreOnException = throwable -> {
+			// the lookup among the running jobs failed: check whether it is a completed job in the archivedExecutionGraphStore
+			final ArchivedExecutionGraph archivedExecutionGraph = archivedExecutionGraphStore.get(jobId);
+			if (archivedExecutionGraph == null) {
+				throw new CompletionException(ExceptionUtils.stripCompletionException(throwable));
+			} else {
+				return archivedExecutionGraph;
+			}
+		};
+		Optional<DispatcherJob> maybeJob = getDispatcherJob(jobId);
+		return maybeJob.map(job -> job.requestJob(timeout))
+			.orElseGet(() -> FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobId))) // lazy: only built when the job is unknown
+			.exceptionally(checkExecutionGraphStoreOnException);
 	}
 
 	@Override
 	public CompletableFuture<JobResult> requestJobResult(JobID jobId, Time timeout) {
-		final CompletableFuture<JobManagerRunner> jobManagerRunnerFuture = jobManagerRunnerFutures.get(jobId);
+		DispatcherJob job = runningJobs.get(jobId);
 
-		if (jobManagerRunnerFuture == null) {
+		if (job == null) {
 			final ArchivedExecutionGraph archivedExecutionGraph = archivedExecutionGraphStore.get(jobId);
 
 			if (archivedExecutionGraph == null) {
@@ -577,7 +545,7 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
 				return CompletableFuture.completedFuture(JobResult.createFrom(archivedExecutionGraph));
 			}
 		} else {
-			return jobManagerRunnerFuture.thenCompose(JobManagerRunner::getResultFuture).thenApply(JobResult::createFrom);
+			return job.getResultFuture().thenApply(dispatcherJobResult -> JobResult.createFrom(dispatcherJobResult.getArchivedExecutionGraph()));
 		}
 	}
 
@@ -606,11 +574,8 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
 			final String targetDirectory,
 			final boolean cancelJob,
 			final Time timeout) {
-		final CompletableFuture<JobMasterGateway> jobMasterGatewayFuture = getJobMasterGatewayFuture(jobId);
 
-		return jobMasterGatewayFuture.thenCompose(
-			(JobMasterGateway jobMasterGateway) ->
-				jobMasterGateway.triggerSavepoint(targetDirectory, cancelJob, timeout));
+		return performOperationOnJobMasterGateway(jobId, gateway -> gateway.triggerSavepoint(targetDirectory, cancelJob, timeout));
 	}
 
 	@Override
@@ -619,11 +584,7 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
 			final String targetDirectory,
 			final boolean advanceToEndOfEventTime,
 			final Time timeout) {
-		final CompletableFuture<JobMasterGateway> jobMasterGatewayFuture = getJobMasterGatewayFuture(jobId);
-
-		return jobMasterGatewayFuture.thenCompose(
-				(JobMasterGateway jobMasterGateway) ->
-						jobMasterGateway.stopWithSavepoint(targetDirectory, advanceToEndOfEventTime, timeout));
+		return performOperationOnJobMasterGateway(jobId, gateway -> gateway.stopWithSavepoint(targetDirectory, advanceToEndOfEventTime, timeout));
 	}
 
 	@Override
@@ -643,11 +604,7 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
 			OperatorID operatorId,
 			SerializedValue<CoordinationRequest> serializedRequest,
 			Time timeout) {
-		final CompletableFuture<JobMasterGateway> jobMasterGatewayFuture = getJobMasterGatewayFuture(jobId);
-
-		return jobMasterGatewayFuture.thenCompose(
-			(JobMasterGateway jobMasterGateway) ->
-				jobMasterGateway.deliverCoordinationRequestToCoordinator(operatorId, serializedRequest, timeout));
+		return performOperationOnJobMasterGateway(jobId, gateway -> gateway.deliverCoordinationRequestToCoordinator(operatorId, serializedRequest, timeout));
 	}
 
 	/**
@@ -660,38 +617,36 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
 	private void removeJobAndRegisterTerminationFuture(JobID jobId, boolean cleanupHA) {
 		final CompletableFuture<Void> cleanupFuture = removeJob(jobId, cleanupHA);
 
-		registerJobManagerRunnerTerminationFuture(jobId, cleanupFuture);
+		registerDispatcherJobTerminationFuture(jobId, cleanupFuture);
 	}
 
-	private void registerJobManagerRunnerTerminationFuture(JobID jobId, CompletableFuture<Void> jobManagerRunnerTerminationFuture) {
-		Preconditions.checkState(!jobManagerTerminationFutures.containsKey(jobId));
-
-		jobManagerTerminationFutures.put(jobId, jobManagerRunnerTerminationFuture);
+	private void registerDispatcherJobTerminationFuture(JobID jobId, CompletableFuture<Void> dispatcherJobTerminationFuture) {
+		Preconditions.checkState(!dispatcherJobTerminationFutures.containsKey(jobId));
+		dispatcherJobTerminationFutures.put(jobId, dispatcherJobTerminationFuture);
 
 		// clean up the pending termination future
-		jobManagerRunnerTerminationFuture.thenRunAsync(
+		dispatcherJobTerminationFuture.thenRunAsync(
 			() -> {
-				final CompletableFuture<Void> terminationFuture = jobManagerTerminationFutures.remove(jobId);
+				final CompletableFuture<Void> terminationFuture = dispatcherJobTerminationFutures.remove(jobId);
 
 				//noinspection ObjectEquality
-				if (terminationFuture != null && terminationFuture != jobManagerRunnerTerminationFuture) {
-					jobManagerTerminationFutures.put(jobId, terminationFuture);
+				if (terminationFuture != null && terminationFuture != dispatcherJobTerminationFuture) {
+					dispatcherJobTerminationFutures.put(jobId, terminationFuture);
 				}
 			},
 			getMainThreadExecutor());
 	}
 
 	private CompletableFuture<Void> removeJob(JobID jobId, boolean cleanupHA) {
-		CompletableFuture<JobManagerRunner> jobManagerRunnerFuture = jobManagerRunnerFutures.remove(jobId);
+		DispatcherJob job = runningJobs.remove(jobId);
 
-		final CompletableFuture<Void> jobManagerRunnerTerminationFuture;
-		if (jobManagerRunnerFuture != null) {
-			jobManagerRunnerTerminationFuture = jobManagerRunnerFuture.thenCompose(JobManagerRunner::closeAsync);
+		final CompletableFuture<Void> dispatcherJobTerminationFuture;
+		if (job != null) {
+			dispatcherJobTerminationFuture = job.closeAsync();
 		} else {
-			jobManagerRunnerTerminationFuture = CompletableFuture.completedFuture(null);
+			dispatcherJobTerminationFuture = CompletableFuture.completedFuture(null);
 		}
-
-		return jobManagerRunnerTerminationFuture.thenRunAsync(
+		return dispatcherJobTerminationFuture.thenRunAsync(
 			() -> cleanUpJobData(jobId, cleanupHA),
 			getRpcService().getExecutor());
 	}
@@ -727,21 +682,21 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
 	}
 
 	/**
-	 * Terminate all currently running {@link JobManagerRunnerImpl}.
+	 * Terminate all currently running {@link DispatcherJob}s.
 	 */
-	private void terminateJobManagerRunners() {
+	private void terminateRunningJobs() {
 		log.info("Stopping all currently running jobs of dispatcher {}.", getAddress());
 
-		final HashSet<JobID> jobsToRemove = new HashSet<>(jobManagerRunnerFutures.keySet());
+		final HashSet<JobID> jobsToRemove = new HashSet<>(runningJobs.keySet());
 
 		for (JobID jobId : jobsToRemove) {
 			removeJobAndRegisterTerminationFuture(jobId, false);
 		}
 	}
 
-	private CompletableFuture<Void> terminateJobManagerRunnersAndGetTerminationFuture() {
-		terminateJobManagerRunners();
-		final Collection<CompletableFuture<Void>> values = jobManagerTerminationFutures.values();
+	private CompletableFuture<Void> terminateRunningJobsAndGetTerminationFuture() {
+		terminateRunningJobs();
+		final Collection<CompletableFuture<Void>> values = dispatcherJobTerminationFutures.values();
 		return FutureUtils.completeAll(values);
 	}
 
@@ -802,30 +757,33 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
 		onFatalError(new FlinkException(String.format("JobMaster for job %s failed.", jobId), cause));
 	}
 
-	private CompletableFuture<JobMasterGateway> getJobMasterGatewayFuture(JobID jobId) {
-		final CompletableFuture<JobManagerRunner> jobManagerRunnerFuture = jobManagerRunnerFutures.get(jobId);
-
-		if (jobManagerRunnerFuture == null) {
+	/**
+	 * Returns the JobMasterGateway of the given job; the future fails if the job is unknown or still initializing.
+	 */
+	private CompletableFuture<JobMasterGateway> getJobMasterGateway(JobID jobId) {
+		DispatcherJob job = runningJobs.get(jobId);
+		if (job == null) {
 			return FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobId));
-		} else {
-			final CompletableFuture<JobMasterGateway> leaderGatewayFuture = jobManagerRunnerFuture.thenCompose(JobManagerRunner::getJobMasterGateway);
-			return leaderGatewayFuture.thenApplyAsync(
-				(JobMasterGateway jobMasterGateway) -> {
-					// check whether the retrieved JobMasterGateway belongs still to a running JobMaster
-					if (jobManagerRunnerFutures.containsKey(jobId)) {
-						return jobMasterGateway;
-					} else {
-						throw new CompletionException(new FlinkJobNotFoundException(jobId));
-					}
-				},
-				getMainThreadExecutor());
 		}
+		if (!job.isInitialized()) {
+			return FutureUtils.completedExceptionally(new UnavailableDispatcherOperationException("Unable to get JobMasterGateway for initializing job. "
+				+ "The requested operation is not available while the JobManager is initializing."));
+		}
+		return job.getJobMasterGateway();
+	}
+
+	private <T> CompletableFuture<T> performOperationOnJobMasterGateway(JobID jobId, Function<JobMasterGateway, CompletableFuture<T>> operation) {
+		return getJobMasterGateway(jobId).thenCompose(operation);
 	}
 
 	private CompletableFuture<ResourceManagerGateway> getResourceManagerGateway() {
 		return resourceManagerGatewayRetriever.getFuture();
 	}
 
+	private Optional<DispatcherJob> getDispatcherJob(JobID jobId) {
+		return Optional.ofNullable(runningJobs.get(jobId));
+	}
+
 	private <T> CompletableFuture<T> runResourceManagerCommand(Function<ResourceManagerGateway, CompletableFuture<T>> resourceManagerCommand) {
 		return getResourceManagerGateway().thenApply(resourceManagerCommand).thenCompose(Function.identity());
 	}
@@ -835,51 +793,47 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
 	}
 
 	@Nonnull
-	private <T> List<CompletableFuture<Optional<T>>> queryJobMastersForInformation(Function<JobMasterGateway, CompletableFuture<T>> queryFunction) {
-		final int numberJobsRunning = jobManagerRunnerFutures.size();
-
-		ArrayList<CompletableFuture<Optional<T>>> optionalJobInformation = new ArrayList<>(
-			numberJobsRunning);
+	private <T> List<CompletableFuture<Optional<T>>> queryJobMastersForInformation(Function<DispatcherJob, CompletableFuture<T>> queryFunction) {
 
-		for (JobID jobId : jobManagerRunnerFutures.keySet()) {
-			final CompletableFuture<JobMasterGateway> jobMasterGatewayFuture = getJobMasterGatewayFuture(jobId);
+		List<CompletableFuture<Optional<T>>> optionalJobInformation = new ArrayList<>(
+			runningJobs.size());
 
-			final CompletableFuture<Optional<T>> optionalRequest = jobMasterGatewayFuture
-				.thenCompose(queryFunction::apply)
-				.handle((T value, Throwable throwable) -> Optional.ofNullable(value));
-
-			optionalJobInformation.add(optionalRequest);
+		for (DispatcherJob job : runningJobs.values()) {
+			final CompletableFuture<Optional<T>> queryResult = queryFunction.apply(job)
+					.handle((T value, Throwable t) -> Optional.ofNullable(value));
+			optionalJobInformation.add(queryResult);
 		}
 		return optionalJobInformation;
 	}
 
-	private CompletableFuture<Void> waitForTerminatingJobManager(JobID jobId, JobGraph jobGraph, FunctionWithException<JobGraph, CompletableFuture<Void>, ?> action) {
+	private CompletableFuture<Void> waitForTerminatingJob(JobID jobId, JobGraph jobGraph, ThrowingConsumer<JobGraph, ?> action) {
 		final CompletableFuture<Void> jobManagerTerminationFuture = getJobTerminationFuture(jobId)
 			.exceptionally((Throwable throwable) -> {
 				throw new CompletionException(
 					new DispatcherException(
 						String.format("Termination of previous JobManager for job %s failed. Cannot submit job under the same job id.", jobId),
-						throwable)); });
+						throwable));
+			});
 
-		return jobManagerTerminationFuture.thenComposeAsync(
-			FunctionUtils.uncheckedFunction((ignored) -> {
-				jobManagerTerminationFutures.remove(jobId);
-				return action.apply(jobGraph);
+		return jobManagerTerminationFuture.thenAcceptAsync(
+			FunctionUtils.uncheckedConsumer((ignored) -> {
+				dispatcherJobTerminationFutures.remove(jobId);
+				action.accept(jobGraph);
 			}),
 			getMainThreadExecutor());
 	}
 
 	CompletableFuture<Void> getJobTerminationFuture(JobID jobId) {
-		if (jobManagerRunnerFutures.containsKey(jobId)) {
+		if (runningJobs.containsKey(jobId)) {
 			return FutureUtils.completedExceptionally(new DispatcherException(String.format("Job with job id %s is still running.", jobId)));
 		} else {
-			return jobManagerTerminationFutures.getOrDefault(jobId, CompletableFuture.completedFuture(null));
+			return dispatcherJobTerminationFutures.getOrDefault(jobId, CompletableFuture.completedFuture(null));
 		}
 	}
 
 	private void registerDispatcherMetrics(MetricGroup jobManagerMetricGroup) {
 		jobManagerMetricGroup.gauge(MetricNames.NUM_RUNNING_JOBS,
-			() -> (long) jobManagerRunnerFutures.size());
+			() -> (long) runningJobs.size());
 	}
 
 	public CompletableFuture<Void> onRemovedJobGraph(JobID jobId) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherJob.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherJob.java
new file mode 100644
index 0000000..58f826b
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherJob.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.jobmaster.JobManagerRunner;
+import org.apache.flink.runtime.jobmaster.JobMasterGateway;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.util.AutoCloseableAsync;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Abstraction used by the {@link Dispatcher} to manage jobs.
+ */
+public final class DispatcherJob implements AutoCloseableAsync {
+
+	private final Logger log = LoggerFactory.getLogger(DispatcherJob.class);
+
+	private final CompletableFuture<JobManagerRunner> jobManagerRunnerFuture;
+	private final CompletableFuture<DispatcherJobResult> jobResultFuture;
+	private final CompletableFuture<Void> terminationFuture = new CompletableFuture<>();
+
+	private final long initializationTimestamp;
+	private final JobID jobId;
+	private final String jobName;
+
+	private final Object lock = new Object();
+
+	// internal field to track job status during initialization. Is not updated anymore after
+	// job is initialized, cancelled or failed.
+	@GuardedBy("lock")
+	private DispatcherJobStatus jobStatus = DispatcherJobStatus.INITIALIZING;
+
+	private enum DispatcherJobStatus {
+		// We are waiting for the JobManagerRunner to be initialized
+		INITIALIZING(JobStatus.INITIALIZING),
+		// JobManagerRunner is initialized
+		JOB_MANAGER_RUNNER_INITIALIZED(null),
+		// waiting for cancellation. We stay in this status until the JobManagerRunner future has
+		// completed, then we consider the JobManager to be initialized.
+		CANCELLING(JobStatus.CANCELLING);
+
+		@Nullable
+		private final JobStatus jobStatus;
+
+		DispatcherJobStatus(JobStatus jobStatus) {
+			this.jobStatus = jobStatus;
+		}
+
+		public JobStatus asJobStatus() {
+			if (jobStatus == null) {
+				throw new IllegalStateException("This state is not defined as a 'JobStatus'");
+			}
+			return jobStatus;
+		}
+	}
+
+	static DispatcherJob createFor(
+			CompletableFuture<JobManagerRunner> jobManagerRunnerFuture,
+			JobID jobId,
+			String jobName) {
+		return new DispatcherJob(jobManagerRunnerFuture, jobId, jobName);
+	}
+
+	private DispatcherJob(
+			CompletableFuture<JobManagerRunner> jobManagerRunnerFuture,
+			JobID jobId,
+			String jobName) {
+		this.jobManagerRunnerFuture = jobManagerRunnerFuture;
+		this.jobId = jobId;
+		this.jobName = jobName;
+		this.initializationTimestamp = System.currentTimeMillis();
+		this.jobResultFuture = new CompletableFuture<>();
+
+		FutureUtils.assertNoException(this.jobManagerRunnerFuture.handle((jobManagerRunner, throwable) -> {
+			// JM has been initialized, or the initialization failed
+			synchronized (lock) {
+				jobStatus = DispatcherJobStatus.JOB_MANAGER_RUNNER_INITIALIZED;
+				if (throwable == null) { // initialization succeeded
+					// Forward result future
+					jobManagerRunner.getResultFuture().whenComplete((archivedExecutionGraph, resultThrowable) -> {
+						if (archivedExecutionGraph != null) {
+							jobResultFuture.complete(DispatcherJobResult.forSuccess(archivedExecutionGraph));
+						} else {
+							jobResultFuture.completeExceptionally(ExceptionUtils.stripCompletionException(resultThrowable));
+						}
+					});
+				} else { // failure during initialization
+					final Throwable strippedThrowable = ExceptionUtils.stripCompletionException(throwable);
+					ArchivedExecutionGraph archivedExecutionGraph = ArchivedExecutionGraph.createFromInitializingJob(
+						jobId,
+						jobName,
+						JobStatus.FAILED,
+						strippedThrowable,
+						initializationTimestamp);
+					jobResultFuture.complete(DispatcherJobResult.forInitializationFailure(archivedExecutionGraph, strippedThrowable));
+				}
+			}
+			return null;
+		}));
+	}
+
+	public CompletableFuture<DispatcherJobResult> getResultFuture() {
+		return jobResultFuture;
+	}
+
+	public CompletableFuture<JobDetails> requestJobDetails(Time timeout) {
+		return requestJobStatus(timeout).thenApply(status -> {
+			int[] tasksPerState = new int[ExecutionState.values().length];
+			synchronized (lock) {
+				return new JobDetails(
+					jobId,
+					jobName,
+					initializationTimestamp,
+					0,
+					0,
+					status,
+					0,
+					tasksPerState,
+					0);
+			}
+		});
+	}
+
+	/**
+	 * Cancel job.
+	 * A cancellation will be scheduled if the initialization is not completed.
+	 * The returned future will complete exceptionally if the JobManagerRunner initialization failed.
+	 */
+	public CompletableFuture<Acknowledge> cancel(Time timeout) {
+		synchronized (lock) {
+			if (isInitialized()) {
+				return getJobMasterGateway().thenCompose(jobMasterGateway -> jobMasterGateway.cancel(timeout));
+			} else {
+				log.info("Cancellation during initialization requested for job {}. Job will be cancelled once JobManager has been initialized.", jobId);
+
+				// cancel job
+				CompletableFuture<Acknowledge> cancelFuture = jobManagerRunnerFuture
+					.thenCompose(JobManagerRunner::getJobMasterGateway)
+					.thenCompose(jobMasterGateway -> jobMasterGateway.cancel(timeout));
+				cancelFuture.whenComplete((ignored, cancelThrowable) -> {
+					if (cancelThrowable != null) {
+						log.warn("Cancellation of job {} failed", jobId, cancelThrowable);
+					}
+				});
+				jobStatus = DispatcherJobStatus.CANCELLING;
+				return cancelFuture;
+			}
+		}
+	}
+
+	public CompletableFuture<JobStatus> requestJobStatus(Time timeout) {
+		return requestJob(timeout).thenApply(ArchivedExecutionGraph::getState);
+	}
+
+	/**
+	 * Returns a future completing to the ArchivedExecutionGraph of the job.
+	 */
+	public CompletableFuture<ArchivedExecutionGraph> requestJob(Time timeout) {
+		synchronized (lock) {
+			if (isInitialized()) {
+				if (jobResultFuture.isDone()) { // job is not running anymore
+					return jobResultFuture.thenApply(DispatcherJobResult::getArchivedExecutionGraph);
+				}
+				// job is still running
+				return getJobMasterGateway().thenCompose(jobMasterGateway -> jobMasterGateway.requestJob(
+					timeout));
+			} else {
+				Preconditions.checkState(this.jobStatus == DispatcherJobStatus.INITIALIZING || jobStatus == DispatcherJobStatus.CANCELLING);
+				return CompletableFuture.completedFuture(
+					ArchivedExecutionGraph.createFromInitializingJob(
+						jobId,
+						jobName,
+						jobStatus.asJobStatus(),
+						null,
+						initializationTimestamp));
+			}
+		}
+	}
+
+	/**
+	 * The job is initialized once the JobManager runner has been initialized.
+	 * It is also initialized if the runner initialization failed, or if it has been
+	 * canceled (and the cancellation is complete).
+	 */
+	public boolean isInitialized() {
+		synchronized (lock) {
+			return jobStatus == DispatcherJobStatus.JOB_MANAGER_RUNNER_INITIALIZED;
+		}
+	}
+
+	/**
+	 * Returns the {@link JobMasterGateway} from the JobManagerRunner.
+	 *
+	 * @return the {@link JobMasterGateway}. The future will complete exceptionally if the JobManagerRunner initialization failed.
+	 * @throws IllegalStateException if the job is not initialized
+	 */
+	public CompletableFuture<JobMasterGateway> getJobMasterGateway() {
+		Preconditions.checkState(
+			isInitialized(),
+			"JobMaster Gateway is not available during initialization");
+		return jobManagerRunnerFuture.thenCompose(JobManagerRunner::getJobMasterGateway);
+	}
+
+	@Override
+	public CompletableFuture<Void> closeAsync() {
+		FutureUtils.assertNoException(jobManagerRunnerFuture.handle((runner, throwable) -> {
+			if (throwable == null) {
+				// init was successful: close jobManager runner.
+				CompletableFuture<Void> jobManagerRunnerClose = jobManagerRunnerFuture.thenCompose(
+					AutoCloseableAsync::closeAsync);
+				FutureUtils.forward(jobManagerRunnerClose, terminationFuture);
+			} else {
+				// initialization has failed: complete termination.
+				terminationFuture.complete(null);
+			}
+			return null;
+		}));
+		return terminationFuture;
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherJobResult.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherJobResult.java
new file mode 100644
index 0000000..ade3459
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherJobResult.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+/**
+ * Container for returning the {@link ArchivedExecutionGraph} and a flag indicating whether the initialization has failed.
+ * For initialization failures, the throwable is also attached, to avoid deserializing it from the ArchivedExecutionGraph.
+ */
+final class DispatcherJobResult {
+
+	private final ArchivedExecutionGraph archivedExecutionGraph;
+
+	// if the throwable field is set, the job failed during initialization.
+	@Nullable
+	private final Throwable initializationFailure;
+
+	private DispatcherJobResult(ArchivedExecutionGraph archivedExecutionGraph, @Nullable Throwable throwable) {
+		this.archivedExecutionGraph = archivedExecutionGraph;
+		this.initializationFailure = throwable;
+	}
+
+	public boolean isInitializationFailure() {
+		return initializationFailure != null;
+	}
+
+	public ArchivedExecutionGraph getArchivedExecutionGraph() {
+		return archivedExecutionGraph;
+	}
+
+	/**
+	 * @throws IllegalStateException if this DispatcherJobResult is a successful initialization.
+	 */
+	public Throwable getInitializationFailure() {
+		Preconditions.checkState(isInitializationFailure(), "This DispatcherJobResult does not represent a failed initialization.");
+		return initializationFailure;
+	}
+
+	public static DispatcherJobResult forInitializationFailure(ArchivedExecutionGraph archivedExecutionGraph, Throwable throwable) {
+		return new DispatcherJobResult(archivedExecutionGraph, throwable);
+	}
+
+	public static DispatcherJobResult forSuccess(ArchivedExecutionGraph archivedExecutionGraph) {
+		return new DispatcherJobResult(archivedExecutionGraph, null);
+	}
+}
diff --git a/flink-runtime-web/web-dashboard/src/app/app.config.ts b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/UnavailableDispatcherOperationException.java
similarity index 66%
copy from flink-runtime-web/web-dashboard/src/app/app.config.ts
copy to flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/UnavailableDispatcherOperationException.java
index 6d41b67..6ce4251 100644
--- a/flink-runtime-web/web-dashboard/src/app/app.config.ts
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/UnavailableDispatcherOperationException.java
@@ -16,20 +16,15 @@
  * limitations under the License.
  */
 
-export const BASE_URL = '.';
-export const COLOR_MAP = {
-  TOTAL: '#112641',
-  RUNNING: '#52c41a',
-  FAILED: '#f5222d',
-  FINISHED: '#1890ff',
-  CANCELED: '#fa8c16',
-  CANCELING: '#faad14',
-  CREATED: '#2f54eb',
-  DEPLOYING: '#13c2c2',
-  RECONCILING: '#eb2f96',
-  IN_PROGRESS: '#faad14',
-  SCHEDULED: '#722ed1',
-  COMPLETED: '#1890ff',
-  RESTARTING: '#13c2c2'
-};
-export const LONG_MIN_VALUE = -9223372036854776000;
+package org.apache.flink.runtime.dispatcher;
+
+/**
+ * Exception indicating that a Dispatcher operation is temporarily unavailable.
+ */
+public class UnavailableDispatcherOperationException extends DispatcherException {
+	private static final long serialVersionUID = -45499335133622792L;
+
+	public UnavailableDispatcherOperationException(String message) {
+		super(message);
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
index 8461138..c011739 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.executiongraph;
 
 import org.apache.flink.api.common.ArchivedExecutionConfig;
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
@@ -351,4 +352,52 @@ public class ArchivedExecutionGraph implements AccessExecutionGraph, Serializabl
 			executionGraph.getCheckpointStatsSnapshot(),
 			executionGraph.getStateBackendName().orElse(null));
 	}
+
+	/**
+	 * Create a sparse ArchivedExecutionGraph for a job while it is still initializing.
+	 * Most fields will be empty; only job status and error-related fields are set.
+	 */
+	public static ArchivedExecutionGraph createFromInitializingJob(
+			JobID jobId,
+			String jobName,
+			JobStatus jobStatus,
+			@Nullable Throwable throwable,
+			long initializationTimestamp) {
+		Map<JobVertexID, ArchivedExecutionJobVertex> archivedTasks = Collections.emptyMap();
+		List<ArchivedExecutionJobVertex> archivedVerticesInCreationOrder = Collections.emptyList();
+		final Map<String, SerializedValue<OptionalFailure<Object>>> serializedUserAccumulators = Collections.emptyMap();
+		StringifiedAccumulatorResult[] archivedUserAccumulators = new StringifiedAccumulatorResult[]{};
+
+		final long[] timestamps = new long[JobStatus.values().length];
+		timestamps[JobStatus.INITIALIZING.ordinal()] = initializationTimestamp;
+
+		String jsonPlan = "{}";
+
+		ErrorInfo failureInfo = null;
+		if (throwable != null) {
+			Preconditions.checkState(jobStatus == JobStatus.FAILED);
+			long failureTime = System.currentTimeMillis();
+			failureInfo = new ErrorInfo(throwable, failureTime);
+			timestamps[JobStatus.FAILED.ordinal()] = failureTime;
+		}
+
+		return new ArchivedExecutionGraph(
+			jobId,
+			jobName,
+			archivedTasks,
+			archivedVerticesInCreationOrder,
+			timestamps,
+			jobStatus,
+			failureInfo,
+			jsonPlan,
+			archivedUserAccumulators,
+			serializedUserAccumulators,
+			new ExecutionConfig().archive(),
+			false,
+			null,
+			null,
+			null);
+
+	}
+
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherJobTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherJobTest.java
new file mode 100644
index 0000000..9c0784f
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherJobTest.java
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobmaster.JobManagerRunner;
+import org.apache.flink.runtime.jobmaster.TestingJobManagerRunner;
+import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGateway;
+import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatewayBuilder;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import static org.hamcrest.core.Is.is;
+import static org.hamcrest.core.StringContains.containsString;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Test for the {@link DispatcherJob} class.
+ */
+public class DispatcherJobTest extends TestLogger {
+
+	private static final Time TIMEOUT = Time.seconds(10L);
+	private static final JobID TEST_JOB_ID = new JobID();
+
+	@Test
+	public void testStatusWhenInitializing() throws Exception {
+		TestContext testContext = createTestContext();
+		DispatcherJob dispatcherJob = testContext.getDispatcherJob();
+
+		assertThat(dispatcherJob.isInitialized(), is(false));
+		assertThat(dispatcherJob.getResultFuture().isDone(), is(false));
+		assertJobStatus(dispatcherJob, JobStatus.INITIALIZING);
+	}
+
+	@Test
+	public void testStatusWhenRunning() throws Exception {
+		TestContext testContext = createTestContext();
+		DispatcherJob dispatcherJob = testContext.getDispatcherJob();
+
+		// finish initialization
+		testContext.setRunning();
+
+		assertJobStatus(dispatcherJob, JobStatus.RUNNING);
+
+		// result future not done
+		assertThat(dispatcherJob.getResultFuture().isDone(), is(false));
+
+		assertThat(dispatcherJob.isInitialized(), is(true));
+	}
+
+	@Test
+	public void testStatusWhenJobFinished() throws Exception {
+		TestContext testContext = createTestContext();
+		DispatcherJob dispatcherJob = testContext.getDispatcherJob();
+
+		// finish job
+		testContext.setRunning();
+		testContext.finishJob();
+
+		assertJobStatus(dispatcherJob, JobStatus.FINISHED);
+
+		// assert result future done
+		DispatcherJobResult result = dispatcherJob.getResultFuture().get();
+
+		assertThat(result.getArchivedExecutionGraph().getState(), is(JobStatus.FINISHED));
+	}
+
+	@Test
+	public void testStatusWhenCancellingWhileInitializing() throws Exception {
+		TestContext testContext = createTestContext();
+		DispatcherJob dispatcherJob = testContext.getDispatcherJob();
+		assertJobStatus(dispatcherJob, JobStatus.INITIALIZING);
+
+		CompletableFuture<Acknowledge> cancelFuture = dispatcherJob.cancel(
+			TIMEOUT);
+
+		assertThat(cancelFuture.isDone(), is(false));
+		assertThat(dispatcherJob.isInitialized(), is(false));
+
+		assertJobStatus(dispatcherJob, JobStatus.CANCELLING);
+
+		testContext.setRunning();
+		testContext.finishCancellation();
+
+		// assert that cancel future completes
+		cancelFuture.get();
+
+		assertJobStatus(dispatcherJob, JobStatus.CANCELED);
+		assertThat(dispatcherJob.isInitialized(), is(true));
+		// assert that the result future completes
+		assertThat(dispatcherJob.getResultFuture().get().getArchivedExecutionGraph().getState(), is(JobStatus.CANCELED));
+	}
+
+	@Test
+	public void testStatusWhenCancellingWhileRunning() throws Exception {
+		TestContext testContext = createTestContext();
+		DispatcherJob dispatcherJob = testContext.getDispatcherJob();
+
+		testContext.setRunning();
+		CompletableFuture<Acknowledge> cancelFuture = dispatcherJob.cancel(TIMEOUT);
+
+		assertJobStatus(dispatcherJob, JobStatus.CANCELLING);
+		testContext.finishCancellation();
+
+		cancelFuture.get();
+		assertJobStatus(dispatcherJob, JobStatus.CANCELED);
+		assertThat(dispatcherJob.getResultFuture().get().getArchivedExecutionGraph().getState(), is(JobStatus.CANCELED));
+	}
+
+	@Test
+	public void testStatusWhenCancellingWhileFailed() throws Exception {
+		TestContext testContext = createTestContext();
+		DispatcherJob dispatcherJob = testContext.getDispatcherJob();
+
+		RuntimeException exception = new RuntimeException("Artificial failure in runner initialization");
+		testContext.failInitialization(exception);
+
+		assertJobStatus(dispatcherJob, JobStatus.FAILED);
+
+		CommonTestUtils.assertThrows("Artificial failure", ExecutionException.class, () -> dispatcherJob.cancel(TIMEOUT).get());
+
+		assertJobStatus(dispatcherJob, JobStatus.FAILED);
+	}
+
+	@Test
+	public void testErrorWhileInitializing() throws Exception {
+		TestContext testContext = createTestContext();
+		DispatcherJob dispatcherJob = testContext.getDispatcherJob();
+
+		// now fail
+		RuntimeException exception = new RuntimeException("Artificial failure in runner initialization");
+		testContext.failInitialization(exception);
+
+		assertThat(dispatcherJob.isInitialized(), is(true));
+		assertJobStatus(dispatcherJob, JobStatus.FAILED);
+
+		ArchivedExecutionGraph aeg = dispatcherJob.getResultFuture().get().getArchivedExecutionGraph();
+		assertThat(aeg.getFailureInfo().getException().deserializeError(ClassLoader.getSystemClassLoader()), is(exception));
+	}
+
+	@Test
+	public void testDispatcherJobResult() throws Exception {
+		TestContext testContext = createTestContext();
+		DispatcherJob dispatcherJob = testContext.getDispatcherJob();
+		testContext.failInitialization(new RuntimeException("Artificial failure in runner initialization"));
+
+		DispatcherJobResult result = dispatcherJob.getResultFuture().get();
+		assertThat(result.isInitializationFailure(), is(true));
+		assertThat(result.getArchivedExecutionGraph().getState(), is(JobStatus.FAILED));
+		assertThat(result.getInitializationFailure().getMessage(), containsString("Artificial failure"));
+	}
+
+	@Test
+	public void testCloseWhileInitializingSuccessfully() throws Exception {
+		TestContext testContext = createTestContext();
+		DispatcherJob dispatcherJob = testContext.getDispatcherJob();
+
+		CompletableFuture<Void> closeFuture = dispatcherJob.closeAsync();
+		assertThat(closeFuture.isDone(), is(false));
+
+		// set job running, so that we can cancel it
+		testContext.setRunning();
+
+		// assert future completes now
+		closeFuture.get();
+
+		// ensure the result future is complete (how it completes is up to the JobManager)
+		CompletableFuture<DispatcherJobResult> resultFuture = dispatcherJob.getResultFuture();
+		CommonTestUtils.assertThrows("has not been finished", ExecutionException.class,
+			resultFuture::get);
+	}
+
+	@Test
+	public void testCloseWhileInitializingErroneously() throws Exception {
+		TestContext testContext = createTestContext();
+		DispatcherJob dispatcherJob = testContext.getDispatcherJob();
+
+		CompletableFuture<Void> closeFuture = dispatcherJob.closeAsync();
+		assertThat(closeFuture.isDone(), is(false));
+
+		testContext.failInitialization(new RuntimeException("fail"));
+
+		// assert future completes now
+		closeFuture.get();
+
+		// ensure the result future is complete
+		dispatcherJob.getResultFuture().get();
+	}
+
+	@Test
+	public void testCloseWhileRunning() throws Exception {
+		TestContext testContext = createTestContext();
+		DispatcherJob dispatcherJob = testContext.getDispatcherJob();
+
+		// complete JobManager runner future to indicate to the DispatcherJob that the Runner has been initialized
+		testContext.setRunning();
+
+		CompletableFuture<Void> closeFuture = dispatcherJob.closeAsync();
+
+		closeFuture.get();
+
+		// result future should complete exceptionally.
+		CompletableFuture<DispatcherJobResult> resultFuture = dispatcherJob.getResultFuture();
+		CommonTestUtils.assertThrows("has not been finished", ExecutionException.class,
+			resultFuture::get);
+	}
+
+	@Test(expected = IllegalStateException.class)
+	public void testUnavailableJobMasterGateway() {
+		TestContext testContext = createTestContext();
+		DispatcherJob dispatcherJob = testContext.getDispatcherJob();
+		dispatcherJob.getJobMasterGateway();
+	}
+
+	private TestContext createTestContext() {
+		final JobVertex testVertex = new JobVertex("testVertex");
+		testVertex.setInvokableClass(NoOpInvokable.class);
+
+		JobGraph jobGraph = new JobGraph(TEST_JOB_ID, "testJob", testVertex);
+		CompletableFuture<JobManagerRunner> jobManagerRunnerCompletableFuture = new CompletableFuture<>();
+		DispatcherJob dispatcherJob = DispatcherJob.createFor(jobManagerRunnerCompletableFuture,
+			jobGraph.getJobID(), jobGraph.getName());
+
+		return new TestContext(
+			jobManagerRunnerCompletableFuture,
+			dispatcherJob,
+			jobGraph);
+	}
+
+	private static class TestContext {
+		private final CompletableFuture<JobManagerRunner> jobManagerRunnerCompletableFuture;
+		private final DispatcherJob dispatcherJob;
+		private final JobGraph jobGraph;
+		private final TestingJobMasterGateway mockRunningJobMasterGateway;
+		private final CompletableFuture<Acknowledge> cancellationFuture;
+
+		private JobStatus internalJobStatus = JobStatus.INITIALIZING;
+		private final CompletableFuture<ArchivedExecutionGraph> resultFuture = new CompletableFuture<>();
+
+		public TestContext(
+				CompletableFuture<JobManagerRunner> jobManagerRunnerCompletableFuture,
+				DispatcherJob dispatcherJob,
+				JobGraph jobGraph) {
+			this.jobManagerRunnerCompletableFuture = jobManagerRunnerCompletableFuture;
+			this.dispatcherJob = dispatcherJob;
+			this.jobGraph = jobGraph;
+
+			this.cancellationFuture = new CompletableFuture<>();
+			this.mockRunningJobMasterGateway = new TestingJobMasterGatewayBuilder()
+				.setRequestJobSupplier(() -> CompletableFuture.completedFuture(ArchivedExecutionGraph.createFromInitializingJob(getJobID(), "test", internalJobStatus, null, 1337)))
+				.setRequestJobDetailsSupplier(() -> {
+					JobDetails jobDetails = new JobDetails(getJobID(), "", 0, 0, 0, internalJobStatus, 0,
+						new int[]{0, 0, 0, 0, 0, 0, 0, 0, 0}, 0);
+					return CompletableFuture.completedFuture(jobDetails);
+				})
+				// cancel only switches to CANCELLING and returns the still-pending future; finishCancellation() completes it.
+				.setCancelFunction(() -> {
+					internalJobStatus = JobStatus.CANCELLING;
+					return cancellationFuture;
+				})
+				.build();
+		}
+
+		public JobID getJobID() {
+			return jobGraph.getJobID();
+		}
+
+		public void failInitialization(Throwable ex) {
+			jobManagerRunnerCompletableFuture.completeExceptionally(ex);
+		}
+
+		public DispatcherJob getDispatcherJob() {
+			return dispatcherJob;
+		}
+
+		public void setRunning() {
+			internalJobStatus = JobStatus.RUNNING;
+			JobManagerRunner jobManagerRunner = new TestingJobManagerRunner.Builder()
+				.setJobId(getJobID())
+				.setBlockingTermination(false)
+				.setJobMasterGatewayFuture(CompletableFuture.completedFuture(mockRunningJobMasterGateway))
+				.setResultFuture(resultFuture)
+				.build();
+			jobManagerRunnerCompletableFuture.complete(jobManagerRunner);
+		}
+
+		public void finishJob() {
+			internalJobStatus = JobStatus.FINISHED;
+			resultFuture.complete(
+				ArchivedExecutionGraph.createFromInitializingJob(getJobID(), "test", JobStatus.FINISHED, null, 1337));
+		}
+
+		public void finishCancellation() {
+			jobManagerRunnerCompletableFuture.thenAccept(runner -> {
+				internalJobStatus = JobStatus.CANCELED;
+				runner.getResultFuture()
+					.complete(ArchivedExecutionGraph.createFromInitializingJob(getJobID(), "test", JobStatus.CANCELED, null, 1337));
+				cancellationFuture.complete(Acknowledge.get());
+			});
+		}
+	}
+
+	private void assertJobStatus(DispatcherJob dispatcherJob, JobStatus expectedStatus) throws Exception {
+		assertThat(dispatcherJob.requestJobDetails(TIMEOUT).get().getStatus(), is(expectedStatus));
+		assertThat(dispatcherJob.requestJob(TIMEOUT).get().getState(), is(expectedStatus));
+		assertThat(dispatcherJob.requestJobStatus(TIMEOUT).get(), is(expectedStatus));
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
index d2ad995..1caec0d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
@@ -31,7 +31,7 @@ import org.apache.flink.runtime.blob.PermanentBlobKey;
 import org.apache.flink.runtime.blob.TestingBlobStore;
 import org.apache.flink.runtime.blob.TestingBlobStoreBuilder;
 import org.apache.flink.runtime.client.DuplicateJobSubmissionException;
-import org.apache.flink.runtime.client.JobSubmissionException;
+import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
@@ -58,6 +58,7 @@ import org.apache.flink.runtime.util.TestingFatalErrorHandlerResource;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.SerializedThrowable;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.After;
@@ -75,6 +76,7 @@ import javax.annotation.Nonnull;
 import java.io.File;
 import java.io.IOException;
 import java.util.Collections;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
@@ -280,15 +282,16 @@ public class DispatcherResourceCleanupTest extends TestLogger {
 	@Test
 	public void testBlobServerCleanupWhenJobSubmissionFails() throws Exception {
 		startDispatcher(new FailingJobManagerRunnerFactory(new FlinkException("Test exception")));
-		final CompletableFuture<Acknowledge> submissionFuture = dispatcherGateway.submitJob(jobGraph, timeout);
+		dispatcherGateway.submitJob(jobGraph, timeout).get();
 
-		try {
-			submissionFuture.get();
-			fail("Job submission was expected to fail.");
-		} catch (ExecutionException ee) {
-			assertThat(ExceptionUtils.findThrowable(ee, JobSubmissionException.class).isPresent(), is(true));
-		}
+		Optional<SerializedThrowable> maybeError = dispatcherGateway.requestJobResult(
+			jobId,
+			timeout).get().getSerializedThrowable();
+
+		assertThat(maybeError.isPresent(), is(true));
+		Throwable exception = maybeError.get().deserializeError(this.getClass().getClassLoader());
 
+		assertThat(ExceptionUtils.findThrowable(exception, JobExecutionException.class).isPresent(), is(true));
 		assertThatHABlobsHaveBeenRemoved();
 	}
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
index 854cfcd..51e43f7 100755
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
@@ -21,7 +21,9 @@ package org.apache.flink.runtime.dispatcher;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.common.operators.ResourceSpec;
+import org.apache.flink.api.common.time.Deadline;
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.testutils.OneShotLatch;
@@ -30,7 +32,9 @@ import org.apache.flink.runtime.blob.VoidBlobStore;
 import org.apache.flink.runtime.checkpoint.Checkpoints;
 import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
 import org.apache.flink.runtime.checkpoint.metadata.CheckpointMetadata;
+import org.apache.flink.runtime.client.JobInitializationException;
 import org.apache.flink.runtime.client.JobSubmissionException;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ErrorInfo;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
@@ -45,10 +49,13 @@ import org.apache.flink.runtime.jobmaster.JobNotFinishedException;
 import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.runtime.jobmaster.TestingJobManagerRunner;
 import org.apache.flink.runtime.jobmaster.factories.JobManagerJobMetricGroupFactory;
+import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGateway;
+import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatewayBuilder;
 import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
 import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
+import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
 import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
@@ -62,6 +69,7 @@ import org.apache.flink.runtime.state.CheckpointStorageLocation;
 import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
 import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
 import org.apache.flink.runtime.testutils.TestingJobGraphStore;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
 import org.apache.flink.runtime.util.TestingFatalErrorHandlerResource;
@@ -73,6 +81,7 @@ import org.apache.flink.util.function.ThrowingRunnable;
 import org.hamcrest.Matchers;
 import org.junit.After;
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Rule;
@@ -89,6 +98,8 @@ import java.net.URISyntaxException;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Optional;
@@ -103,7 +114,6 @@ import java.util.concurrent.TimeoutException;
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.instanceOf;
-import static org.hamcrest.Matchers.notNullValue;
 import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
@@ -201,7 +211,10 @@ public class DispatcherTest extends TestLogger {
 		return dispatcher;
 	}
 
-	private class TestingDispatcherBuilder {
+	/**
+	 * Builder for the TestingDispatcher.
+	 */
+	public class TestingDispatcherBuilder {
 
 		private DispatcherBootstrap dispatcherBootstrap = new DefaultDispatcherBootstrap(Collections.emptyList());
 
@@ -285,12 +298,11 @@ public class DispatcherTest extends TestLogger {
 	@Test
 	public void testJobSubmission() throws Exception {
 		dispatcher = createAndStartDispatcher(heartbeatServices, haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID, createdJobManagerRunnerLatch));
-
 		DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class);
 
-		CompletableFuture<Acknowledge> acknowledgeFuture = dispatcherGateway.submitJob(jobGraph, TIMEOUT);
+		dispatcherGateway.submitJob(jobGraph, TIMEOUT).get();
 
-		acknowledgeFuture.get();
+		jobMasterLeaderElectionService.getStartFuture().get();
 
 		assertTrue(
 			"jobManagerRunner was not started",
@@ -329,6 +341,111 @@ public class DispatcherTest extends TestLogger {
 		}
 	}
 
+	@Test(timeout = 5_000L)
+	public void testNonBlockingJobSubmission() throws Exception {
+		dispatcher = createAndStartDispatcher(heartbeatServices, haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID, createdJobManagerRunnerLatch));
+		jobMasterLeaderElectionService.isLeader(UUID.randomUUID());
+		DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class);
+
+		Tuple2<JobGraph, BlockingJobVertex> blockingJobGraph = getBlockingJobGraphAndVertex();
+		JobID jobID = blockingJobGraph.f0.getJobID();
+
+		dispatcherGateway.submitJob(blockingJobGraph.f0, TIMEOUT).get();
+
+		// submitJob returned although initializeOnMaster is still blocked: the job must report INITIALIZING
+		assertThat(dispatcherGateway.requestJobStatus(jobID, TIMEOUT).get(), is(JobStatus.INITIALIZING));
+
+		// the initializing job must already be listed, under the ID of the graph we submitted
+		MultipleJobsDetails multiDetails = dispatcherGateway.requestMultipleJobDetails(TIMEOUT).get();
+		assertEquals(1, multiDetails.getJobs().size());
+		assertEquals(jobID, multiDetails.getJobs().iterator().next().getJobId());
+
+		// submission has succeeded, let the initialization finish.
+		blockingJobGraph.f1.unblock();
+
+		// poll the submitted job (not the unrelated jobGraph field); the 5s test timeout expires before this 10s deadline
+		CommonTestUtils.waitUntilCondition(() -> dispatcherGateway.requestJobStatus(jobID, TIMEOUT).get() == JobStatus.RUNNING,
+			Deadline.fromNow(Duration.of(10, ChronoUnit.SECONDS)), 5L);
+	}
+
+	@Test(timeout = 5_000L)
+	public void testInvalidCallDuringInitialization() throws Exception {
+		dispatcher = createAndStartDispatcher(heartbeatServices, haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID, createdJobManagerRunnerLatch));
+		jobMasterLeaderElectionService.isLeader(UUID.randomUUID());
+		DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class);
+
+		Tuple2<JobGraph, BlockingJobVertex> blockingJobGraph = getBlockingJobGraphAndVertex();
+		JobID jid = blockingJobGraph.f0.getJobID();
+
+		dispatcherGateway.submitJob(blockingJobGraph.f0, TIMEOUT).get();
+
+		assertThat(dispatcherGateway.requestJobStatus(jid, TIMEOUT).get(), is(JobStatus.INITIALIZING));
+
+		// a savepoint cannot be triggered while the job is still INITIALIZING: the returned future must fail
+		try {
+			dispatcherGateway.triggerSavepoint(jid, "file:///tmp/savepoint", false, TIMEOUT).get();
+			fail("Previous statement should have failed");
+		} catch (ExecutionException t) {
+			assertThat(t.getCause(), instanceOf(UnavailableDispatcherOperationException.class));
+		}
+
+		// submission has succeeded, let the initialization finish.
+		blockingJobGraph.f1.unblock();
+
+		// poll the submitted job (not the unrelated jobGraph field); the 5s test timeout expires before this 10s deadline
+		CommonTestUtils.waitUntilCondition(() -> dispatcherGateway.requestJobStatus(jid, TIMEOUT).get() == JobStatus.RUNNING,
+			Deadline.fromNow(Duration.of(10, ChronoUnit.SECONDS)), 5L);
+	}
+
+	@Test
+	public void testCancellationDuringInitialization() throws Exception {
+		dispatcher = createAndStartDispatcher(heartbeatServices, haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID, createdJobManagerRunnerLatch));
+		jobMasterLeaderElectionService.isLeader(UUID.randomUUID());
+		DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class);
+
+		// create a job graph of a job that blocks forever
+		Tuple2<JobGraph, BlockingJobVertex> blockingJobGraph = getBlockingJobGraphAndVertex();
+		JobID jobID = blockingJobGraph.f0.getJobID();
+
+		dispatcherGateway.submitJob(blockingJobGraph.f0, TIMEOUT).get();
+
+		assertThat(dispatcherGateway.requestJobStatus(jobID, TIMEOUT).get(), is(JobStatus.INITIALIZING));
+
+		// submission has succeeded, now cancel the job
+		CompletableFuture<Acknowledge> cancellationFuture = dispatcherGateway.cancelJob(jobID, TIMEOUT);
+		assertThat(dispatcherGateway.requestJobStatus(jobID, TIMEOUT).get(), is(JobStatus.CANCELLING));
+		assertThat(cancellationFuture.isDone(), is(false));
+		// unblock
+		blockingJobGraph.f1.unblock();
+		// wait until cancelled
+		cancellationFuture.get();
+		assertThat(dispatcherGateway.requestJobResult(jobID, TIMEOUT).get().getApplicationStatus(), is(ApplicationStatus.CANCELED));
+	}
+
+	@Test
+	public void testErrorDuringInitialization() throws Exception {
+		dispatcher = createAndStartDispatcher(heartbeatServices, haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID, createdJobManagerRunnerLatch));
+		DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class);
+
+		// create a job graph whose only vertex throws from initializeOnMaster
+		final FailingInitializationJobVertex failingInitializationJobVertex = new FailingInitializationJobVertex("testVertex");
+		failingInitializationJobVertex.setInvokableClass(NoOpInvokable.class);
+		JobGraph failingJobGraph = new JobGraph(TEST_JOB_ID, "failingTestJob", failingInitializationJobVertex);
+
+		dispatcherGateway.submitJob(failingJobGraph, TIMEOUT).get();
+
+		// submission itself succeeds; the failure only surfaces through the job result
+		dispatcherGateway.requestJobResult(failingJobGraph.getJobID(), TIMEOUT).get();
+
+		// get failure cause of the job that was actually submitted
+		ArchivedExecutionGraph execGraph = dispatcherGateway.requestJob(failingJobGraph.getJobID(), TIMEOUT).get();
+		Assert.assertNotNull(execGraph.getFailureInfo());
+		Throwable throwable = execGraph.getFailureInfo().getException().deserializeError(ClassLoader.getSystemClassLoader());
+
+		// ensure correct exception type (the matcher reports the actual type on mismatch)
+		assertThat(throwable, instanceOf(JobInitializationException.class));
+	}
+
 	/**
 	 * Test that {@link JobResult} is cached when the job finishes.
 	 */
@@ -413,26 +530,42 @@ public class DispatcherTest extends TestLogger {
 	 */
 	@Test
 	public void testWaitingForJobMasterLeadership() throws Exception {
-		dispatcher = createAndStartDispatcher(heartbeatServices, haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID, createdJobManagerRunnerLatch));
+		ExpectedJobIdJobManagerRunnerFactory jobManagerRunnerFactory = new ExpectedJobIdJobManagerRunnerFactory(
+			TEST_JOB_ID,
+			createdJobManagerRunnerLatch);
+		dispatcher = createAndStartDispatcher(heartbeatServices, haServices, jobManagerRunnerFactory);
 
 		final DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class);
 
 		dispatcherGateway.submitJob(jobGraph, TIMEOUT).get();
-
-		final CompletableFuture<JobStatus> jobStatusFuture = dispatcherGateway.requestJobStatus(jobGraph.getJobID(), TIMEOUT);
-
-		assertThat(jobStatusFuture.isDone(), is(false));
-
-		try {
-			jobStatusFuture.get(10, TimeUnit.MILLISECONDS);
-			fail("Should not complete.");
-		} catch (TimeoutException ignored) {
-			// ignored
+		log.info("Job submission completed");
+
+		// wait until job has been initialized: approximated by the time when the leaderelection finished
+		jobMasterLeaderElectionService.getStartFuture().get();
+
+		// try getting a blocking, non-initializing job status future in a retry-loop.
+		// In some CI environments, we can not guarantee that the job immediately leaves the INITIALIZING status
+		// after the jobMasterLeaderElectionService has been started.
+		CompletableFuture<JobStatus> jobStatusFuture = null;
+		for (int i = 0; i < 5; i++) {
+			jobStatusFuture = dispatcherGateway.requestJobStatus(jobGraph.getJobID(), TIMEOUT);
+			try {
+				jobStatusFuture.get(10, TimeUnit.MILLISECONDS);
+				// The future completed, so it is not blocked on leader election -- no matter
+				// which status it reported. Discard it and retry after a short pause.
+				jobStatusFuture = null;
+				Thread.sleep(100);
+			} catch (TimeoutException ignored) {
+				break; // great, we have a blocking future
+			}
+		}
+		if (jobStatusFuture == null) {
+			fail("Unable to get a job status future blocked on leader election.");
 		}
 
 		jobMasterLeaderElectionService.isLeader(UUID.randomUUID()).get();
 
-		assertThat(jobStatusFuture.get(), notNullValue());
+		assertThat(jobStatusFuture.get(), is(JobStatus.RUNNING));
 	}
 
 	/**
@@ -476,19 +609,17 @@ public class DispatcherTest extends TestLogger {
 
 		final DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class);
 
-		final CompletableFuture<Acknowledge> submissionFuture = dispatcherGateway.submitJob(jobGraph, TIMEOUT);
+		dispatcherGateway.submitJob(jobGraph, TIMEOUT).get();
 
-		assertThat(submissionFuture.isDone(), is(false));
+		assertThat(dispatcherGateway.requestJobStatus(jobGraph.getJobID(), TIMEOUT).get(), is(JobStatus.INITIALIZING));
 
 		final CompletableFuture<Collection<String>> metricQueryServiceAddressesFuture = dispatcherGateway.requestMetricQueryServiceAddresses(Time.seconds(5L));
 
 		assertThat(metricQueryServiceAddressesFuture.get(), is(empty()));
 
-		assertThat(submissionFuture.isDone(), is(false));
+		assertThat(dispatcherGateway.requestJobStatus(jobGraph.getJobID(), TIMEOUT).get(), is(JobStatus.INITIALIZING));
 
 		jobManagerRunnerCreationLatch.trigger();
-
-		submissionFuture.get();
 	}
 
 	/**
@@ -511,26 +642,31 @@ public class DispatcherTest extends TestLogger {
 				}
 			}));
 
+		jobMasterLeaderElectionService.isLeader(UUID.randomUUID());
 		final DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class);
 
-		CompletableFuture<Acknowledge> submissionFuture = dispatcherGateway.submitJob(jobGraph, TIMEOUT);
+		dispatcherGateway.submitJob(jobGraph, TIMEOUT).get();
 
-		assertThat(submissionFuture.isDone(), is(false));
+		assertThat(dispatcherGateway.requestJobStatus(jobGraph.getJobID(), TIMEOUT).get(), is(JobStatus.INITIALIZING));
 
 		queue.offer(Optional.of(testException));
 
-		try {
-			submissionFuture.get();
-			fail("Should fail because we could not instantiate the JobManagerRunner.");
-		} catch (Exception e) {
-			assertThat(ExceptionUtils.findThrowable(e, t -> t.equals(testException)).isPresent(), is(true));
-		}
+		// wait till job is failed
+		dispatcherGateway.requestJobResult(jobGraph.getJobID(), TIMEOUT).get();
 
-		submissionFuture = dispatcherGateway.submitJob(jobGraph, TIMEOUT);
+		ArchivedExecutionGraph execGraph = dispatcherGateway.requestJob(jobGraph.getJobID(), TIMEOUT).get();
+		Assert.assertNotNull(execGraph.getFailureInfo());
+		assertThat(ExceptionUtils.findSerializedThrowable(execGraph.getFailureInfo().getException(), FlinkException.class, this.getClass().getClassLoader()).isPresent(), is(true));
 
+		// submit job again
+		dispatcherGateway.submitJob(jobGraph, TIMEOUT).get();
+
+		// don't fail this time
 		queue.offer(Optional.empty());
 
-		submissionFuture.get();
+		// Ensure job is running
+		CommonTestUtils.waitUntilCondition(() -> dispatcherGateway.requestJobStatus(jobGraph.getJobID(), TIMEOUT).get() == JobStatus.RUNNING,
+			Deadline.fromNow(Duration.of(10, ChronoUnit.SECONDS)), 5L);
 	}
 
 	@Test
@@ -546,8 +682,8 @@ public class DispatcherTest extends TestLogger {
 		dispatcher.start();
 
 		final DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class);
-		final CompletableFuture<Acknowledge> submissionFuture = dispatcherGateway.submitJob(jobGraph, TIMEOUT);
-		submissionFuture.get();
+		dispatcherGateway.submitJob(jobGraph, TIMEOUT).get();
+
 		assertThat(dispatcher.getNumberJobs(TIMEOUT).get(), Matchers.is(1));
 
 		dispatcher.close();
@@ -626,10 +762,36 @@ public class DispatcherTest extends TestLogger {
 		public TestingJobManagerRunner createJobManagerRunner(JobGraph jobGraph, Configuration configuration, RpcService rpcService, HighAvailabilityServices highAvailabilityServices, HeartbeatServices heartbeatServices, JobManagerSharedServices jobManagerSharedServices, JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory, FatalErrorHandler fatalErrorHandler) throws Exception {
 			jobManagerRunnerCreationLatch.run();
 
-			return super.createJobManagerRunner(jobGraph, configuration, rpcService, highAvailabilityServices, heartbeatServices, jobManagerSharedServices, jobManagerJobMetricGroupFactory, fatalErrorHandler);
+			TestingJobManagerRunner testingRunner = super.createJobManagerRunner(
+				jobGraph,
+				configuration,
+				rpcService,
+				highAvailabilityServices,
+				heartbeatServices,
+				jobManagerSharedServices,
+				jobManagerJobMetricGroupFactory,
+				fatalErrorHandler);
+
+			TestingJobMasterGateway testingJobMasterGateway = new TestingJobMasterGatewayBuilder()
+				.setRequestJobSupplier(() -> CompletableFuture.completedFuture(ArchivedExecutionGraph.createFromInitializingJob(jobGraph.getJobID(),
+					jobGraph.getName(),
+					JobStatus.RUNNING,
+					null,
+					1337))
+				).build();
+			testingRunner.completeJobMasterGatewayFuture(testingJobMasterGateway);
+			return testingRunner;
 		}
 	}
 
+	private Tuple2<JobGraph, BlockingJobVertex> getBlockingJobGraphAndVertex() {
+		final BlockingJobVertex blockingJobVertex = new BlockingJobVertex("testVertex");
+		blockingJobVertex.setInvokableClass(NoOpInvokable.class);
+		return Tuple2.of(
+			new JobGraph(TEST_JOB_ID, "blockingTestJob", blockingJobVertex),
+			blockingJobVertex);
+	}
+
 	private JobGraph createFailingJobGraph(Exception failureCause) {
 		final FailingJobVertex jobVertex = new FailingJobVertex("Failing JobVertex", failureCause);
 		jobVertex.setInvokableClass(NoOpInvokable.class);
@@ -690,4 +852,34 @@ public class DispatcherTest extends TestLogger {
 		}
 	}
 
+	private static class BlockingJobVertex extends JobVertex {
+		private final OneShotLatch oneShotLatch = new OneShotLatch();
+		public BlockingJobVertex(String name) {
+			super(name);
+		}
+
+		@Override
+		public void initializeOnMaster(ClassLoader loader) throws Exception {
+			super.initializeOnMaster(loader);
+			oneShotLatch.await();
+		}
+
+		public void unblock() {
+			oneShotLatch.trigger();
+		}
+	}
+
+	/**
+	 * JobVertex that fails during initialization.
+	 */
+	public static class FailingInitializationJobVertex extends JobVertex {
+		public FailingInitializationJobVertex(String name) {
+			super(name);
+		}
+
+		@Override
+		public void initializeOnMaster(ClassLoader loader) {
+			throw new IllegalStateException("Artificial test failure");
+		}
+	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingJobManagerRunnerFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingJobManagerRunnerFactory.java
index 1f3216b..a5307e6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingJobManagerRunnerFactory.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingJobManagerRunnerFactory.java
@@ -77,7 +77,10 @@ public class TestingJobManagerRunnerFactory implements JobManagerRunnerFactory {
 			blockingTermination = false;
 		}
 
-		return new TestingJobManagerRunner(jobGraph.getJobID(), blockingTermination);
+		return new TestingJobManagerRunner.Builder()
+			.setJobId(jobGraph.getJobID())
+			.setBlockingTermination(blockingTermination)
+			.build();
 	}
 
 	public TestingJobManagerRunner takeCreatedJobManagerRunner() throws InterruptedException {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/BlobsCleanupITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/BlobsCleanupITCase.java
index d62930b..1eec46d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/BlobsCleanupITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/BlobsCleanupITCase.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.jobmanager;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.common.JobSubmissionResult;
 import org.apache.flink.api.common.time.Deadline;
 import org.apache.flink.configuration.BlobServerOptions;
@@ -28,7 +29,6 @@ import org.apache.flink.configuration.UnmodifiableConfiguration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.blob.BlobClient;
 import org.apache.flink.runtime.blob.PermanentBlobKey;
-import org.apache.flink.runtime.client.JobSubmissionException;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
@@ -40,6 +40,7 @@ import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.runtime.testutils.MiniClusterResource;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.SerializedThrowable;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.AfterClass;
@@ -53,17 +54,19 @@ import javax.annotation.Nonnull;
 import java.io.File;
 import java.io.FilenameFilter;
 import java.net.InetSocketAddress;
+import java.nio.file.NoSuchFileException;
 import java.time.Duration;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
 
 import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 /**
@@ -180,18 +183,17 @@ public class BlobsCleanupITCase extends TestLogger {
 			jobGraph.addUserJarBlobKey(new PermanentBlobKey());
 		}
 
-		final CompletableFuture<JobSubmissionResult> submissionFuture = miniCluster.submitJob(jobGraph);
+		final JobSubmissionResult jobSubmissionResult = miniCluster.submitJob(jobGraph).get();
 
 		if (testCase == TestCase.JOB_SUBMISSION_FAILS) {
-			try {
-				submissionFuture.get();
-				fail("Expected job submission failure.");
-			} catch (ExecutionException e) {
-				assertThat(ExceptionUtils.findThrowable(e, JobSubmissionException.class).isPresent(), is(true));
-			}
-		} else {
-			final JobSubmissionResult jobSubmissionResult = submissionFuture.get();
+			// Wait for submission to fail & check if exception is forwarded
+			Optional<SerializedThrowable> exception = miniCluster.requestJobResult(jid).get().getSerializedThrowable();
+			assertTrue(exception.isPresent());
+			assertTrue(ExceptionUtils.findThrowableSerializedAware(exception.get(), NoSuchFileException.class, getClass().getClassLoader()).isPresent());
 
+			// check job status
+			assertThat(miniCluster.getJobStatus(jid).get(), is(JobStatus.FAILED));
+		} else {
 			assertThat(jobSubmissionResult.getJobID(), is(jid));
 
 			final CompletableFuture<JobResult> resultFuture = miniCluster.requestJobResult(jid);
@@ -220,7 +222,6 @@ public class BlobsCleanupITCase extends TestLogger {
 						.orElse(null);
 				assertThat(ExceptionUtils.stringifyException(cause), jobResult.isSuccess(), is(true));
 			}
-
 		}
 
 		// both BlobServer and BlobCache should eventually delete all files
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerRunner.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerRunner.java
index 55cfde4..b9a480c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerRunner.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerRunner.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.jobmaster;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.util.Preconditions;
 
 import java.util.concurrent.CompletableFuture;
 
@@ -32,21 +33,20 @@ public class TestingJobManagerRunner implements JobManagerRunner {
 
 	private final boolean blockingTermination;
 
-	private final CompletableFuture<ArchivedExecutionGraph> resultFuture;
+	private final CompletableFuture<Void> terminationFuture;
 
 	private final CompletableFuture<JobMasterGateway> jobMasterGatewayFuture;
 
-	private final CompletableFuture<Void> terminationFuture;
-
-	public TestingJobManagerRunner(JobID jobId) {
-		this(jobId, false);
-	}
+	private final CompletableFuture<ArchivedExecutionGraph> resultFuture;
 
-	public TestingJobManagerRunner(JobID jobId, boolean blockingTermination) {
+	private TestingJobManagerRunner(JobID jobId,
+			boolean blockingTermination,
+			CompletableFuture<JobMasterGateway> jobMasterGatewayFuture,
+			CompletableFuture<ArchivedExecutionGraph> resultFuture) {
 		this.jobId = jobId;
 		this.blockingTermination = blockingTermination;
-		this.resultFuture = new CompletableFuture<>();
-		this.jobMasterGatewayFuture = new CompletableFuture<>();
+		this.jobMasterGatewayFuture = jobMasterGatewayFuture;
+		this.resultFuture = resultFuture;
 		this.terminationFuture = new CompletableFuture<>();
 
 		terminationFuture.whenComplete((ignored, ignoredThrowable) -> resultFuture.completeExceptionally(new JobNotFinishedException(jobId)));
@@ -94,4 +94,47 @@ public class TestingJobManagerRunner implements JobManagerRunner {
 	public CompletableFuture<Void> getTerminationFuture() {
 		return terminationFuture;
 	}
+
+	public void completeJobMasterGatewayFuture(JobMasterGateway testingJobMasterGateway) {
+		this.jobMasterGatewayFuture.complete(testingJobMasterGateway);
+	}
+
+	public static class Builder {
+		private JobID jobId = null;
+		private boolean blockingTermination = false;
+		private CompletableFuture<JobMasterGateway> jobMasterGatewayFuture = new CompletableFuture<>();
+		private CompletableFuture<ArchivedExecutionGraph> resultFuture = new CompletableFuture<>();
+
+		public Builder setJobId(JobID jobId) {
+			this.jobId = jobId;
+			return this;
+		}
+
+		public Builder setBlockingTermination(boolean blockingTermination) {
+			this.blockingTermination = blockingTermination;
+			return this;
+		}
+
+		public Builder setJobMasterGatewayFuture(CompletableFuture<JobMasterGateway> jobMasterGatewayFuture) {
+			Preconditions.checkNotNull(jobMasterGatewayFuture);
+			this.jobMasterGatewayFuture = jobMasterGatewayFuture;
+			return this;
+		}
+
+		public Builder setResultFuture(CompletableFuture<ArchivedExecutionGraph> resultFuture) {
+			Preconditions.checkNotNull(resultFuture);
+			this.resultFuture = resultFuture;
+			return this;
+		}
+
+		public TestingJobManagerRunner build() {
+			Preconditions.checkNotNull(jobId);
+			return new TestingJobManagerRunner(
+				jobId,
+				blockingTermination,
+				jobMasterGatewayFuture,
+				resultFuture);
+		}
+	}
+
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeClusterComponentsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeClusterComponentsTest.java
index 22f86d7..2edcc23 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeClusterComponentsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeClusterComponentsTest.java
@@ -137,6 +137,9 @@ public class LeaderChangeClusterComponentsTest extends TestLogger {
 
 		CompletableFuture<JobResult> jobResultFuture = miniCluster.requestJobResult(jobId);
 
+		// wait until the job has finished initializing, so that the leadership revocation is possible
+		CommonTestUtils.waitUntilJobManagerIsInitialized(() -> miniCluster.getJobStatus(jobId).get());
+
 		highAvailabilityServices.revokeJobMasterLeadership(jobId).get();
 
 		JobResultUtils.assertIncomplete(jobResultFuture);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
index 58e6996..f7c882b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.taskexecutor;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.time.Deadline;
 import org.apache.flink.runtime.execution.Environment;
@@ -150,7 +151,7 @@ public class TaskExecutorITCase extends TestLogger {
 
 		return () -> {
 			final AccessExecutionGraph executionGraph = executionGraphFutureSupplier.get().join();
-			return allExecutionsRunning.test(executionGraph);
+			return allExecutionsRunning.test(executionGraph) && executionGraph.getState() == JobStatus.RUNNING;
 		};
 	}
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java
index 0c11966..ee4c20b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.testutils;
 
+import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.common.time.Deadline;
 import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.function.SupplierWithException;
@@ -31,6 +32,8 @@ import java.io.PrintWriter;
 import java.io.StringWriter;
 import java.lang.management.ManagementFactory;
 import java.lang.management.RuntimeMXBean;
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
 import java.util.concurrent.TimeoutException;
 
 /**
@@ -130,6 +133,17 @@ public class CommonTestUtils {
 		}
 	}
 
+	public static void waitUntilJobManagerIsInitialized(SupplierWithException<JobStatus, Exception> jobStatusSupplier) throws
+		Exception {
+		waitUntilJobManagerIsInitialized(jobStatusSupplier, Deadline.fromNow(Duration.of(1,
+			ChronoUnit.MINUTES)));
+	}
+
+	public static void waitUntilJobManagerIsInitialized(SupplierWithException<JobStatus, Exception> jobStatusSupplier, Deadline timeout) throws
+		Exception {
+		waitUntilCondition(() -> jobStatusSupplier.get() != JobStatus.INITIALIZING, timeout, 20L);
+	}
+
 	/**
 	 * Utility class to read the output of a process stream and forward it into a StringWriter.
 	 */
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java
index 0acc5ac..315a733 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java
@@ -53,7 +53,7 @@ public class TestingRestfulGateway implements RestfulGateway {
 	static final Function<JobID, CompletableFuture<Acknowledge>> DEFAULT_CANCEL_JOB_FUNCTION = jobId -> CompletableFuture.completedFuture(Acknowledge.get());
 	static final Function<JobID, CompletableFuture<JobResult>> DEFAULT_REQUEST_JOB_RESULT_FUNCTION = jobId -> FutureUtils.completedExceptionally(new UnsupportedOperationException());
 	static final Function<JobID, CompletableFuture<ArchivedExecutionGraph>> DEFAULT_REQUEST_JOB_FUNCTION = jobId -> FutureUtils.completedExceptionally(new UnsupportedOperationException());
-	static final Function<JobID, CompletableFuture<JobStatus>> DEFAULT_REQUEST_JOB_STATUS_FUNCTION = jobId -> FutureUtils.completedExceptionally(new UnsupportedOperationException());
+	static final Function<JobID, CompletableFuture<JobStatus>> DEFAULT_REQUEST_JOB_STATUS_FUNCTION = jobId -> CompletableFuture.completedFuture(JobStatus.RUNNING);
 	static final Supplier<CompletableFuture<MultipleJobsDetails>> DEFAULT_REQUEST_MULTIPLE_JOB_DETAILS_SUPPLIER = () -> CompletableFuture.completedFuture(new MultipleJobsDetails(Collections.emptyList()));
 	static final Supplier<CompletableFuture<ClusterOverview>> DEFAULT_REQUEST_CLUSTER_OVERVIEW_SUPPLIER = () -> CompletableFuture.completedFuture(new ClusterOverview(0, 0, 0, 0, 0, 0, 0));
 	static final Supplier<CompletableFuture<Collection<String>>> DEFAULT_REQUEST_METRIC_QUERY_SERVICE_PATHS_SUPPLIER = () -> CompletableFuture.completedFuture(Collections.emptyList());
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 2a3a98c..9746f13 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -1798,7 +1798,7 @@ public class StreamExecutionEnvironment {
 
 		CompletableFuture<JobClient> jobClientFuture = executorFactory
 			.getExecutor(configuration)
-			.execute(streamGraph, configuration);
+			.execute(streamGraph, configuration, userClassloader);
 
 		try {
 			JobClient jobClient = jobClientFuture.get();
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/ExecutorDiscoveryAndJobClientTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/ExecutorDiscoveryAndJobClientTest.java
index bee2bdf..153a457 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/ExecutorDiscoveryAndJobClientTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/ExecutorDiscoveryAndJobClientTest.java
@@ -86,7 +86,7 @@ public class ExecutorDiscoveryAndJobClientTest {
 
 		@Override
 		public PipelineExecutor getExecutor(Configuration configuration) {
-			return (pipeline, executionConfig) -> CompletableFuture.completedFuture(new TestingJobClient());
+			return (pipeline, executionConfig, classLoader) -> CompletableFuture.completedFuture(new TestingJobClient());
 		}
 	}
 }
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
index e76044d..6b4f12a 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
@@ -485,7 +485,7 @@ public class LocalExecutor implements Executor {
 		configuration.set(DeploymentOptions.ATTACHED, false);
 
 		// create execution
-		final ProgramDeployer deployer = new ProgramDeployer(configuration, jobName, pipeline);
+		final ProgramDeployer deployer = new ProgramDeployer(configuration, jobName, pipeline, context.getClassLoader());
 
 		// wrap in classloader because CodeGenOperatorFactory#getStreamOperatorClass
 		// requires to access UDF in deployer.deploy().
@@ -542,7 +542,7 @@ public class LocalExecutor implements Executor {
 
 		// create execution
 		final ProgramDeployer deployer = new ProgramDeployer(
-				configuration, jobName, pipeline);
+				configuration, jobName, pipeline, context.getClassLoader());
 
 		JobClient jobClient;
 		// wrap in classloader because CodeGenOperatorFactory#getStreamOperatorClass
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ProgramDeployer.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ProgramDeployer.java
index 31b5143..e2b7d46 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ProgramDeployer.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ProgramDeployer.java
@@ -41,6 +41,7 @@ public class ProgramDeployer {
 	private final Configuration configuration;
 	private final Pipeline pipeline;
 	private final String jobName;
+	private final ClassLoader userCodeClassloader;
 
 	/**
 	 * Deploys a table program on the cluster.
@@ -52,10 +53,12 @@ public class ProgramDeployer {
 	public ProgramDeployer(
 			Configuration configuration,
 			String jobName,
-			Pipeline pipeline) {
+			Pipeline pipeline,
+			ClassLoader userCodeClassloader) {
 		this.configuration = configuration;
 		this.pipeline = pipeline;
 		this.jobName = jobName;
+		this.userCodeClassloader = userCodeClassloader;
 	}
 
 	public CompletableFuture<JobClient> deploy() {
@@ -79,7 +82,7 @@ public class ProgramDeployer {
 		final PipelineExecutor executor = executorFactory.getExecutor(configuration);
 		CompletableFuture<JobClient> jobClient;
 		try {
-			jobClient = executor.execute(pipeline, configuration);
+			jobClient = executor.execute(pipeline, configuration, userCodeClassloader);
 		} catch (Exception e) {
 			throw new RuntimeException("Could not execute program.", e);
 		}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/BatchTableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/BatchTableEnvImpl.scala
index 4a4f3ef..5d7e755 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/BatchTableEnvImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/BatchTableEnvImpl.scala
@@ -343,7 +343,8 @@ abstract class BatchTableEnvImpl(
       "Cannot find compatible factory for specified execution.target (=%s)",
       configuration.get(DeploymentOptions.TARGET))
 
-    val jobClientFuture = executorFactory.getExecutor(configuration).execute(plan, configuration)
+    val jobClientFuture = executorFactory.getExecutor(configuration)
+      .execute(plan, configuration, execEnv.getUserCodeClassLoader)
     try {
       jobClientFuture.get
     } catch {
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
index d70a409..81b979b 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
@@ -53,6 +53,7 @@ import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.graph.StreamGraph;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.test.util.TestUtils;
 import org.apache.flink.testutils.EntropyInjectingTestFileSystem;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.ExceptionUtils;
@@ -337,6 +338,8 @@ public class SavepointITCase extends TestLogger {
 
 		try {
 			client.submitJob(graph).get();
+			// triggerSavepoint is only available after the job has been initialized
+			TestUtils.waitUntilJobInitializationFinished(graph.getJobID(), cluster, ClassLoader.getSystemClassLoader());
 
 			client.triggerSavepoint(graph.getJobID(), null).get();
 
diff --git a/flink-tests/src/test/java/org/apache/flink/test/example/client/LocalExecutorITCase.java b/flink-tests/src/test/java/org/apache/flink/test/example/client/LocalExecutorITCase.java
index f13a243..0dc34a0 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/example/client/LocalExecutorITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/example/client/LocalExecutorITCase.java
@@ -81,7 +81,7 @@ public class LocalExecutorITCase extends TestLogger {
 
 			Plan wcPlan = getWordCountPlan(inFile, outFile, parallelism);
 			wcPlan.setExecutionConfig(new ExecutionConfig());
-			JobClient jobClient = executor.execute(wcPlan, config).get();
+			JobClient jobClient = executor.execute(wcPlan, config, ClassLoader.getSystemClassLoader()).get();
 			jobClient.getJobExecutionResult(getClass().getClassLoader()).get();
 		} catch (Exception e) {
 			e.printStackTrace();
@@ -99,7 +99,7 @@ public class LocalExecutorITCase extends TestLogger {
 		Configuration config = new Configuration();
 		config.setBoolean(DeploymentOptions.ATTACHED, true);
 
-		JobClient jobClient = executor.execute(runtimeExceptionPlan, config).get();
+		JobClient jobClient = executor.execute(runtimeExceptionPlan, config, ClassLoader.getSystemClassLoader()).get();
 
 		assertThrows(
 			"Job execution failed.",
diff --git a/flink-tests/src/test/java/org/apache/flink/test/example/failing/JobSubmissionFailsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/example/failing/JobSubmissionFailsITCase.java
index 5723978..d1a6c4e 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/example/failing/JobSubmissionFailsITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/example/failing/JobSubmissionFailsITCase.java
@@ -34,14 +34,10 @@ import org.apache.flink.util.TestLogger;
 
 import org.junit.ClassRule;
 import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
 
 import javax.annotation.Nonnull;
 
 import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collection;
 import java.util.function.Predicate;
 
 import static org.apache.flink.test.util.TestUtils.submitJobAndWaitForResult;
@@ -50,7 +46,6 @@ import static org.junit.Assert.fail;
 /**
  * Tests for failing job submissions.
  */
-@RunWith(Parameterized.class)
 public class JobSubmissionFailsITCase extends TestLogger {
 
 	private static final int NUM_TM = 2;
@@ -80,19 +75,6 @@ public class JobSubmissionFailsITCase extends TestLogger {
 		return new JobGraph("Working testing job", jobVertex);
 	}
 
-	// --------------------------------------------------------------------------------------------
-
-	private final boolean detached;
-
-	public JobSubmissionFailsITCase(boolean detached) {
-		this.detached = detached;
-	}
-
-	@Parameterized.Parameters(name = "Detached mode = {0}")
-	public static Collection<Boolean[]> executionModes(){
-		return Arrays.asList(new Boolean[]{false},
-				new Boolean[]{true});
-	}
 
 	// --------------------------------------------------------------------------------------------
 
@@ -131,11 +113,7 @@ public class JobSubmissionFailsITCase extends TestLogger {
 		ClusterClient<?> client = MINI_CLUSTER_RESOURCE.getClusterClient();
 
 		try {
-			if (detached) {
-				client.submitJob(jobGraph).get();
-			} else {
-				submitJobAndWaitForResult(client, jobGraph, getClass().getClassLoader());
-			}
+			submitJobAndWaitForResult(client, jobGraph, getClass().getClassLoader());
 			fail("Job submission should have thrown an exception.");
 		} catch (Exception e) {
 			if (!failurePredicate.test(e)) {
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/environment/RemoteStreamEnvironmentTest.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/environment/RemoteStreamEnvironmentTest.java
index e548f5e..7452332 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/environment/RemoteStreamEnvironmentTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/environment/RemoteStreamEnvironmentTest.java
@@ -152,7 +152,7 @@ public class RemoteStreamEnvironmentTest extends TestLogger {
 
 				@Override
 				public PipelineExecutor getExecutor(@Nonnull Configuration configuration) {
-					return (pipeline, config) -> {
+					return (pipeline, config, classLoader) -> {
 						assertTrue(pipeline instanceof StreamGraph);
 
 						actualSavepointRestoreSettings =
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BackPressureITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BackPressureITCase.java
index 4f3b6f0..6447183 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BackPressureITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BackPressureITCase.java
@@ -21,6 +21,7 @@ package org.apache.flink.test.streaming.runtime;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.client.ClientUtils;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.TaskManagerOptions;
@@ -118,6 +119,10 @@ public class BackPressureITCase extends TestLogger {
 		final JobVertex mapJobVertex = vertices.get(1);
 
 		testingMiniCluster.submitJob(jobGraph).get();
+		ClientUtils.waitUntilJobInitializationFinished(
+			() -> testingMiniCluster.getJobStatus(TEST_JOB_ID).get(),
+			() -> testingMiniCluster.requestJobResult(TEST_JOB_ID).get(),
+			ClassLoader.getSystemClassLoader());
 
 		assertJobVertexSubtasksAreBackPressured(mapJobVertex);
 		assertJobVertexSubtasksAreBackPressured(sourceJobVertex);
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java
index 2647226..229c655 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java
@@ -19,6 +19,7 @@
 package org.apache.flink.test.streaming.runtime;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
@@ -840,7 +841,7 @@ public class TimestampITCase extends TestLogger {
 	private static List<JobID> getRunningJobs(ClusterClient<?> client) throws Exception {
 		Collection<JobStatusMessage> statusMessages = client.listJobs().get();
 		return statusMessages.stream()
-			.filter(status -> !status.getJobState().isGloballyTerminalState())
+			.filter(status -> !status.getJobState().isGloballyTerminalState() && status.getJobState() != JobStatus.INITIALIZING)
 			.map(JobStatusMessage::getJobId)
 			.collect(Collectors.toList());
 	}
diff --git a/flink-tests/src/test/java/org/apache/flink/test/util/TestUtils.java b/flink-tests/src/test/java/org/apache/flink/test/util/TestUtils.java
index 95550e0..7a2bd0b 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/util/TestUtils.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/util/TestUtils.java
@@ -19,9 +19,12 @@
 package org.apache.flink.test.util;
 
 import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.client.ClientUtils;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.client.JobInitializationException;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 
@@ -61,4 +64,10 @@ public class TestUtils {
 			.get()
 			.toJobExecutionResult(classLoader);
 	}
+
+	public static void waitUntilJobInitializationFinished(JobID id, MiniClusterWithClientResource miniCluster, ClassLoader userCodeClassloader) throws
+		JobInitializationException {
+		ClusterClient<?> clusterClient = miniCluster.getClusterClient();
+		ClientUtils.waitUntilJobInitializationFinished(() -> clusterClient.getJobStatus(id).get(), () -> clusterClient.requestJobResult(id).get(), userCodeClassloader);
+	}
 }
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
index 6935ce3..1642a39 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
@@ -19,6 +19,7 @@
 package org.apache.flink.yarn;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.common.time.Deadline;
 import org.apache.flink.client.deployment.ClusterDeploymentException;
 import org.apache.flink.client.deployment.ClusterSpecification;
@@ -310,7 +311,7 @@ public class YARNHighAvailabilityITCase extends YarnTestBase {
 		CommonTestUtils.waitUntilCondition(
 			() -> {
 				final JobDetailsInfo jobDetails = restClusterClient.getJobDetails(jobId).get();
-				return jobDetails.getJobVertexInfos()
+				return jobDetails.getJobStatus() == JobStatus.RUNNING && jobDetails.getJobVertexInfos()
 					.stream()
 					.map(toExecutionState())
 					.allMatch(isRunning());