You are viewing a plain-text version of this content; the hyperlink to the canonical version was lost in this rendering.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/09/14 19:51:22 UTC

[flink] 02/02: [FLINK-10314] Making JobManagerRunner creation non-blocking in Dispatcher

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 199f1758e9bdbdc316ec7168791a8e95373159a6
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Fri Sep 14 12:07:12 2018 +0200

    [FLINK-10314] Making JobManagerRunner creation non-blocking in Dispatcher
    
    Creating the JobManagerRunner can be a blocking operation, e.g. if the CheckpointCoordinator
    needs to access a FileSystem. Therefore, this operation should not be executed in the main thread
    of the Dispatcher, so that the Dispatcher itself is not blocked.
    
    This closes #6699.
---
 .../apache/flink/util/function/FunctionUtils.java  |  13 ++
 .../flink/runtime/dispatcher/Dispatcher.java       | 135 ++++++++++--------
 .../flink/runtime/dispatcher/DispatcherTest.java   | 156 +++++++++++++++++++--
 .../dispatcher/TestingJobManagerRunnerFactory.java |   5 +-
 .../utils/TestingResourceManagerGateway.java       |   2 +-
 5 files changed, 241 insertions(+), 70 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/util/function/FunctionUtils.java b/flink-core/src/main/java/org/apache/flink/util/function/FunctionUtils.java
index 678ef9f..83846a6 100644
--- a/flink-core/src/main/java/org/apache/flink/util/function/FunctionUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/function/FunctionUtils.java
@@ -32,6 +32,19 @@ public class FunctionUtils {
 		throw new UnsupportedOperationException("This class should never be instantiated.");
 	}
 
