You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2019/01/30 11:11:52 UTC

[flink] 02/02: [FLINK-11400] Linearize leadership operations in JobManagerRunner

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d4ba83e3baf31e041514e7340d4564ca0bee882a
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Mon Jan 21 16:54:54 2019 +0100

    [FLINK-11400] Linearize leadership operations in JobManagerRunner
    
    Introduce a leadershipOperation future in the JobManagerRunner. This future is completed whenever
    a leadership operation (granting or revoking leadership) has been fully completed. All subsequent
    leadership operations wait for their predecessors to complete before they are processed. This
    guarantees that the JobMaster is properly shut down and there cannot be a race condition between
    revoking and granting leadership.
    
    This closes #7565.
---
 .../flink/runtime/jobmaster/JobManagerRunner.java  | 128 +++++++++++++++------
 .../apache/flink/runtime/jobmaster/JobMaster.java  |  18 +--
 .../flink/runtime/jobmaster/slotpool/SlotPool.java |   4 +-
 .../jobmaster/slotpool/SlotPoolGateway.java        |   2 +-
 .../runtime/jobmaster/JobManagerRunnerTest.java    | 114 ++++++++++++++++--
 .../runtime/jobmaster/TestingJobMasterService.java |  26 ++++-
 ...ry.java => TestingJobMasterServiceFactory.java} |  17 ++-
 .../src/test/resources/log4j-test.properties       |   2 +-
 8 files changed, 247 insertions(+), 64 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
index ed79455..846018b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.jobmaster;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
@@ -35,13 +36,17 @@ import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.util.AutoCloseableAsync;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.function.FunctionUtils;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nonnull;
+
 import java.io.IOException;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
 import java.util.concurrent.Executor;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
@@ -81,6 +86,8 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, A
 
 	private final CompletableFuture<Void> terminationFuture;
 
+	private CompletableFuture<Void> leadershipOperation;
+
 	/** flag marking the runner as shut down. */
 	private volatile boolean shutdown;
 
@@ -105,6 +112,7 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, A
 
 		this.resultFuture = new CompletableFuture<>();
 		this.terminationFuture = new CompletableFuture<>();
+		this.leadershipOperation = CompletableFuture.completedFuture(null);
 
 		// make sure we cleanly shut down out JobManager services if initialization fails
 		try {
@@ -277,38 +285,72 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, A
 				return;
 			}
 
-			try {
-				verifyJobSchedulingStatusAndStartJobManager(leaderSessionID);
-			} catch (Exception e) {
-				handleJobManagerRunnerError(e);
-			}
+			leadershipOperation = leadershipOperation.thenCompose(
+				(ignored) -> {
+					synchronized (lock) {
+						return verifyJobSchedulingStatusAndStartJobManager(leaderSessionID);
+					}
+				});
+
+			handleException(leadershipOperation, "Could not start the job manager.");
 		}
 	}
 
