You are viewing a plain-text version of this content; the link to its canonical version was not preserved in this copy.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/09/14 19:51:20 UTC

[flink] branch master updated (2e21582 -> 199f175)

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 2e21582  [FLINK-10325] [State TTL] Refactor TtlListState to use only loops, no java stream API for performance
     new 89ddffe  [hotfix] Replace DispatcherResourceCleanupTest#TestingJobManagerRunnerFactory with TestingJobManagerRunnerFactory
     new 199f175  [FLINK-10314] Making JobManagerRunner creation non-blocking in Dispatcher

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../apache/flink/util/function/FunctionUtils.java  |  13 ++
 .../flink/runtime/dispatcher/Dispatcher.java       | 135 ++++++++++--------
 .../flink/runtime/dispatcher/DispatcherHATest.java |   2 +-
 .../dispatcher/DispatcherResourceCleanupTest.java  |  33 +----
 .../flink/runtime/dispatcher/DispatcherTest.java   | 156 +++++++++++++++++++--
 .../runtime/dispatcher/MiniDispatcherTest.java     |   2 +-
 .../dispatcher/TestingJobManagerRunnerFactory.java |   9 +-
 .../dispatcher/ZooKeeperHADispatcherTest.java      |  10 +-
 .../utils/TestingResourceManagerGateway.java       |   2 +-
 9 files changed, 252 insertions(+), 110 deletions(-)


[flink] 02/02: [FLINK-10314] Making JobManagerRunner creation non-blocking in Dispatcher

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 199f1758e9bdbdc316ec7168791a8e95373159a6
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Fri Sep 14 12:07:12 2018 +0200

    [FLINK-10314] Making JobManagerRunner creation non-blocking in Dispatcher
    
    The JobManagerRunner creation can be a blocking operation, e.g. if the CheckpointCoordinator
    needs to access a FileSystem. Therefore, this operation should not be executed in the main thread
    of the Dispatcher in order to not block this component.
    
    This closes #6699.
---
 .../apache/flink/util/function/FunctionUtils.java  |  13 ++
 .../flink/runtime/dispatcher/Dispatcher.java       | 135 ++++++++++--------
 .../flink/runtime/dispatcher/DispatcherTest.java   | 156 +++++++++++++++++++--
 .../dispatcher/TestingJobManagerRunnerFactory.java |   5 +-
 .../utils/TestingResourceManagerGateway.java       |   2 +-
 5 files changed, 241 insertions(+), 70 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/util/function/FunctionUtils.java b/flink-core/src/main/java/org/apache/flink/util/function/FunctionUtils.java
index 678ef9f..83846a6 100644
--- a/flink-core/src/main/java/org/apache/flink/util/function/FunctionUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/function/FunctionUtils.java
@@ -32,6 +32,19 @@ public class FunctionUtils {
 		throw new UnsupportedOperationException("This class should never be instantiated.");
 	}
 