+	private static final Function<Object, Void> NULL_FN = ignored -> null;
+
+	/**
+	 * Function which returns {@code null} (type: Void).
+	 *
+	 * @param <T> input type
+	 * @return Function which returns {@code null}.
+	 */
+	@SuppressWarnings("unchecked")
+	public static <T> Function<T, Void> nullFn() {
+		return (Function<T, Void>) NULL_FN;
+	}
+
 	/**
 	 * Convert at {@link FunctionWithException} into a {@link Function}.
 	 *
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index 5279e50..a1da213 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -63,10 +63,11 @@ import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.BiConsumerWithException;
 import org.apache.flink.util.function.BiFunctionWithException;
+import org.apache.flink.util.function.CheckedSupplier;
 import org.apache.flink.util.function.FunctionUtils;
-import org.apache.flink.util.function.ThrowingConsumer;
-import org.apache.flink.util.function.ThrowingRunnable;
+import org.apache.flink.util.function.FunctionWithException;
 
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
@@ -110,7 +111,7 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 
 	private final FatalErrorHandler fatalErrorHandler;
 
-	private final Map<JobID, JobManagerRunner> jobManagerRunners;
+	private final Map<JobID, CompletableFuture<JobManagerRunner>> jobManagerRunnerFutures;
 
 	private final LeaderElectionService leaderElectionService;
 
@@ -166,7 +167,7 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 
 		this.runningJobsRegistry = highAvailabilityServices.getRunningJobsRegistry();
 
-		jobManagerRunners = new HashMap<>(16);
+		jobManagerRunnerFutures = new HashMap<>(16);
 
 		leaderElectionService = highAvailabilityServices.getDispatcherLeaderElectionService();
 
@@ -248,7 +249,7 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 			return FutureUtils.completedExceptionally(new FlinkException(String.format("Failed to retrieve job scheduling status for job %s.", jobId), e));
 		}
 
-		if (jobSchedulingStatus == RunningJobsRegistry.JobSchedulingStatus.DONE || jobManagerRunners.containsKey(jobId)) {
+		if (jobSchedulingStatus == RunningJobsRegistry.JobSchedulingStatus.DONE || jobManagerRunnerFutures.containsKey(jobId)) {
 			return FutureUtils.completedExceptionally(
 				new JobSubmissionException(jobId, String.format("Job has already been submitted and is in state %s.", jobSchedulingStatus)));
 		} else {
@@ -257,58 +258,72 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 
 			return persistAndRunFuture.exceptionally(
 				(Throwable throwable) -> {
+					final Throwable strippedThrowable = ExceptionUtils.stripCompletionException(throwable);
+					log.error("Failed to submit job {}.", jobId, strippedThrowable);
 					throw new CompletionException(
-						new JobSubmissionException(jobId, "Failed to submit job.", throwable));
+						new JobSubmissionException(jobId, "Failed to submit job.", strippedThrowable));
 				});
 		}
 	}
 
-	private void persistAndRunJob(JobGraph jobGraph) throws Exception {
+	private CompletableFuture<Void> persistAndRunJob(JobGraph jobGraph) throws Exception {
 		submittedJobGraphStore.putJobGraph(new SubmittedJobGraph(jobGraph, null));
 
-		try {
-			runJob(jobGraph);
-		} catch (Exception e) {
-			try {
+		final CompletableFuture<Void> runJobFuture = runJob(jobGraph);
+
+		return runJobFuture.whenComplete(BiConsumerWithException.unchecked((Object ignored, Throwable throwable) -> {
+			if (throwable != null) {
 				submittedJobGraphStore.removeJobGraph(jobGraph.getJobID());
-			} catch (Exception ie) {
-				e.addSuppressed(ie);
 			}
-
-			throw e;
-		}
+		}));
 	}
 
-	private void runJob(JobGraph jobGraph) throws Exception {
-		Preconditions.checkState(!jobManagerRunners.containsKey(jobGraph.getJobID()));
+	private CompletableFuture<Void> runJob(JobGraph jobGraph) {
+		Preconditions.checkState(!jobManagerRunnerFutures.containsKey(jobGraph.getJobID()));
 
-		final JobManagerRunner jobManagerRunner = createJobManagerRunner(jobGraph);
+		final CompletableFuture<JobManagerRunner> jobManagerRunnerFuture = createJobManagerRunner(jobGraph);
 
-		jobManagerRunner.start();
+		jobManagerRunnerFutures.put(jobGraph.getJobID(), jobManagerRunnerFuture);
 
-		jobManagerRunners.put(jobGraph.getJobID(), jobManagerRunner);
+		return jobManagerRunnerFuture
+			.thenApply(FunctionUtils.nullFn())
+			.whenCompleteAsync(
+				(ignored, throwable) -> {
+					if (throwable != null) {
+						jobManagerRunnerFutures.remove(jobGraph.getJobID());
+					}
+				},
+				getMainThreadExecutor());
 	}
 
-	private JobManagerRunner createJobManagerRunner(JobGraph jobGraph) throws Exception {
-		final JobID jobId = jobGraph.getJobID();
-
-		final JobManagerRunner jobManagerRunner = jobManagerRunnerFactory.createJobManagerRunner(
-			ResourceID.generate(),
-			jobGraph,
-			configuration,
-			getRpcService(),
-			highAvailabilityServices,
-			heartbeatServices,
-			blobServer,
-			jobManagerSharedServices,
-			new DefaultJobManagerJobMetricGroupFactory(jobManagerMetricGroup),
-			fatalErrorHandler);
+	private CompletableFuture<JobManagerRunner> createJobManagerRunner(JobGraph jobGraph) {
+		final RpcService rpcService = getRpcService();
+
+		final CompletableFuture<JobManagerRunner> jobManagerRunnerFuture = CompletableFuture.supplyAsync(
+			CheckedSupplier.unchecked(() ->
+				jobManagerRunnerFactory.createJobManagerRunner(
+					ResourceID.generate(),
+					jobGraph,
+					configuration,
+					rpcService,
+					highAvailabilityServices,
+					heartbeatServices,
+					blobServer,
+					jobManagerSharedServices,
+					new DefaultJobManagerJobMetricGroupFactory(jobManagerMetricGroup),
+					fatalErrorHandler)),
+			rpcService.getExecutor());
+
+		return jobManagerRunnerFuture.thenApply(FunctionUtils.uncheckedFunction(this::startJobManagerRunner));
+	}
 
+	private JobManagerRunner startJobManagerRunner(JobManagerRunner jobManagerRunner) throws Exception {
+		final JobID jobId = jobManagerRunner.getJobGraph().getJobID();
 		jobManagerRunner.getResultFuture().whenCompleteAsync(
 			(ArchivedExecutionGraph archivedExecutionGraph, Throwable throwable) -> {
 				// check if we are still the active JobManagerRunner by checking the identity
 				//noinspection ObjectEquality
-				if (jobManagerRunner == jobManagerRunners.get(jobId)) {
+				if (jobManagerRunner == jobManagerRunnerFutures.get(jobId).getNow(null)) {
 					if (archivedExecutionGraph != null) {
 						jobReachedGloballyTerminalState(archivedExecutionGraph);
 					} else {
@@ -325,13 +340,15 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 				}
 			}, getMainThreadExecutor());
 
+		jobManagerRunner.start();
+
 		return jobManagerRunner;
 	}
 
 	@Override
 	public CompletableFuture<Collection<JobID>> listJobs(Time timeout) {
 		return CompletableFuture.completedFuture(
-			Collections.unmodifiableSet(new HashSet<>(jobManagerRunners.keySet())));
+			Collections.unmodifiableSet(new HashSet<>(jobManagerRunnerFutures.keySet())));
 	}
 
 	@Override
@@ -481,9 +498,9 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 
 	@Override
 	public CompletableFuture<JobResult> requestJobResult(JobID jobId, Time timeout) {
-		final JobManagerRunner jobManagerRunner = jobManagerRunners.get(jobId);
+		final CompletableFuture<JobManagerRunner> jobManagerRunnerFuture = jobManagerRunnerFutures.get(jobId);
 
-		if (jobManagerRunner == null) {
+		if (jobManagerRunnerFuture == null) {
 			final ArchivedExecutionGraph archivedExecutionGraph = archivedExecutionGraphStore.get(jobId);
 
 			if (archivedExecutionGraph == null) {
@@ -492,7 +509,7 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 				return CompletableFuture.completedFuture(JobResult.createFrom(archivedExecutionGraph));
 			}
 		} else {
-			return jobManagerRunner.getResultFuture().thenApply(JobResult::createFrom);
+			return jobManagerRunnerFuture.thenCompose(JobManagerRunner::getResultFuture).thenApply(JobResult::createFrom);
 		}
 	}
 
@@ -566,11 +583,11 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 	}
 
 	private CompletableFuture<Void> removeJob(JobID jobId, boolean cleanupHA) {
-		JobManagerRunner jobManagerRunner = jobManagerRunners.remove(jobId);
+		CompletableFuture<JobManagerRunner> jobManagerRunnerFuture = jobManagerRunnerFutures.remove(jobId);
 
 		final CompletableFuture<Void> jobManagerRunnerTerminationFuture;
-		if (jobManagerRunner != null) {
-			jobManagerRunnerTerminationFuture = jobManagerRunner.closeAsync();
+		if (jobManagerRunnerFuture != null) {
+			jobManagerRunnerTerminationFuture = jobManagerRunnerFuture.thenCompose(JobManagerRunner::closeAsync);
 		} else {
 			jobManagerRunnerTerminationFuture = CompletableFuture.completedFuture(null);
 		}
@@ -616,7 +633,7 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 	private void terminateJobManagerRunners() {
 		log.info("Stopping all currently running jobs of dispatcher {}.", getAddress());
 
-		final HashSet<JobID> jobsToRemove = new HashSet<>(jobManagerRunners.keySet());
+		final HashSet<JobID> jobsToRemove = new HashSet<>(jobManagerRunnerFutures.keySet());
 
 		for (JobID jobId : jobsToRemove) {
 			removeJobAndRegisterTerminationFuture(jobId, false);
@@ -739,16 +756,16 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 	}
 
 	private CompletableFuture<JobMasterGateway> getJobMasterGatewayFuture(JobID jobId) {
-		final JobManagerRunner jobManagerRunner = jobManagerRunners.get(jobId);
+		final CompletableFuture<JobManagerRunner> jobManagerRunnerFuture = jobManagerRunnerFutures.get(jobId);
 
-		if (jobManagerRunner == null) {
+		if (jobManagerRunnerFuture == null) {
 			return FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobId));
 		} else {
-			final CompletableFuture<JobMasterGateway> leaderGatewayFuture = jobManagerRunner.getLeaderGatewayFuture();
+			final CompletableFuture<JobMasterGateway> leaderGatewayFuture = jobManagerRunnerFuture.thenCompose(JobManagerRunner::getLeaderGatewayFuture);
 			return leaderGatewayFuture.thenApplyAsync(
 				(JobMasterGateway jobMasterGateway) -> {
 					// check whether the retrieved JobMasterGateway belongs still to a running JobMaster
-					if (jobManagerRunners.containsKey(jobId)) {
+					if (jobManagerRunnerFutures.containsKey(jobId)) {
 						return jobMasterGateway;
 					} else {
 						throw new CompletionException(new FlinkJobNotFoundException(jobId));
@@ -764,12 +781,12 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 
 	@Nonnull
 	private <T> List<CompletableFuture<Optional<T>>> queryJobMastersForInformation(Function<JobMasterGateway, CompletableFuture<T>> queryFunction) {
-		final int numberJobsRunning = jobManagerRunners.size();
+		final int numberJobsRunning = jobManagerRunnerFutures.size();
 
 		ArrayList<CompletableFuture<Optional<T>>> optionalJobInformation = new ArrayList<>(
 			numberJobsRunning);
 
-		for (JobID jobId : jobManagerRunners.keySet()) {
+		for (JobID jobId : jobManagerRunnerFutures.keySet()) {
 			final CompletableFuture<JobMasterGateway> jobMasterGatewayFuture = getJobMasterGatewayFuture(jobId);
 
 			final CompletableFuture<Optional<T>> optionalRequest = jobMasterGatewayFuture
@@ -836,10 +853,10 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 			log.debug("Dispatcher {} accepted leadership with fencing token {}. Start recovered jobs.", getAddress(), dispatcherId);
 			setNewFencingToken(dispatcherId);
 
-			Collection<CompletableFuture<Void>> runFutures = new ArrayList<>(recoveredJobs.size());
+			Collection<CompletableFuture<?>> runFutures = new ArrayList<>(recoveredJobs.size());
 
 			for (JobGraph recoveredJob : recoveredJobs) {
-				final CompletableFuture<Void> runFuture = waitForTerminatingJobManager(recoveredJob.getJobID(), recoveredJob, this::runJob);
+				final CompletableFuture<?> runFuture = waitForTerminatingJobManager(recoveredJob.getJobID(), recoveredJob, this::runJob);
 				runFutures.add(runFuture);
 			}
 
@@ -850,7 +867,7 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 		}
 	}
 
-	private CompletableFuture<Void> waitForTerminatingJobManager(JobID jobId, JobGraph jobGraph, ThrowingConsumer<JobGraph, ?> action) {
+	private CompletableFuture<Void> waitForTerminatingJobManager(JobID jobId, JobGraph jobGraph, FunctionWithException<JobGraph, CompletableFuture<Void>, ?> action) {
 		final CompletableFuture<Void> jobManagerTerminationFuture = getJobTerminationFuture(jobId)
 			.exceptionally((Throwable throwable) -> {
 				throw new CompletionException(
@@ -858,16 +875,16 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 						String.format("Termination of previous JobManager for job %s failed. Cannot submit job under the same job id.", jobId),
 						throwable)); });
 
-		return jobManagerTerminationFuture.thenRunAsync(
-			ThrowingRunnable.unchecked(() -> {
+		return jobManagerTerminationFuture.thenComposeAsync(
+			FunctionUtils.uncheckedFunction((ignored) -> {
 				jobManagerTerminationFutures.remove(jobId);
-				action.accept(jobGraph);
+				return action.apply(jobGraph);
 			}),
 			getMainThreadExecutor());
 	}
 
 	CompletableFuture<Void> getJobTerminationFuture(JobID jobId) {
-		if (jobManagerRunners.containsKey(jobId)) {
+		if (jobManagerRunnerFutures.containsKey(jobId)) {
 			return FutureUtils.completedExceptionally(new DispatcherException(String.format("Job with job id %s is still running.", jobId)));
 		} else {
 			return jobManagerTerminationFutures.getOrDefault(jobId, CompletableFuture.completedFuture(null));
@@ -923,7 +940,7 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 	public void onAddedJobGraph(final JobID jobId) {
 		runAsync(
 			() -> {
-				if (!jobManagerRunners.containsKey(jobId)) {
+				if (!jobManagerRunnerFutures.containsKey(jobId)) {
 					// IMPORTANT: onAddedJobGraph can generate false positives and, thus, we must expect that
 					// the specified job is already removed from the SubmittedJobGraphStore. In this case,
 					// SubmittedJobGraphStore.recoverJob returns null.
@@ -962,7 +979,7 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 	private CompletableFuture<Boolean> tryRunRecoveredJobGraph(JobGraph jobGraph, DispatcherId dispatcherId) throws Exception {
 		if (leaderElectionService.hasLeadership(dispatcherId.toUUID())) {
 			final JobID jobId = jobGraph.getJobID();
-			if (jobManagerRunners.containsKey(jobId)) {
+			if (jobManagerRunnerFutures.containsKey(jobId)) {
 				// we must not release the job graph lock since it can only be locked once and
 				// is currently being executed. Once we support multiple locks, we must release
 				// the JobGraph here
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
index 1af10b8..2442676 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.blob.VoidBlobStore;
 import org.apache.flink.runtime.checkpoint.Checkpoints;
@@ -64,6 +65,7 @@ import org.apache.flink.runtime.util.TestingFatalErrorHandler;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.function.ThrowingRunnable;
 
 import org.junit.After;
 import org.junit.AfterClass;
@@ -86,7 +88,9 @@ import java.nio.file.Paths;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Map;
+import java.util.Optional;
 import java.util.UUID;
+import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
@@ -144,6 +148,10 @@ public class DispatcherTest extends TestLogger {
 	/** Instance under test. */
 	private TestingDispatcher dispatcher;
 