-	private void verifyJobSchedulingStatusAndStartJobManager(UUID leaderSessionId) throws Exception {
-		final JobSchedulingStatus jobSchedulingStatus = runningJobsRegistry.getJobSchedulingStatus(jobGraph.getJobID());
+	private CompletableFuture<Void> verifyJobSchedulingStatusAndStartJobManager(UUID leaderSessionId) {
+		final CompletableFuture<JobSchedulingStatus> jobSchedulingStatusFuture = getJobSchedulingStatus();
+
+		return jobSchedulingStatusFuture.thenCompose(
+			jobSchedulingStatus -> {
+				if (jobSchedulingStatus == JobSchedulingStatus.DONE) {
+					return jobAlreadyDone();
+				} else {
+					return startJobMaster(leaderSessionId);
+				}
+			});
+	}
 
-		if (jobSchedulingStatus == JobSchedulingStatus.DONE) {
-			log.info("Granted leader ship but job {} has been finished. ", jobGraph.getJobID());
-			jobFinishedByOther();
-		} else {
-			log.info("JobManager runner for job {} ({}) was granted leadership with session id {} at {}.",
-				jobGraph.getName(), jobGraph.getJobID(), leaderSessionId, getAddress());
+	private CompletionStage<Void> startJobMaster(UUID leaderSessionId) {
+		log.info("JobManager runner for job {} ({}) was granted leadership with session id {} at {}.",
+			jobGraph.getName(), jobGraph.getJobID(), leaderSessionId, getAddress());
 
+		try {
 			runningJobsRegistry.setJobRunning(jobGraph.getJobID());
+		} catch (IOException e) {
+			return FutureUtils.completedExceptionally(
+				new FlinkException(
+					String.format("Failed to set the job %s to running in the running jobs registry.", jobGraph.getJobID()),
+					e));
+		}
 
-			final CompletableFuture<Acknowledge> startFuture = jobMasterService.start(new JobMasterId(leaderSessionId));
-			final CompletableFuture<JobMasterGateway> currentLeaderGatewayFuture = leaderGatewayFuture;
+		final CompletableFuture<Acknowledge> startFuture;
+		try {
+			startFuture = jobMasterService.start(new JobMasterId(leaderSessionId));
+		} catch (Exception e) {
+			return FutureUtils.completedExceptionally(new FlinkException("Failed to start the JobMaster.", e));
+		}
 
-			startFuture.whenCompleteAsync(
-				(Acknowledge ack, Throwable throwable) -> {
-					if (throwable != null) {
-						handleJobManagerRunnerError(new FlinkException("Could not start the job manager.", throwable));
-					} else {
-						confirmLeaderSessionIdIfStillLeader(leaderSessionId, currentLeaderGatewayFuture);
-					}
-				},
-				executor);
+		final CompletableFuture<JobMasterGateway> currentLeaderGatewayFuture = leaderGatewayFuture;
+		return startFuture.thenAcceptAsync(
+			(Acknowledge ack) -> confirmLeaderSessionIdIfStillLeader(leaderSessionId, currentLeaderGatewayFuture),
+			executor);
+	}
+
+	@Nonnull
+	private CompletionStage<Void> jobAlreadyDone() {
+		log.info("Granted leader ship but job {} has been finished. ", jobGraph.getJobID());
+		jobFinishedByOther();
+		return CompletableFuture.completedFuture(null);
+	}
+
+	private CompletableFuture<JobSchedulingStatus> getJobSchedulingStatus() {
+		try {
+			JobSchedulingStatus jobSchedulingStatus = runningJobsRegistry.getJobSchedulingStatus(jobGraph.getJobID());
+			return CompletableFuture.completedFuture(jobSchedulingStatus);
+		} catch (IOException e) {
+			return FutureUtils.completedExceptionally(
+				new FlinkException(
+					String.format("Could not retrieve the job scheduling status for job %s.", jobGraph.getJobID()),
+					e));
 		}
 	}
 
@@ -329,21 +371,35 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, A
 				return;
 			}
 
-			log.info("JobManager for job {} ({}) was revoked leadership at {}.",
-				jobGraph.getName(), jobGraph.getJobID(), getAddress());
+			leadershipOperation = leadershipOperation.thenCompose(
+				(ignored) -> {
+					synchronized (lock) {
+						return revokeJobMasterLeadership();
+					}
+				});
+
+			handleException(leadershipOperation, "Could not suspend the job manager.");
+		}
+	}
 