+	private static final Function<Object, Void> NULL_FN = ignored -> null;
+
+	/**
+	 * Function which returns {@code null} (type: Void).
+	 *
+	 * @param <T> input type
+	 * @return Function which returns {@code null}.
+	 */
+	@SuppressWarnings("unchecked")
+	public static <T> Function<T, Void> nullFn() {
+		return (Function<T, Void>) NULL_FN;
+	}
+
 	/**
 	 * Convert at {@link FunctionWithException} into a {@link Function}.
 	 *
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index 5279e50..a1da213 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -63,10 +63,11 @@ import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.BiConsumerWithException;
 import org.apache.flink.util.function.BiFunctionWithException;
+import org.apache.flink.util.function.CheckedSupplier;
 import org.apache.flink.util.function.FunctionUtils;
-import org.apache.flink.util.function.ThrowingConsumer;
-import org.apache.flink.util.function.ThrowingRunnable;
+import org.apache.flink.util.function.FunctionWithException;
 
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
@@ -110,7 +111,7 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 
 	private final FatalErrorHandler fatalErrorHandler;
 
-	private final Map<JobID, JobManagerRunner> jobManagerRunners;
+	private final Map<JobID, CompletableFuture<JobManagerRunner>> jobManagerRunnerFutures;
 
 	private final LeaderElectionService leaderElectionService;
 
@@ -166,7 +167,7 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 
 		this.runningJobsRegistry = highAvailabilityServices.getRunningJobsRegistry();
 
-		jobManagerRunners = new HashMap<>(16);
+		jobManagerRunnerFutures = new HashMap<>(16);
 
 		leaderElectionService = highAvailabilityServices.getDispatcherLeaderElectionService();
 
@@ -248,7 +249,7 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 			return FutureUtils.completedExceptionally(new FlinkException(String.format("Failed to retrieve job scheduling status for job %s.", jobId), e));
 		}
 
-		if (jobSchedulingStatus == RunningJobsRegistry.JobSchedulingStatus.DONE || jobManagerRunners.containsKey(jobId)) {
+		if (jobSchedulingStatus == RunningJobsRegistry.JobSchedulingStatus.DONE || jobManagerRunnerFutures.containsKey(jobId)) {
 			return FutureUtils.completedExceptionally(
 				new JobSubmissionException(jobId, String.format("Job has already been submitted and is in state %s.", jobSchedulingStatus)));
 		} else {
@@ -257,58 +258,72 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 
 			return persistAndRunFuture.exceptionally(
 				(Throwable throwable) -> {
+					final Throwable strippedThrowable = ExceptionUtils.stripCompletionException(throwable);
+					log.error("Failed to submit job {}.", jobId, strippedThrowable);
 					throw new CompletionException(
-						new JobSubmissionException(jobId, "Failed to submit job.", throwable));
+						new JobSubmissionException(jobId, "Failed to submit job.", strippedThrowable));
 				});
 		}
 	}
 
-	private void persistAndRunJob(JobGraph jobGraph) throws Exception {
+	private CompletableFuture<Void> persistAndRunJob(JobGraph jobGraph) throws Exception {
 		submittedJobGraphStore.putJobGraph(new SubmittedJobGraph(jobGraph, null));
 
-		try {
-			runJob(jobGraph);
-		} catch (Exception e) {
-			try {
+		final CompletableFuture<Void> runJobFuture = runJob(jobGraph);
+
+		return runJobFuture.whenComplete(BiConsumerWithException.unchecked((Object ignored, Throwable throwable) -> {
+			if (throwable != null) {
 				submittedJobGraphStore.removeJobGraph(jobGraph.getJobID());
-			} catch (Exception ie) {
-				e.addSuppressed(ie);
 			}
-
-			throw e;
-		}
+		}));
 	}
 
-	private void runJob(JobGraph jobGraph) throws Exception {
-		Preconditions.checkState(!jobManagerRunners.containsKey(jobGraph.getJobID()));
+	private CompletableFuture<Void> runJob(JobGraph jobGraph) {
+		Preconditions.checkState(!jobManagerRunnerFutures.containsKey(jobGraph.getJobID()));
 
-		final JobManagerRunner jobManagerRunner = createJobManagerRunner(jobGraph);
+		final CompletableFuture<JobManagerRunner> jobManagerRunnerFuture = createJobManagerRunner(jobGraph);
 
-		jobManagerRunner.start();
+		jobManagerRunnerFutures.put(jobGraph.getJobID(), jobManagerRunnerFuture);
 
-		jobManagerRunners.put(jobGraph.getJobID(), jobManagerRunner);
+		return jobManagerRunnerFuture
+			.thenApply(FunctionUtils.nullFn())
+			.whenCompleteAsync(
+				(ignored, throwable) -> {
+					if (throwable != null) {
+						jobManagerRunnerFutures.remove(jobGraph.getJobID());
+					}
+				},
+				getMainThreadExecutor());
 	}
 
-	private JobManagerRunner createJobManagerRunner(JobGraph jobGraph) throws Exception {
-		final JobID jobId = jobGraph.getJobID();
-
-		final JobManagerRunner jobManagerRunner = jobManagerRunnerFactory.createJobManagerRunner(
-			ResourceID.generate(),
-			jobGraph,
-			configuration,
-			getRpcService(),
-			highAvailabilityServices,
-			heartbeatServices,
-			blobServer,
-			jobManagerSharedServices,
-			new DefaultJobManagerJobMetricGroupFactory(jobManagerMetricGroup),
-			fatalErrorHandler);
+	private CompletableFuture<JobManagerRunner> createJobManagerRunner(JobGraph jobGraph) {
+		final RpcService rpcService = getRpcService();
+
+		final CompletableFuture<JobManagerRunner> jobManagerRunnerFuture = CompletableFuture.supplyAsync(
+			CheckedSupplier.unchecked(() ->
+				jobManagerRunnerFactory.createJobManagerRunner(
+					ResourceID.generate(),
+					jobGraph,
+					configuration,
+					rpcService,
+					highAvailabilityServices,
+					heartbeatServices,
+					blobServer,
+					jobManagerSharedServices,
+					new DefaultJobManagerJobMetricGroupFactory(jobManagerMetricGroup),
+					fatalErrorHandler)),
+			rpcService.getExecutor());
+
+		return jobManagerRunnerFuture.thenApply(FunctionUtils.uncheckedFunction(this::startJobManagerRunner));
+	}
 
+	private JobManagerRunner startJobManagerRunner(JobManagerRunner jobManagerRunner) throws Exception {
+		final JobID jobId = jobManagerRunner.getJobGraph().getJobID();
 		jobManagerRunner.getResultFuture().whenCompleteAsync(
 			(ArchivedExecutionGraph archivedExecutionGraph, Throwable throwable) -> {
 				// check if we are still the active JobManagerRunner by checking the identity
 				//noinspection ObjectEquality
-				if (jobManagerRunner == jobManagerRunners.get(jobId)) {
+				if (jobManagerRunner == jobManagerRunnerFutures.get(jobId).getNow(null)) {
 					if (archivedExecutionGraph != null) {
 						jobReachedGloballyTerminalState(archivedExecutionGraph);
 					} else {
@@ -325,13 +340,15 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 				}
 			}, getMainThreadExecutor());
 
+		jobManagerRunner.start();
+
 		return jobManagerRunner;
 	}
 
 	@Override
 	public CompletableFuture<Collection<JobID>> listJobs(Time timeout) {
 		return CompletableFuture.completedFuture(
-			Collections.unmodifiableSet(new HashSet<>(jobManagerRunners.keySet())));
+			Collections.unmodifiableSet(new HashSet<>(jobManagerRunnerFutures.keySet())));
 	}
 
 	@Override
@@ -481,9 +498,9 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 
 	@Override
 	public CompletableFuture<JobResult> requestJobResult(JobID jobId, Time timeout) {
-		final JobManagerRunner jobManagerRunner = jobManagerRunners.get(jobId);
+		final CompletableFuture<JobManagerRunner> jobManagerRunnerFuture = jobManagerRunnerFutures.get(jobId);
 
-		if (jobManagerRunner == null) {
+		if (jobManagerRunnerFuture == null) {
 			final ArchivedExecutionGraph archivedExecutionGraph = archivedExecutionGraphStore.get(jobId);
 
 			if (archivedExecutionGraph == null) {
@@ -492,7 +509,7 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 				return CompletableFuture.completedFuture(JobResult.createFrom(archivedExecutionGraph));
 			}
 		} else {
-			return jobManagerRunner.getResultFuture().thenApply(JobResult::createFrom);
+			return jobManagerRunnerFuture.thenCompose(JobManagerRunner::getResultFuture).thenApply(JobResult::createFrom);
 		}
 	}
 
@@ -566,11 +583,11 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 	}
 
 	private CompletableFuture<Void> removeJob(JobID jobId, boolean cleanupHA) {
-		JobManagerRunner jobManagerRunner = jobManagerRunners.remove(jobId);
+		CompletableFuture<JobManagerRunner> jobManagerRunnerFuture = jobManagerRunnerFutures.remove(jobId);
 
 		final CompletableFuture<Void> jobManagerRunnerTerminationFuture;
-		if (jobManagerRunner != null) {
-			jobManagerRunnerTerminationFuture = jobManagerRunner.closeAsync();
+		if (jobManagerRunnerFuture != null) {
+			jobManagerRunnerTerminationFuture = jobManagerRunnerFuture.thenCompose(JobManagerRunner::closeAsync);
 		} else {
 			jobManagerRunnerTerminationFuture = CompletableFuture.completedFuture(null);
 		}
@@ -616,7 +633,7 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 	private void terminateJobManagerRunners() {
 		log.info("Stopping all currently running jobs of dispatcher {}.", getAddress());
 
-		final HashSet<JobID> jobsToRemove = new HashSet<>(jobManagerRunners.keySet());
+		final HashSet<JobID> jobsToRemove = new HashSet<>(jobManagerRunnerFutures.keySet());
 
 		for (JobID jobId : jobsToRemove) {
 			removeJobAndRegisterTerminationFuture(jobId, false);
@@ -739,16 +756,16 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 	}
 
 	private CompletableFuture<JobMasterGateway> getJobMasterGatewayFuture(JobID jobId) {
-		final JobManagerRunner jobManagerRunner = jobManagerRunners.get(jobId);
+		final CompletableFuture<JobManagerRunner> jobManagerRunnerFuture = jobManagerRunnerFutures.get(jobId);
 
-		if (jobManagerRunner == null) {
+		if (jobManagerRunnerFuture == null) {
 			return FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobId));
 		} else {
-			final CompletableFuture<JobMasterGateway> leaderGatewayFuture = jobManagerRunner.getLeaderGatewayFuture();
+			final CompletableFuture<JobMasterGateway> leaderGatewayFuture = jobManagerRunnerFuture.thenCompose(JobManagerRunner::getLeaderGatewayFuture);
 			return leaderGatewayFuture.thenApplyAsync(
 				(JobMasterGateway jobMasterGateway) -> {
 					// check whether the retrieved JobMasterGateway belongs still to a running JobMaster
-					if (jobManagerRunners.containsKey(jobId)) {
+					if (jobManagerRunnerFutures.containsKey(jobId)) {
 						return jobMasterGateway;
 					} else {
 						throw new CompletionException(new FlinkJobNotFoundException(jobId));
@@ -764,12 +781,12 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 
 	@Nonnull
 	private <T> List<CompletableFuture<Optional<T>>> queryJobMastersForInformation(Function<JobMasterGateway, CompletableFuture<T>> queryFunction) {
-		final int numberJobsRunning = jobManagerRunners.size();
+		final int numberJobsRunning = jobManagerRunnerFutures.size();
 
 		ArrayList<CompletableFuture<Optional<T>>> optionalJobInformation = new ArrayList<>(
 			numberJobsRunning);
 
-		for (JobID jobId : jobManagerRunners.keySet()) {
+		for (JobID jobId : jobManagerRunnerFutures.keySet()) {
 			final CompletableFuture<JobMasterGateway> jobMasterGatewayFuture = getJobMasterGatewayFuture(jobId);
 
 			final CompletableFuture<Optional<T>> optionalRequest = jobMasterGatewayFuture
@@ -836,10 +853,10 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 			log.debug("Dispatcher {} accepted leadership with fencing token {}. Start recovered jobs.", getAddress(), dispatcherId);
 			setNewFencingToken(dispatcherId);
 
-			Collection<CompletableFuture<Void>> runFutures = new ArrayList<>(recoveredJobs.size());
+			Collection<CompletableFuture<?>> runFutures = new ArrayList<>(recoveredJobs.size());
 
 			for (JobGraph recoveredJob : recoveredJobs) {
-				final CompletableFuture<Void> runFuture = waitForTerminatingJobManager(recoveredJob.getJobID(), recoveredJob, this::runJob);
+				final CompletableFuture<?> runFuture = waitForTerminatingJobManager(recoveredJob.getJobID(), recoveredJob, this::runJob);
 				runFutures.add(runFuture);
 			}
 
@@ -850,7 +867,7 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 		}
 	}
 
-	private CompletableFuture<Void> waitForTerminatingJobManager(JobID jobId, JobGraph jobGraph, ThrowingConsumer<JobGraph, ?> action) {
+	private CompletableFuture<Void> waitForTerminatingJobManager(JobID jobId, JobGraph jobGraph, FunctionWithException<JobGraph, CompletableFuture<Void>, ?> action) {
 		final CompletableFuture<Void> jobManagerTerminationFuture = getJobTerminationFuture(jobId)
 			.exceptionally((Throwable throwable) -> {
 				throw new CompletionException(
@@ -858,16 +875,16 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 						String.format("Termination of previous JobManager for job %s failed. Cannot submit job under the same job id.", jobId),
 						throwable)); });
 
-		return jobManagerTerminationFuture.thenRunAsync(
-			ThrowingRunnable.unchecked(() -> {
+		return jobManagerTerminationFuture.thenComposeAsync(
+			FunctionUtils.uncheckedFunction((ignored) -> {
 				jobManagerTerminationFutures.remove(jobId);
-				action.accept(jobGraph);
+				return action.apply(jobGraph);
 			}),
 			getMainThreadExecutor());
 	}
 
 	CompletableFuture<Void> getJobTerminationFuture(JobID jobId) {
-		if (jobManagerRunners.containsKey(jobId)) {
+		if (jobManagerRunnerFutures.containsKey(jobId)) {
 			return FutureUtils.completedExceptionally(new DispatcherException(String.format("Job with job id %s is still running.", jobId)));
 		} else {
 			return jobManagerTerminationFutures.getOrDefault(jobId, CompletableFuture.completedFuture(null));
@@ -923,7 +940,7 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 	public void onAddedJobGraph(final JobID jobId) {
 		runAsync(
 			() -> {
-				if (!jobManagerRunners.containsKey(jobId)) {
+				if (!jobManagerRunnerFutures.containsKey(jobId)) {
 					// IMPORTANT: onAddedJobGraph can generate false positives and, thus, we must expect that
 					// the specified job is already removed from the SubmittedJobGraphStore. In this case,
 					// SubmittedJobGraphStore.recoverJob returns null.
@@ -962,7 +979,7 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 	private CompletableFuture<Boolean> tryRunRecoveredJobGraph(JobGraph jobGraph, DispatcherId dispatcherId) throws Exception {
 		if (leaderElectionService.hasLeadership(dispatcherId.toUUID())) {
 			final JobID jobId = jobGraph.getJobID();
-			if (jobManagerRunners.containsKey(jobId)) {
+			if (jobManagerRunnerFutures.containsKey(jobId)) {
 				// we must not release the job graph lock since it can only be locked once and
 				// is currently being executed. Once we support multiple locks, we must release
 				// the JobGraph here
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
index 1af10b8..2442676 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.blob.VoidBlobStore;
 import org.apache.flink.runtime.checkpoint.Checkpoints;
@@ -64,6 +65,7 @@ import org.apache.flink.runtime.util.TestingFatalErrorHandler;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.function.ThrowingRunnable;
 
 import org.junit.After;
 import org.junit.AfterClass;
@@ -86,7 +88,9 @@ import java.nio.file.Paths;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Map;
+import java.util.Optional;
 import java.util.UUID;
+import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
@@ -144,6 +148,10 @@ public class DispatcherTest extends TestLogger {
 	/** Instance under test. */
 	private TestingDispatcher dispatcher;
 