+	private TestingHighAvailabilityServices haServices;
+
+	private HeartbeatServices heartbeatServices;
+
 	@BeforeClass
 	public static void setupClass() {
 		rpcService = new TestingRpcService();
@@ -166,13 +174,13 @@ public class DispatcherTest extends TestLogger {
 		jobGraph.setAllowQueuedScheduling(true);
 
 		fatalErrorHandler = new TestingFatalErrorHandler();
-		final HeartbeatServices heartbeatServices = new HeartbeatServices(1000L, 10000L);
+		heartbeatServices = new HeartbeatServices(1000L, 10000L);
 		submittedJobGraphStore = new FaultySubmittedJobGraphStore();
 
 		dispatcherLeaderElectionService = new TestingLeaderElectionService();
 		jobMasterLeaderElectionService = new TestingLeaderElectionService();
 
-		final TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices();
+		haServices = new TestingHighAvailabilityServices();
 		haServices.setDispatcherLeaderElectionService(dispatcherLeaderElectionService);
 		haServices.setSubmittedJobGraphStore(submittedJobGraphStore);
 		haServices.setJobMasterLeaderElectionService(TEST_JOB_ID, jobMasterLeaderElectionService);
@@ -188,14 +196,18 @@ public class DispatcherTest extends TestLogger {
 
 		createdJobManagerRunnerLatch = new CountDownLatch(2);
 		blobServer = new BlobServer(configuration, new VoidBlobStore());
+	}
 
-		dispatcher = createDispatcher(heartbeatServices, haServices);
-
+	@Nonnull
+	private TestingDispatcher createAndStartDispatcher(HeartbeatServices heartbeatServices, TestingHighAvailabilityServices haServices, Dispatcher.JobManagerRunnerFactory jobManagerRunnerFactory) throws Exception {
+		final TestingDispatcher dispatcher = createDispatcher(heartbeatServices, haServices, jobManagerRunnerFactory);
 		dispatcher.start();
+
+		return dispatcher;
 	}
 
 	@Nonnull
-	private TestingDispatcher createDispatcher(HeartbeatServices heartbeatServices, TestingHighAvailabilityServices haServices) throws Exception {
+	private TestingDispatcher createDispatcher(HeartbeatServices heartbeatServices, TestingHighAvailabilityServices haServices, Dispatcher.JobManagerRunnerFactory jobManagerRunnerFactory) throws Exception {
 		return new TestingDispatcher(
 			rpcService,
 			Dispatcher.DISPATCHER_NAME + '_' + name.getMethodName(),
@@ -207,7 +219,7 @@ public class DispatcherTest extends TestLogger {
 			UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup(),
 			null,
 			new MemoryArchivedExecutionGraphStore(),
-			new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID, createdJobManagerRunnerLatch),
+			jobManagerRunnerFactory,
 			fatalErrorHandler);
 	}
 
@@ -216,7 +228,13 @@ public class DispatcherTest extends TestLogger {
 		try {
 			fatalErrorHandler.rethrowError();
 		} finally {
-			RpcUtils.terminateRpcEndpoint(dispatcher, TIMEOUT);
+			if (dispatcher != null) {
+				RpcUtils.terminateRpcEndpoint(dispatcher, TIMEOUT);
+			}
+		}
+
+		if (haServices != null) {
+			haServices.closeAndCleanupAllData();
 		}
 
 		if (blobServer != null) {
@@ -230,6 +248,8 @@ public class DispatcherTest extends TestLogger {
 	 */
 	@Test
 	public void testJobSubmission() throws Exception {
+		dispatcher = createAndStartDispatcher(heartbeatServices, haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID, createdJobManagerRunnerLatch));
+
 		CompletableFuture<UUID> leaderFuture = dispatcherLeaderElectionService.isLeader(UUID.randomUUID());
 
 		// wait for the leader to be elected
@@ -251,6 +271,8 @@ public class DispatcherTest extends TestLogger {
 	 */
 	@Test
 	public void testLeaderElection() throws Exception {
+		dispatcher = createAndStartDispatcher(heartbeatServices, haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID, createdJobManagerRunnerLatch));
+
 		CompletableFuture<Void> jobIdsFuture = new CompletableFuture<>();
 		submittedJobGraphStore.setJobIdsFunction(
 			(Collection<JobID> jobIds) -> {
@@ -270,6 +292,8 @@ public class DispatcherTest extends TestLogger {
 	 */
 	@Test
 	public void testSubmittedJobGraphListener() throws Exception {
+		dispatcher = createAndStartDispatcher(heartbeatServices, haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID, createdJobManagerRunnerLatch));
+
 		dispatcherLeaderElectionService.isLeader(UUID.randomUUID()).get();
 
 		final DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class);
@@ -294,6 +318,8 @@ public class DispatcherTest extends TestLogger {
 
 	@Test
 	public void testOnAddedJobGraphRecoveryFailure() throws Exception {
+		dispatcher = createAndStartDispatcher(heartbeatServices, haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID, createdJobManagerRunnerLatch));
+
 		final FlinkException expectedFailure = new FlinkException("Expected failure");
 		submittedJobGraphStore.setRecoveryFailure(expectedFailure);
 
@@ -313,6 +339,8 @@ public class DispatcherTest extends TestLogger {
 
 	@Test
 	public void testOnAddedJobGraphWithFinishedJob() throws Throwable {
+		dispatcher = createAndStartDispatcher(heartbeatServices, haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID, createdJobManagerRunnerLatch));
+
 		dispatcherLeaderElectionService.isLeader(UUID.randomUUID()).get();
 
 		submittedJobGraphStore.putJobGraph(new SubmittedJobGraph(jobGraph, null));
@@ -333,6 +361,8 @@ public class DispatcherTest extends TestLogger {
 	 */
 	@Test
 	public void testCacheJobExecutionResult() throws Exception {
+		dispatcher = createAndStartDispatcher(heartbeatServices, haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID, createdJobManagerRunnerLatch));
+
 		dispatcherLeaderElectionService.isLeader(UUID.randomUUID()).get();
 
 		final DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class);
@@ -358,6 +388,8 @@ public class DispatcherTest extends TestLogger {
 
 	@Test
 	public void testThrowExceptionIfJobExecutionResultNotFound() throws Exception {
+		dispatcher = createAndStartDispatcher(heartbeatServices, haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID, createdJobManagerRunnerLatch));
+
 		dispatcherLeaderElectionService.isLeader(UUID.randomUUID()).get();
 
 		final DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class);
@@ -374,6 +406,8 @@ public class DispatcherTest extends TestLogger {
 	 */
 	@Test
 	public void testJobRecovery() throws Exception {
+		dispatcher = createAndStartDispatcher(heartbeatServices, haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID, createdJobManagerRunnerLatch));
+
 		final DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class);
 
 		// elect the initial dispatcher as the leader
@@ -413,6 +447,8 @@ public class DispatcherTest extends TestLogger {
 		final URI externalPointer = createTestingSavepoint();
 		final Path savepointPath = Paths.get(externalPointer);
 
+		dispatcher = createAndStartDispatcher(heartbeatServices, haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID, createdJobManagerRunnerLatch));
+
 		final DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class);
 
 		dispatcherLeaderElectionService.isLeader(UUID.randomUUID()).get();
@@ -447,7 +483,9 @@ public class DispatcherTest extends TestLogger {
 	 * to it. See FLINK-8887.
 	 */
 	@Test
-	public void testWaitingForJobMasterLeadership() throws ExecutionException, InterruptedException {
+	public void testWaitingForJobMasterLeadership() throws Exception {
+		dispatcher = createAndStartDispatcher(heartbeatServices, haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID, createdJobManagerRunnerLatch));
+
 		final DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class);
 
 		dispatcherLeaderElectionService.isLeader(UUID.randomUUID()).get();
@@ -476,6 +514,8 @@ public class DispatcherTest extends TestLogger {
 	 */
 	@Test
 	public void testFatalErrorAfterJobIdRecoveryFailure() throws Exception {
+		dispatcher = createAndStartDispatcher(heartbeatServices, haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID, createdJobManagerRunnerLatch));
+
 		final FlinkException testException = new FlinkException("Test exception");
 		submittedJobGraphStore.setJobIdsFunction(
 			(Collection<JobID> jobIds) -> {
@@ -500,6 +540,8 @@ public class DispatcherTest extends TestLogger {
 	public void testFatalErrorAfterJobRecoveryFailure() throws Exception {
 		final FlinkException testException = new FlinkException("Test exception");
 
+		dispatcher = createAndStartDispatcher(heartbeatServices, haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID, createdJobManagerRunnerLatch));
+
 		final SubmittedJobGraph submittedJobGraph = new SubmittedJobGraph(jobGraph, null);
 		submittedJobGraphStore.putJobGraph(submittedJobGraph);
 
@@ -526,6 +568,8 @@ public class DispatcherTest extends TestLogger {
 	public void testJobSubmissionErrorAfterJobRecovery() throws Exception {
 		final FlinkException testException = new FlinkException("Test exception");
 
+		dispatcher = createAndStartDispatcher(heartbeatServices, haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID, createdJobManagerRunnerLatch));
+
 		final JobGraph failingJobGraph = createFailingJobGraph(testException);
 
 		final SubmittedJobGraph submittedJobGraph = new SubmittedJobGraph(failingJobGraph, null);
@@ -540,6 +584,102 @@ public class DispatcherTest extends TestLogger {
 		fatalErrorHandler.clearError();
 	}
 
+	/**
+	 * Tests that a blocking {@link JobManagerRunner} creation, e.g. due to blocking FileSystem access,
+	 * does not block the {@link Dispatcher}.
+	 *
+	 * <p>See FLINK-10314
+	 */
+	@Test
+	public void testBlockingJobManagerRunner() throws Exception {
+		final OneShotLatch jobManagerRunnerCreationLatch = new OneShotLatch();
+		dispatcher = createAndStartDispatcher(
+			heartbeatServices,
+			haServices,
+			new BlockingJobManagerRunnerFactory(jobManagerRunnerCreationLatch::await));
+
+		dispatcherLeaderElectionService.isLeader(UUID.randomUUID()).get();
+
+		final DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class);
+
+		final CompletableFuture<Acknowledge> submissionFuture = dispatcherGateway.submitJob(jobGraph, TIMEOUT);
+
+		assertThat(submissionFuture.isDone(), is(false));
+
+		final CompletableFuture<Collection<String>> metricQueryServicePathsFuture = dispatcherGateway.requestMetricQueryServicePaths(Time.seconds(5L));
+
+		assertThat(metricQueryServicePathsFuture.get(), is(empty()));
+
+		assertThat(submissionFuture.isDone(), is(false));
+
+		jobManagerRunnerCreationLatch.trigger();
+
+		submissionFuture.get();
+	}
+
+	/**
+	 * Tests that a failing {@link JobManagerRunner} will be properly cleaned up.
+	 */
+	@Test
+	public void testFailingJobManagerRunnerCleanup() throws Exception {
+		final FlinkException testException = new FlinkException("Test exception.");
+		final ArrayBlockingQueue<Optional<Exception>> queue = new ArrayBlockingQueue<>(2);
+
+		dispatcher = createAndStartDispatcher(
+			heartbeatServices,
+			haServices,
+			new BlockingJobManagerRunnerFactory(() -> {
+				final Optional<Exception> take = queue.take();
+				final Exception exception = take.orElse(null);
+
+				if (exception != null) {
+					throw exception;
+				}
+			}));
+
+		dispatcherLeaderElectionService.isLeader(UUID.randomUUID()).get();
+
+		final DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class);
+
+		CompletableFuture<Acknowledge> submissionFuture = dispatcherGateway.submitJob(jobGraph, TIMEOUT);
+
+		assertThat(submissionFuture.isDone(), is(false));
+
+		queue.offer(Optional.of(testException));
+
+		try {
+			submissionFuture.get();
+			fail("Should fail because we could not instantiate the JobManagerRunner.");
+		} catch (Exception e) {
+			assertThat(ExceptionUtils.findThrowable(e, t -> t.equals(testException)).isPresent(), is(true));
+		}
+
+		submissionFuture = dispatcherGateway.submitJob(jobGraph, TIMEOUT);
+
+		queue.offer(Optional.empty());
+
+		submissionFuture.get();
+	}
+
+	private final class BlockingJobManagerRunnerFactory extends TestingJobManagerRunnerFactory {
+
+		@Nonnull
+		private final ThrowingRunnable<Exception> jobManagerRunnerCreationLatch;
+
+		BlockingJobManagerRunnerFactory(@Nonnull ThrowingRunnable<Exception> jobManagerRunnerCreationLatch) {
+			super(new CompletableFuture<>(), new CompletableFuture<>(), CompletableFuture.completedFuture(null));
+
+			this.jobManagerRunnerCreationLatch = jobManagerRunnerCreationLatch;
+		}
+
+		@Override
+		public JobManagerRunner createJobManagerRunner(ResourceID resourceId, JobGraph jobGraph, Configuration configuration, RpcService rpcService, HighAvailabilityServices highAvailabilityServices, HeartbeatServices heartbeatServices, BlobServer blobServer, JobManagerSharedServices jobManagerSharedServices, JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory, FatalErrorHandler fatalErrorHandler) throws Exception {
+			jobManagerRunnerCreationLatch.run();
+
+			return super.createJobManagerRunner(resourceId, jobGraph, configuration, rpcService, highAvailabilityServices, heartbeatServices, blobServer, jobManagerSharedServices, jobManagerJobMetricGroupFactory, fatalErrorHandler);
+		}
+	}
+
 	private void electDispatcher() {
 		UUID expectedLeaderSessionId = UUID.randomUUID();
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingJobManagerRunnerFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingJobManagerRunnerFactory.java
index cb48648..992f087 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingJobManagerRunnerFactory.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingJobManagerRunnerFactory.java
@@ -40,7 +40,7 @@ import static org.mockito.Mockito.when;
  * {@link org.apache.flink.runtime.dispatcher.Dispatcher.JobManagerRunnerFactory} implementation for
  * testing purposes.
  */
-final class TestingJobManagerRunnerFactory implements Dispatcher.JobManagerRunnerFactory {
+class TestingJobManagerRunnerFactory implements Dispatcher.JobManagerRunnerFactory {
 
 	private final CompletableFuture<JobGraph> jobGraphFuture;
 	private final CompletableFuture<ArchivedExecutionGraph> resultFuture;
@@ -63,12 +63,13 @@ final class TestingJobManagerRunnerFactory implements Dispatcher.JobManagerRunne
 			BlobServer blobServer,
 			JobManagerSharedServices jobManagerSharedServices,
 			JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory,
-			FatalErrorHandler fatalErrorHandler) {
+			FatalErrorHandler fatalErrorHandler) throws Exception {
 		jobGraphFuture.complete(jobGraph);
 
 		final JobManagerRunner mock = mock(JobManagerRunner.class);
 		when(mock.getResultFuture()).thenReturn(resultFuture);
 		when(mock.closeAsync()).thenReturn(terminationFuture);
+		when(mock.getJobGraph()).thenReturn(jobGraph);
 
 		return mock;
 	}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java
index c38ea5d..950a4e1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java
@@ -298,7 +298,7 @@ public class TestingResourceManagerGateway implements ResourceManagerGateway {
 
 	@Override
 	public CompletableFuture<ResourceOverview> requestResourceOverview(Time timeout) {
-		return FutureUtils.completedExceptionally(new UnsupportedOperationException("Not yet implemented"));
+		return CompletableFuture.completedFuture(new ResourceOverview(1, 1, 1));
 	}
 
 	@Override