-			setNewLeaderGatewayFuture();
+	private CompletableFuture<Void> revokeJobMasterLeadership() {
+		log.info("JobManager for job {} ({}) was revoked leadership at {}.",
+			jobGraph.getName(), jobGraph.getJobID(), getAddress());
 
-			CompletableFuture<Acknowledge>  suspendFuture = jobMasterService.suspend(new FlinkException("JobManager is no longer the leader."));
+		setNewLeaderGatewayFuture();
 
-			suspendFuture.whenCompleteAsync(
-				(Acknowledge ack, Throwable throwable) -> {
-					if (throwable != null) {
-						handleJobManagerRunnerError(new FlinkException("Could not suspend the job manager.", throwable));
-					}
-				},
-				executor);
-		}
+		return jobMasterService
+			.suspend(new FlinkException("JobManager is no longer the leader."))
+			.thenApply(FunctionUtils.nullFn());
+	}
+
+	private void handleException(CompletableFuture<Void> leadershipOperation, String message) {
+		leadershipOperation.whenComplete(
+			(ignored, throwable) -> {
+				if (throwable != null) {
+					handleJobManagerRunnerError(new FlinkException(message, throwable));
+				}
+			});
 	}
 
 	private void setNewLeaderGatewayFuture() {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 62446b0..447b59a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -131,6 +131,7 @@ import java.util.concurrent.CompletionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeoutException;
+import java.util.function.Function;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
@@ -334,11 +335,12 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 	 * @return Future acknowledge indicating that the job has been suspended. Otherwise the future contains an exception
 	 */
 	public CompletableFuture<Acknowledge> suspend(final Exception cause) {
-		CompletableFuture<Acknowledge> suspendFuture = callAsyncWithoutFencing(() -> suspendExecution(cause), RpcUtils.INF_TIMEOUT);
+		CompletableFuture<Acknowledge> suspendFuture = callAsyncWithoutFencing(
+				() -> suspendExecution(cause),
+				RpcUtils.INF_TIMEOUT)
+			.thenCompose(Function.identity());
 
-		stop();
-
-		return suspendFuture;
+		return suspendFuture.whenComplete((acknowledge, throwable) -> stop());
 	}
 
 	/**
@@ -1061,12 +1063,12 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 	 *
 	 * @param cause The reason of why this job been suspended.
 	 */
-	private Acknowledge suspendExecution(final Exception cause) {
+	private CompletableFuture<Acknowledge> suspendExecution(final Exception cause) {
 		validateRunsInMainThread();
 
 		if (getFencingToken() == null) {
 			log.debug("Job has already been suspended or shutdown.");
-			return Acknowledge.get();
+			return CompletableFuture.completedFuture(null);
 		}
 
 		// not leader anymore --> set the JobMasterId to null
@@ -1081,12 +1083,12 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 		suspendAndClearExecutionGraphFields(cause);
 
 		// the slot pool stops receiving messages and clears its pooled slots
-		slotPoolGateway.suspend();
+		CompletableFuture<Acknowledge> slotPoolSuspendFuture = slotPoolGateway.suspend();
 
 		// disconnect from resource manager:
 		closeResourceManagerConnection(cause);
 
-		return Acknowledge.get();
+		return slotPoolSuspendFuture;
 	}
 
 	private void assignExecutionGraph(
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
index 34d5cdc..b3fb36c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
@@ -242,7 +242,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS
 	 * Suspends this pool, meaning it has lost its authority to accept and distribute slots.
 	 */
 	@Override
-	public void suspend() {
+	public CompletableFuture<Acknowledge> suspend() {
 		log.info("Suspending SlotPool.");
 
 		validateRunsInMainThread();
@@ -265,6 +265,8 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS
 		// Clear (but not release!) the available slots. The TaskManagers should re-register them
 		// at the new leader JobManager/SlotPool
 		clear();
+
+		return CompletableFuture.completedFuture(Acknowledge.get());
 	}
 
 	// ------------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolGateway.java
index 3e546ff..4aa8614 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolGateway.java
@@ -48,7 +48,7 @@ public interface SlotPoolGateway extends AllocatedSlotActions, RpcGateway {
 	//  shutdown
 	// ------------------------------------------------------------------------
 
-	void suspend();
+	CompletableFuture<Acknowledge> suspend();
 
 	// ------------------------------------------------------------------------
 	//  resource manager connection
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerTest.java
index 584307b..b560600 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerTest.java
@@ -31,9 +31,10 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobmaster.factories.JobMasterServiceFactory;
-import org.apache.flink.runtime.jobmaster.factories.TestingJobMasterFactory;
+import org.apache.flink.runtime.jobmaster.factories.TestingJobMasterServiceFactory;
 import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
 import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
+import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
@@ -51,8 +52,11 @@ import org.junit.rules.TemporaryFolder;
 
 import javax.annotation.Nonnull;
 
+import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.is;
@@ -73,10 +77,12 @@ public class JobManagerRunnerTest extends TestLogger {
 
 	private static LibraryCacheManager libraryCacheManager;
 
-	private static JobMasterServiceFactory jobMasterFactory;
+	private static JobMasterServiceFactory defaultJobMasterServiceFactory;
 
 	private TestingHighAvailabilityServices haServices;
 
+	private TestingLeaderElectionService leaderElectionService;
+
 	private TestingFatalErrorHandler fatalErrorHandler;
 
 	@BeforeClass
@@ -86,7 +92,7 @@ public class JobManagerRunnerTest extends TestLogger {
 			FlinkUserCodeClassLoaders.ResolveOrder.CHILD_FIRST,
 			new String[]{});
 
-		jobMasterFactory = TestingJobMasterFactory.INSTANCE;
+		defaultJobMasterServiceFactory = new TestingJobMasterServiceFactory();
 
 		final JobVertex jobVertex = new JobVertex("Test vertex");
 		jobVertex.setInvokableClass(NoOpInvokable.class);
@@ -100,8 +106,9 @@ public class JobManagerRunnerTest extends TestLogger {
 
 	@Before
 	public void setup() {
+		leaderElectionService = new TestingLeaderElectionService();
 		haServices = new TestingHighAvailabilityServices();
-		haServices.setJobMasterLeaderElectionService(jobGraph.getJobID(), new TestingLeaderElectionService());
+		haServices.setJobMasterLeaderElectionService(jobGraph.getJobID(), leaderElectionService);
 		haServices.setResourceManagerLeaderRetriever(new SettableLeaderRetrievalService());
 		haServices.setCheckpointRecoveryFactory(new StandaloneCheckpointRecoveryFactory());
 
@@ -209,19 +216,106 @@ public class JobManagerRunnerTest extends TestLogger {
 		}
 	}
 
+	/**
+	 * Tests that the {@link JobManagerRunner} waits for a pending leadership revocation (the
+	 * suspension of the JobMaster) to finish before it processes a newly granted leadership.
+	 */
+	@Test
+	public void testConcurrentLeadershipOperationsBlockingSuspend() throws Exception {
+		final CompletableFuture<Acknowledge> suspendedFuture = new CompletableFuture<>();
+
+		TestingJobMasterServiceFactory jobMasterServiceFactory = new TestingJobMasterServiceFactory(
+			() -> new TestingJobMasterService(
+				"localhost",
+				e -> suspendedFuture));
+		JobManagerRunner jobManagerRunner = createJobManagerRunner(jobMasterServiceFactory);
+
+		jobManagerRunner.start();
+
+		leaderElectionService.isLeader(UUID.randomUUID()).get();
+
+		leaderElectionService.notLeader();
+
+		final CompletableFuture<UUID> leaderFuture = leaderElectionService.isLeader(UUID.randomUUID());
+
+		// the new leadership should wait first for the suspension to happen
+		assertThat(leaderFuture.isDone(), is(false));
+
+		try {
+			leaderFuture.get(1L, TimeUnit.MILLISECONDS);
+			fail("Granted leadership even though the JobMaster has not been suspended.");
+		} catch (TimeoutException expected) {
+			// expected
+		}
+
+		suspendedFuture.complete(Acknowledge.get());
+
+		leaderFuture.get();
+	}
+
+	/**
+	 * Tests that the {@link JobManagerRunner} waits for a pending leadership grant (the start
+	 * of the JobMaster) to finish before it processes a subsequent leadership revocation.
+	 */
+	@Test
+	public void testConcurrentLeadershipOperationsBlockingGainLeadership() throws Exception {
+		final CompletableFuture<Exception> suspendFuture = new CompletableFuture<>();
+		final CompletableFuture<Acknowledge> startFuture = new CompletableFuture<>();
+
+		TestingJobMasterServiceFactory jobMasterServiceFactory = new TestingJobMasterServiceFactory(
+			() -> new TestingJobMasterService(
+				"localhost",
+				e -> {
+					suspendFuture.complete(e);
+					return CompletableFuture.completedFuture(Acknowledge.get());
+				},
+				ignored -> startFuture));
+		JobManagerRunner jobManagerRunner = createJobManagerRunner(jobMasterServiceFactory);
+
+		jobManagerRunner.start();
+
+		leaderElectionService.isLeader(UUID.randomUUID());
+
+		leaderElectionService.notLeader();
+
+		// suspending should wait for the start to happen first
+		assertThat(suspendFuture.isDone(), is(false));
+
+		try {
+			suspendFuture.get(1L, TimeUnit.MILLISECONDS);
+			fail("Suspended leadership even though the JobMaster has not been started.");
+		} catch (TimeoutException expected) {
+			// expected
+		}
+
+		startFuture.complete(Acknowledge.get());
+
+		suspendFuture.get();
+	}
+
 	@Nonnull
 	private JobManagerRunner createJobManagerRunner(LibraryCacheManager libraryCacheManager) throws Exception {
+		return createJobManagerRunner(defaultJobMasterServiceFactory, libraryCacheManager);
+	}
+
+	@Nonnull
+	private JobManagerRunner createJobManagerRunner() throws Exception {
+		return createJobManagerRunner(defaultJobMasterServiceFactory, libraryCacheManager);
+	}
+
+	@Nonnull
+	private JobManagerRunner createJobManagerRunner(JobMasterServiceFactory jobMasterServiceFactory) throws Exception {
+		return createJobManagerRunner(jobMasterServiceFactory, libraryCacheManager);
+	}
+
+	@Nonnull
+	private JobManagerRunner createJobManagerRunner(JobMasterServiceFactory jobMasterServiceFactory, LibraryCacheManager libraryCacheManager) throws Exception{
 		return new JobManagerRunner(
 			jobGraph,
-			jobMasterFactory,
+			jobMasterServiceFactory,
 			haServices,
 			libraryCacheManager,
 			TestingUtils.defaultExecutor(),
 			fatalErrorHandler);
 	}
-
-	@Nonnull
-	private JobManagerRunner createJobManagerRunner() throws Exception {
-		return createJobManagerRunner(libraryCacheManager);
-	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobMasterService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobMasterService.java
index 7e65da1..02ff8eb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobMasterService.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobMasterService.java
@@ -25,6 +25,7 @@ import org.apache.flink.util.Preconditions;
 import javax.annotation.Nonnull;
 
 import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
 
 /**
  * Implementation of the {@link JobMasterService} for testing purposes.
@@ -34,26 +35,43 @@ public class TestingJobMasterService implements JobMasterService {
 	@Nonnull
 	private final String address;
 
+	@Nonnull
+	private final Function<Exception, CompletableFuture<Acknowledge>> suspendFunction;
+
+	@Nonnull
+	private final Function<JobMasterId, CompletableFuture<Acknowledge>> startFunction;
+
 	private JobMasterGateway jobMasterGateway;
 
-	public TestingJobMasterService(@Nonnull String address) {
+	public TestingJobMasterService(@Nonnull String address, @Nonnull Function<Exception, CompletableFuture<Acknowledge>> suspendFunction) {
+		this(address, suspendFunction, ignored -> CompletableFuture.completedFuture(Acknowledge.get()));
+	}
+
+	public TestingJobMasterService(
+		@Nonnull String address,
+		@Nonnull Function<Exception, CompletableFuture<Acknowledge>> suspendFunction,
+		@Nonnull Function<JobMasterId, CompletableFuture<Acknowledge>> startFunction) {
 		this.address = address;
+		this.suspendFunction = suspendFunction;
+		this.startFunction = startFunction;
 	}
 
 	public TestingJobMasterService() {
-		this("localhost");
+		this(
+			"localhost",
+			e -> CompletableFuture.completedFuture(Acknowledge.get()));
 	}
 
 	@Override
 	public CompletableFuture<Acknowledge> start(JobMasterId jobMasterId) {
 			jobMasterGateway = new TestingJobMasterGatewayBuilder().build();
-			return CompletableFuture.completedFuture(Acknowledge.get());
+			return startFunction.apply(jobMasterId);
 	}
 
 	@Override
 	public CompletableFuture<Acknowledge> suspend(Exception cause) {
 		jobMasterGateway = null;
-		return CompletableFuture.completedFuture(Acknowledge.get());
+		return suspendFunction.apply(cause);
 	}
 
 	@Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/factories/TestingJobMasterFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/factories/TestingJobMasterServiceFactory.java
similarity index 75%
rename from flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/factories/TestingJobMasterFactory.java
rename to flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/factories/TestingJobMasterServiceFactory.java
index ba7f1c8..e2de73b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/factories/TestingJobMasterFactory.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/factories/TestingJobMasterServiceFactory.java
@@ -24,14 +24,25 @@ import org.apache.flink.runtime.jobmaster.JobMaster;
 import org.apache.flink.runtime.jobmaster.JobMasterService;
 import org.apache.flink.runtime.jobmaster.TestingJobMasterService;
 
+import java.util.function.Supplier;
+
 /**
  * Testing implementation of the {@link JobMasterServiceFactory} which returns a {@link JobMaster} mock.
  */
-public enum TestingJobMasterFactory implements JobMasterServiceFactory {
-	INSTANCE;
+public class TestingJobMasterServiceFactory implements JobMasterServiceFactory {
+
+	private final Supplier<JobMasterService> jobMasterServiceSupplier;
+
+	public TestingJobMasterServiceFactory(Supplier<JobMasterService> jobMasterServiceSupplier) {
+		this.jobMasterServiceSupplier = jobMasterServiceSupplier;
+	}
+
+	public TestingJobMasterServiceFactory() {
+		this(TestingJobMasterService::new);
+	}
 
 	@Override
 	public JobMasterService createJobMasterService(JobGraph jobGraph, OnCompletionActions jobCompletionActions, ClassLoader userCodeClassloader) {
-		return new TestingJobMasterService();
+		return jobMasterServiceSupplier.get();
 	}
 }
diff --git a/flink-runtime/src/test/resources/log4j-test.properties b/flink-runtime/src/test/resources/log4j-test.properties
index 2778b7f..bc7fb42 100644
--- a/flink-runtime/src/test/resources/log4j-test.properties
+++ b/flink-runtime/src/test/resources/log4j-test.properties
@@ -18,7 +18,7 @@
 
 # Set root logger level to OFF to not flood build logs
 # set manually to INFO for debugging purposes
-log4j.rootLogger=OFF, testlogger
+log4j.rootLogger=DEBUG, testlogger
 
 # testlogger is set to be a ConsoleAppender.
 log4j.appender.testlogger=org.apache.log4j.ConsoleAppender