+	private TestingHighAvailabilityServices haServices;
+
+	private HeartbeatServices heartbeatServices;
+
 	@BeforeClass
 	public static void setupClass() {
 		rpcService = new TestingRpcService();
@@ -166,13 +174,13 @@ public class DispatcherTest extends TestLogger {
 		jobGraph.setAllowQueuedScheduling(true);
 
 		fatalErrorHandler = new TestingFatalErrorHandler();
-		final HeartbeatServices heartbeatServices = new HeartbeatServices(1000L, 10000L);
+		heartbeatServices = new HeartbeatServices(1000L, 10000L);
 		submittedJobGraphStore = new FaultySubmittedJobGraphStore();
 
 		dispatcherLeaderElectionService = new TestingLeaderElectionService();
 		jobMasterLeaderElectionService = new TestingLeaderElectionService();
 
-		final TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices();
+		haServices = new TestingHighAvailabilityServices();
 		haServices.setDispatcherLeaderElectionService(dispatcherLeaderElectionService);
 		haServices.setSubmittedJobGraphStore(submittedJobGraphStore);
 		haServices.setJobMasterLeaderElectionService(TEST_JOB_ID, jobMasterLeaderElectionService);
@@ -188,14 +196,18 @@ public class DispatcherTest extends TestLogger {
 
 		createdJobManagerRunnerLatch = new CountDownLatch(2);
 		blobServer = new BlobServer(configuration, new VoidBlobStore());
+	}
 
-		dispatcher = createDispatcher(heartbeatServices, haServices);
-
+	@Nonnull
+	private TestingDispatcher createAndStartDispatcher(HeartbeatServices heartbeatServices, TestingHighAvailabilityServices haServices, Dispatcher.JobManagerRunnerFactory jobManagerRunnerFactory) throws Exception {
+		final TestingDispatcher dispatcher = createDispatcher(heartbeatServices, haServices, jobManagerRunnerFactory);
 		dispatcher.start();
+
+		return dispatcher;
 	}
 
 	@Nonnull
-	private TestingDispatcher createDispatcher(HeartbeatServices heartbeatServices, TestingHighAvailabilityServices haServices) throws Exception {
+	private TestingDispatcher createDispatcher(HeartbeatServices heartbeatServices, TestingHighAvailabilityServices haServices, Dispatcher.JobManagerRunnerFactory jobManagerRunnerFactory) throws Exception {
 		return new TestingDispatcher(
 			rpcService,
 			Dispatcher.DISPATCHER_NAME + '_' + name.getMethodName(),
@@ -207,7 +219,7 @@ public class DispatcherTest extends TestLogger {
 			UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup(),
 			null,
 			new MemoryArchivedExecutionGraphStore(),
-			new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID, createdJobManagerRunnerLatch),
+			jobManagerRunnerFactory,
 			fatalErrorHandler);
 	}
 
@@ -216,7 +228,13 @@ public class DispatcherTest extends TestLogger {
 		try {
 			fatalErrorHandler.rethrowError();
 		} finally {
-			RpcUtils.terminateRpcEndpoint(dispatcher, TIMEOUT);
+			if (dispatcher != null) {
+				RpcUtils.terminateRpcEndpoint(dispatcher, TIMEOUT);
+			}
+		}
+
+		if (haServices != null) {
+			haServices.closeAndCleanupAllData();
 		}
 
 		if (blobServer != null) {
@@ -230,6 +248,8 @@ public class DispatcherTest extends TestLogger {
 	 */
 	@Test
 	public void testJobSubmission() throws Exception {
+		dispatcher = createAndStartDispatcher(heartbeatServices, haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID, createdJobManagerRunnerLatch));
+
 		CompletableFuture<UUID> leaderFuture = dispatcherLeaderElectionService.isLeader(UUID.randomUUID());
 
 		// wait for the leader to be elected
@@ -251,6 +271,8 @@ public class DispatcherTest extends TestLogger {
 	 */
 	@Test
 	public void testLeaderElection() throws Exception {
+		dispatcher = createAndStartDispatcher(heartbeatServices, haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID, createdJobManagerRunnerLatch));
+
 		CompletableFuture<Void> jobIdsFuture = new CompletableFuture<>();
 		submittedJobGraphStore.setJobIdsFunction(
 			(Collection<JobID> jobIds) -> {
@@ -270,6 +292,8 @@ public class DispatcherTest extends TestLogger {
 	 */
 	@Test
 	public void testSubmittedJobGraphListener() throws Exception {
+		dispatcher = createAndStartDispatcher(heartbeatServices, haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID, createdJobManagerRunnerLatch));
+
 		dispatcherLeaderElectionService.isLeader(UUID.randomUUID()).get();
 
 		final DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class);
@@ -294,6 +318,8 @@ public class DispatcherTest extends TestLogger {
 
 	@Test
 	public void testOnAddedJobGraphRecoveryFailure() throws Exception {
+		dispatcher = createAndStartDispatcher(heartbeatServices, haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID, createdJobManagerRunnerLatch));
+
 		final FlinkException expectedFailure = new FlinkException("Expected failure");
 		submittedJobGraphStore.setRecoveryFailure(expectedFailure);
 
@@ -313,6 +339,8 @@ public class DispatcherTest extends TestLogger {
 
 	@Test
 	public void testOnAddedJobGraphWithFinishedJob() throws Throwable {
+		dispatcher = createAndStartDispatcher(heartbeatServices, haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID, createdJobManagerRunnerLatch));
+
 		dispatcherLeaderElectionService.isLeader(UUID.randomUUID()).get();
 
 		submittedJobGraphStore.putJobGraph(new SubmittedJobGraph(jobGraph, null));
@@ -333,6 +361,8 @@ public class DispatcherTest extends TestLogger {
 	 */
 	@Test
 	public void testCacheJobExecutionResult() throws Exception {
+		dispatcher = createAndStartDispatcher(heartbeatServices, haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID, createdJobManagerRunnerLatch));
+
 		dispatcherLeaderElectionService.isLeader(UUID.randomUUID()).get();
 
 		final DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class);
@@ -358,6 +388,8 @@ public class DispatcherTest extends TestLogger {
 
 	@Test
 	public void testThrowExceptionIfJobExecutionResultNotFound() throws Exception {
+		dispatcher = createAndStartDispatcher(heartbeatServices, haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID, createdJobManagerRunnerLatch));
+
 		dispatcherLeaderElectionService.isLeader(UUID.randomUUID()).get();
 
 		final DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class);
@@ -374,6 +406,8 @@ public class DispatcherTest extends TestLogger {
 	 */
 	@Test
 	public void testJobRecovery() throws Exception {
+		dispatcher = createAndStartDispatcher(heartbeatServices, haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID, createdJobManagerRunnerLatch));
+
 		final DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class);
 
 		// elect the initial dispatcher as the leader
@@ -413,6 +447,8 @@ public class DispatcherTest extends TestLogger {
 		final URI externalPointer = createTestingSavepoint();
 		final Path savepointPath = Paths.get(externalPointer);
 
+		dispatcher = createAndStartDispatcher(heartbeatServices, haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID, createdJobManagerRunnerLatch));
+
 		final DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class);
 
 		dispatcherLeaderElectionService.isLeader(UUID.randomUUID()).get();
@@ -447,7 +483,9 @@ public class DispatcherTest extends TestLogger {
 	 * to it. See FLINK-8887.
 	 */
 	@Test
-	public void testWaitingForJobMasterLeadership() throws ExecutionException, InterruptedException {
+	public void testWaitingForJobMasterLeadership() throws Exception {
+		dispatcher = createAndStartDispatcher(heartbeatServices, haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID, createdJobManagerRunnerLatch));
+
 		final DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class);
 
 		dispatcherLeaderElectionService.isLeader(UUID.randomUUID()).get();
@@ -476,6 +514,8 @@ public class DispatcherTest extends TestLogger {
 	 */
 	@Test
 	public void testFatalErrorAfterJobIdRecoveryFailure() throws Exception {
+		dispatcher = createAndStartDispatcher(heartbeatServices, haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID, createdJobManagerRunnerLatch));
+
 		final FlinkException testException = new FlinkException("Test exception");
 		submittedJobGraphStore.setJobIdsFunction(
 			(Collection<JobID> jobIds) -> {
@@ -500,6 +540,8 @@ public class DispatcherTest extends TestLogger {
 	public void testFatalErrorAfterJobRecoveryFailure() throws Exception {
 		final FlinkException testException = new FlinkException("Test exception");
 
+		dispatcher = createAndStartDispatcher(heartbeatServices, haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID, createdJobManagerRunnerLatch));
+
 		final SubmittedJobGraph submittedJobGraph = new SubmittedJobGraph(jobGraph, null);
 		submittedJobGraphStore.putJobGraph(submittedJobGraph);
 
@@ -526,6 +568,8 @@ public class DispatcherTest extends TestLogger {
 	public void testJobSubmissionErrorAfterJobRecovery() throws Exception {
 		final FlinkException testException = new FlinkException("Test exception");
 
+		dispatcher = createAndStartDispatcher(heartbeatServices, haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID, createdJobManagerRunnerLatch));
+
 		final JobGraph failingJobGraph = createFailingJobGraph(testException);
 
 		final SubmittedJobGraph submittedJobGraph = new SubmittedJobGraph(failingJobGraph, null);
@@ -540,6 +584,102 @@ public class DispatcherTest extends TestLogger {
 		fatalErrorHandler.clearError();
 	}
 
+	/**
+	 * Tests that a blocking {@link JobManagerRunner} creation, e.g. due to blocking FileSystem access,
+	 * does not block the {@link Dispatcher}.
+	 *
+	 * <p>See FLINK-10314
+	 */
+	@Test
+	public void testBlockingJobManagerRunner() throws Exception {
+		final OneShotLatch jobManagerRunnerCreationLatch = new OneShotLatch();
+		dispatcher = createAndStartDispatcher(
+			heartbeatServices,
+			haServices,
+			new BlockingJobManagerRunnerFactory(jobManagerRunnerCreationLatch::await));
+
+		dispatcherLeaderElectionService.isLeader(UUID.randomUUID()).get();
+
+		final DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class);
+
+		final CompletableFuture<Acknowledge> submissionFuture = dispatcherGateway.submitJob(jobGraph, TIMEOUT);
+
+		assertThat(submissionFuture.isDone(), is(false));
+
+		final CompletableFuture<Collection<String>> metricQueryServicePathsFuture = dispatcherGateway.requestMetricQueryServicePaths(Time.seconds(5L));
+
+		assertThat(metricQueryServicePathsFuture.get(), is(empty()));
+
+		assertThat(submissionFuture.isDone(), is(false));
+
+		jobManagerRunnerCreationLatch.trigger();
+
+		submissionFuture.get();
+	}
+
+	/**
+	 * Tests that a failing {@link JobManagerRunner} will be properly cleaned up.
+	 */
+	@Test
+	public void testFailingJobManagerRunnerCleanup() throws Exception {
+		final FlinkException testException = new FlinkException("Test exception.");
+		final ArrayBlockingQueue<Optional<Exception>> queue = new ArrayBlockingQueue<>(2);
+
+		dispatcher = createAndStartDispatcher(
+			heartbeatServices,
+			haServices,
+			new BlockingJobManagerRunnerFactory(() -> {
+				final Optional<Exception> take = queue.take();
+				final Exception exception = take.orElse(null);
+
+				if (exception != null) {
+					throw exception;
+				}
+			}));
+
+		dispatcherLeaderElectionService.isLeader(UUID.randomUUID()).get();
+
+		final DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class);
+
+		CompletableFuture<Acknowledge> submissionFuture = dispatcherGateway.submitJob(jobGraph, TIMEOUT);
+
+		assertThat(submissionFuture.isDone(), is(false));
+
+		queue.offer(Optional.of(testException));
+
+		try {
+			submissionFuture.get();
+			fail("Should fail because we could not instantiate the JobManagerRunner.");
+		} catch (Exception e) {
+			assertThat(ExceptionUtils.findThrowable(e, t -> t.equals(testException)).isPresent(), is(true));
+		}
+
+		submissionFuture = dispatcherGateway.submitJob(jobGraph, TIMEOUT);
+
+		queue.offer(Optional.empty());
+
+		submissionFuture.get();
+	}
+
+	private final class BlockingJobManagerRunnerFactory extends TestingJobManagerRunnerFactory {
+
+		@Nonnull
+		private final ThrowingRunnable<Exception> jobManagerRunnerCreationLatch;
+
+		BlockingJobManagerRunnerFactory(@Nonnull ThrowingRunnable<Exception> jobManagerRunnerCreationLatch) {
+			super(new CompletableFuture<>(), new CompletableFuture<>(), CompletableFuture.completedFuture(null));
+
+			this.jobManagerRunnerCreationLatch = jobManagerRunnerCreationLatch;
+		}
+
+		@Override
+		public JobManagerRunner createJobManagerRunner(ResourceID resourceId, JobGraph jobGraph, Configuration configuration, RpcService rpcService, HighAvailabilityServices highAvailabilityServices, HeartbeatServices heartbeatServices, BlobServer blobServer, JobManagerSharedServices jobManagerSharedServices, JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory, FatalErrorHandler fatalErrorHandler) throws Exception {
+			jobManagerRunnerCreationLatch.run();
+
+			return super.createJobManagerRunner(resourceId, jobGraph, configuration, rpcService, highAvailabilityServices, heartbeatServices, blobServer, jobManagerSharedServices, jobManagerJobMetricGroupFactory, fatalErrorHandler);
+		}
+	}
+
 	private void electDispatcher() {
 		UUID expectedLeaderSessionId = UUID.randomUUID();
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingJobManagerRunnerFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingJobManagerRunnerFactory.java
index cb48648..992f087 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingJobManagerRunnerFactory.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingJobManagerRunnerFactory.java
@@ -40,7 +40,7 @@ import static org.mockito.Mockito.when;
  * {@link org.apache.flink.runtime.dispatcher.Dispatcher.JobManagerRunnerFactory} implementation for
  * testing purposes.
  */
-final class TestingJobManagerRunnerFactory implements Dispatcher.JobManagerRunnerFactory {
+class TestingJobManagerRunnerFactory implements Dispatcher.JobManagerRunnerFactory {
 
 	private final CompletableFuture<JobGraph> jobGraphFuture;
 	private final CompletableFuture<ArchivedExecutionGraph> resultFuture;
@@ -63,12 +63,13 @@ final class TestingJobManagerRunnerFactory implements Dispatcher.JobManagerRunne
 			BlobServer blobServer,
 			JobManagerSharedServices jobManagerSharedServices,
 			JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory,
-			FatalErrorHandler fatalErrorHandler) {
+			FatalErrorHandler fatalErrorHandler) throws Exception {
 		jobGraphFuture.complete(jobGraph);
 
 		final JobManagerRunner mock = mock(JobManagerRunner.class);
 		when(mock.getResultFuture()).thenReturn(resultFuture);
 		when(mock.closeAsync()).thenReturn(terminationFuture);
+		when(mock.getJobGraph()).thenReturn(jobGraph);
 
 		return mock;
 	}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java
index c38ea5d..950a4e1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java
@@ -298,7 +298,7 @@ public class TestingResourceManagerGateway implements ResourceManagerGateway {
 
 	@Override
 	public CompletableFuture<ResourceOverview> requestResourceOverview(Time timeout) {
-		return FutureUtils.completedExceptionally(new UnsupportedOperationException("Not yet implemented"));
+		return CompletableFuture.completedFuture(new ResourceOverview(1, 1, 1));
 	}
 
 	@Override


[flink] 01/02: [hotfix] Replace DispatcherResourceCleanupTest#TestingJobManagerRunnerFactory with TestingJobmanagerRunnerFactory

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 89ddffee2ab396b0beaadc694ab1f765852c8f64
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Fri Sep 14 11:42:11 2018 +0200

    [hotfix] Replace DispatcherResourceCleanupTest#TestingJobManagerRunnerFactory with TestingJobmanagerRunnerFactory
---
 .../flink/runtime/dispatcher/DispatcherHATest.java |  2 +-
 .../dispatcher/DispatcherResourceCleanupTest.java  | 33 +---------------------
 .../runtime/dispatcher/MiniDispatcherTest.java     |  2 +-
 .../dispatcher/TestingJobManagerRunnerFactory.java |  8 ++++--
 .../dispatcher/ZooKeeperHADispatcherTest.java      | 10 +++----
 5 files changed, 13 insertions(+), 42 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java
index 335199a..c825451 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java
@@ -163,7 +163,7 @@ public class DispatcherHATest extends TestLogger {
 			UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup(),
 			null,
 			new MemoryArchivedExecutionGraphStore(),
-			new TestingJobManagerRunnerFactory(new CompletableFuture<>(), new CompletableFuture<>()),
+			new TestingJobManagerRunnerFactory(new CompletableFuture<>(), new CompletableFuture<>(), CompletableFuture.completedFuture(null)),
 			testingFatalErrorHandler,
 			fencingTokens);
 	}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
index d09ab8d..5c4ac34 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
@@ -29,26 +29,19 @@ import org.apache.flink.runtime.blob.BlobStore;
 import org.apache.flink.runtime.blob.PermanentBlobKey;
 import org.apache.flink.runtime.blob.TestingBlobStore;
 import org.apache.flink.runtime.blob.TestingBlobStoreBuilder;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
 import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.runtime.jobmaster.JobManagerRunner;
-import org.apache.flink.runtime.jobmaster.JobManagerSharedServices;
 import org.apache.flink.runtime.jobmaster.JobNotFinishedException;
-import org.apache.flink.runtime.jobmaster.factories.JobManagerJobMetricGroupFactory;
 import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
 import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
-import org.apache.flink.runtime.rpc.FatalErrorHandler;
-import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.TestingRpcService;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
@@ -79,8 +72,6 @@ import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
 
 /**
  * Tests the resource cleanup by the {@link Dispatcher}.
@@ -188,7 +179,7 @@ public class DispatcherResourceCleanupTest extends TestLogger {
 			UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup(),
 			null,
 			new MemoryArchivedExecutionGraphStore(),
-			new TestingJobManagerRunnerFactory(resultFuture, terminationFuture),
+			new TestingJobManagerRunnerFactory(new CompletableFuture<>(), resultFuture, terminationFuture),
 			fatalErrorHandler);
 
 		dispatcher.start();
@@ -447,28 +438,6 @@ public class DispatcherResourceCleanupTest extends TestLogger {
 		assertThat(deleteAllHABlobsFuture.get(), equalTo(jobId));
 	}
 
-	private static final class TestingJobManagerRunnerFactory implements Dispatcher.JobManagerRunnerFactory {
-
-		private final CompletableFuture<ArchivedExecutionGraph> resultFuture;
-
-		private final CompletableFuture<Void> terminationFuture;
-
-		private TestingJobManagerRunnerFactory(CompletableFuture<ArchivedExecutionGraph> resultFuture, CompletableFuture<Void> terminationFuture) {
-			this.resultFuture = resultFuture;
-			this.terminationFuture = terminationFuture;
-		}
-
-		@Override
-		public JobManagerRunner createJobManagerRunner(ResourceID resourceId, JobGraph jobGraph, Configuration configuration, RpcService rpcService, HighAvailabilityServices highAvailabilityServices, HeartbeatServices heartbeatServices, BlobServer blobServer, JobManagerSharedServices jobManagerServices, JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory, FatalErrorHandler fatalErrorHandler) {
-			final JobManagerRunner jobManagerRunnerMock = mock(JobManagerRunner.class);
-
-			when(jobManagerRunnerMock.getResultFuture()).thenReturn(resultFuture);
-			when(jobManagerRunnerMock.closeAsync()).thenReturn(terminationFuture);
-
-			return jobManagerRunnerMock;
-		}
-	}
-
 	private static final class TestingBlobServer extends BlobServer {
 
 		private final CompletableFuture<JobID> cleanupJobFuture;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
index 39c06f3..eed23ed 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
@@ -125,7 +125,7 @@ public class MiniDispatcherTest extends TestLogger {
 		jobGraphFuture = new CompletableFuture<>();
 		resultFuture = new CompletableFuture<>();
 
-		testingJobManagerRunnerFactory = new TestingJobManagerRunnerFactory(jobGraphFuture, resultFuture);
+		testingJobManagerRunnerFactory = new TestingJobManagerRunnerFactory(jobGraphFuture, resultFuture, CompletableFuture.completedFuture(null));
 	}
 
 	@After
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingJobManagerRunnerFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingJobManagerRunnerFactory.java
index f9be888..cb48648 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingJobManagerRunnerFactory.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingJobManagerRunnerFactory.java
@@ -44,10 +44,12 @@ final class TestingJobManagerRunnerFactory implements Dispatcher.JobManagerRunne
 
 	private final CompletableFuture<JobGraph> jobGraphFuture;
 	private final CompletableFuture<ArchivedExecutionGraph> resultFuture;
+	private final CompletableFuture<Void> terminationFuture;
 
-	TestingJobManagerRunnerFactory(CompletableFuture<JobGraph> jobGraphFuture, CompletableFuture<ArchivedExecutionGraph> resultFuture) {
+	TestingJobManagerRunnerFactory(CompletableFuture<JobGraph> jobGraphFuture, CompletableFuture<ArchivedExecutionGraph> resultFuture, CompletableFuture<Void> terminationFuture) {
 		this.jobGraphFuture = jobGraphFuture;
 		this.resultFuture = resultFuture;
+		this.terminationFuture = terminationFuture;
 	}
 
 	@Override
@@ -61,12 +63,12 @@ final class TestingJobManagerRunnerFactory implements Dispatcher.JobManagerRunne
 			BlobServer blobServer,
 			JobManagerSharedServices jobManagerSharedServices,
 			JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory,
-			FatalErrorHandler fatalErrorHandler) throws Exception {
+			FatalErrorHandler fatalErrorHandler) {
 		jobGraphFuture.complete(jobGraph);
 
 		final JobManagerRunner mock = mock(JobManagerRunner.class);
 		when(mock.getResultFuture()).thenReturn(resultFuture);
-		when(mock.closeAsync()).thenReturn(CompletableFuture.completedFuture(null));
+		when(mock.closeAsync()).thenReturn(terminationFuture);
 
 		return mock;
 	}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/ZooKeeperHADispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/ZooKeeperHADispatcherTest.java
index b5662c0..9c23f9d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/ZooKeeperHADispatcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/ZooKeeperHADispatcherTest.java
@@ -154,7 +154,7 @@ public class ZooKeeperHADispatcherTest extends TestLogger {
 
 			final TestingDispatcher dispatcher = createDispatcher(
 				testingHighAvailabilityServices,
-				new TestingJobManagerRunnerFactory(new CompletableFuture<>(), new CompletableFuture<>()));
+				new TestingJobManagerRunnerFactory(new CompletableFuture<>(), new CompletableFuture<>(), CompletableFuture.completedFuture(null)));
 
 			dispatcher.start();
 
@@ -223,11 +223,11 @@ public class ZooKeeperHADispatcherTest extends TestLogger {
 			final CompletableFuture<ArchivedExecutionGraph> resultFuture = new CompletableFuture<>();
 			final TestingDispatcher dispatcher1 = createDispatcher(
 				haServices1,
-				new TestingJobManagerRunnerFactory(jobGraphFuture, resultFuture));
+				new TestingJobManagerRunnerFactory(jobGraphFuture, resultFuture, CompletableFuture.completedFuture(null)));
 
 			final TestingDispatcher dispatcher2 = createDispatcher(
 				haServices2,
-				new TestingJobManagerRunnerFactory(new CompletableFuture<>(), new CompletableFuture<>()));
+				new TestingJobManagerRunnerFactory(new CompletableFuture<>(), new CompletableFuture<>(), CompletableFuture.completedFuture(null)));
 
 			try {
 				dispatcher1.start();
@@ -285,11 +285,11 @@ public class ZooKeeperHADispatcherTest extends TestLogger {
 				final CompletableFuture<JobGraph> jobGraphFuture1 = new CompletableFuture<>();
 				dispatcher1 = createDispatcher(
 					haServices,
-					new TestingJobManagerRunnerFactory(jobGraphFuture1, new CompletableFuture<>()));
+					new TestingJobManagerRunnerFactory(jobGraphFuture1, new CompletableFuture<>(), CompletableFuture.completedFuture(null)));
 				final CompletableFuture<JobGraph> jobGraphFuture2 = new CompletableFuture<>();
 				dispatcher2 = createDispatcher(
 					haServices,
-					new TestingJobManagerRunnerFactory(jobGraphFuture2, new CompletableFuture<>()));
+					new TestingJobManagerRunnerFactory(jobGraphFuture2, new CompletableFuture<>(), CompletableFuture.completedFuture(null)));
 
 				dispatcher1.start();
 				dispatcher2.start();