You are viewing a plain-text version of this content. The link to its canonical version (shown as "here" in the original page) was not preserved in this plain-text rendering.
Posted to commits@flink.apache.org by tr...@apache.org on 2019/01/30 11:11:50 UTC

[flink] branch master updated (e0ee3e5 -> d4ba83e)

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from e0ee3e5  [FLINK-10910][e2e] Hardened Kubernetes e2e test.
     new cae914f  [FLINK-11415] Introduce JobMasterServiceFactory
     new d4ba83e  [FLINK-11400] Linearize leadership operations in JobManagerRunner

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../dispatcher/DefaultJobManagerRunnerFactory.java |  77 +++++++++
 .../flink/runtime/dispatcher/Dispatcher.java       |  53 -------
 .../runtime/dispatcher/JobDispatcherFactory.java   |   2 +-
 .../JobManagerRunnerFactory.java}                  |  25 ++-
 .../dispatcher/SessionDispatcherFactory.java       |   2 +-
 .../flink/runtime/jobmaster/JobManagerRunner.java  | 175 ++++++++++++---------
 .../apache/flink/runtime/jobmaster/JobMaster.java  |  18 ++-
 .../factories/DefaultJobMasterServiceFactory.java  |  89 +++++++++++
 ...upFactory.java => JobMasterServiceFactory.java} |  20 +--
 .../flink/runtime/jobmaster/slotpool/SlotPool.java |   4 +-
 .../jobmaster/slotpool/SlotPoolGateway.java        |   2 +-
 .../flink/runtime/minicluster/MiniCluster.java     |   3 +-
 ...rvice.java => FailingPermanentBlobService.java} |  12 +-
 .../flink/runtime/dispatcher/DispatcherHATest.java |   7 +-
 .../flink/runtime/dispatcher/DispatcherTest.java   |  17 +-
 .../dispatcher/TestingJobManagerRunnerFactory.java |   6 +-
 .../dispatcher/ZooKeeperHADispatcherTest.java      |   2 +-
 .../runtime/jobmaster/JobManagerRunnerTest.java    | 159 ++++++++++++++-----
 .../runtime/jobmaster/TestingJobMasterService.java |  93 +++++++++++
 .../factories/TestingJobMasterServiceFactory.java  |  48 ++++++
 .../src/test/resources/log4j-test.properties       |   2 +-
 21 files changed, 585 insertions(+), 231 deletions(-)
 create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerFactory.java
 copy flink-runtime/src/main/java/org/apache/flink/runtime/{entrypoint/component/DispatcherResourceManagerComponentFactory.java => dispatcher/JobManagerRunnerFactory.java} (64%)
 create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/factories/DefaultJobMasterServiceFactory.java
 copy flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/factories/{JobManagerJobMetricGroupFactory.java => JobMasterServiceFactory.java} (66%)
 copy flink-runtime/src/test/java/org/apache/flink/runtime/blob/{VoidPermanentBlobService.java => FailingPermanentBlobService.java} (73%)
 create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobMasterService.java
 create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/factories/TestingJobMasterServiceFactory.java


[flink] 02/02: [FLINK-11400] Linearize leadership operations in JobManagerRunner

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d4ba83e3baf31e041514e7340d4564ca0bee882a
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Mon Jan 21 16:54:54 2019 +0100

    [FLINK-11400] Linearize leadership operations in JobManagerRunner
    
    Introduce a leadershipOperation future in the JobManagerRunner. It tracks the most recent
    leadership operation (granting or revoking leadership) and completes once that operation has
    fully finished. Every new leadership operation is chained onto this future, so it waits for its
    predecessor to complete before it is processed. This guarantees that the JobMaster is properly
    shut down and that there cannot be a race condition between revoking and granting leadership.
    
    This closes #7565.
---
 .../flink/runtime/jobmaster/JobManagerRunner.java  | 128 +++++++++++++++------
 .../apache/flink/runtime/jobmaster/JobMaster.java  |  18 +--
 .../flink/runtime/jobmaster/slotpool/SlotPool.java |   4 +-
 .../jobmaster/slotpool/SlotPoolGateway.java        |   2 +-
 .../runtime/jobmaster/JobManagerRunnerTest.java    | 114 ++++++++++++++++--
 .../runtime/jobmaster/TestingJobMasterService.java |  26 ++++-
 ...ry.java => TestingJobMasterServiceFactory.java} |  17 ++-
 .../src/test/resources/log4j-test.properties       |   2 +-
 8 files changed, 247 insertions(+), 64 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
index ed79455..846018b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.jobmaster;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
@@ -35,13 +36,17 @@ import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.util.AutoCloseableAsync;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.function.FunctionUtils;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nonnull;
+
 import java.io.IOException;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
 import java.util.concurrent.Executor;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
@@ -81,6 +86,8 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, A
 
 	private final CompletableFuture<Void> terminationFuture;
 
+	private CompletableFuture<Void> leadershipOperation;
+
 	/** flag marking the runner as shut down. */
 	private volatile boolean shutdown;
 
@@ -105,6 +112,7 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, A
 
 		this.resultFuture = new CompletableFuture<>();
 		this.terminationFuture = new CompletableFuture<>();
+		this.leadershipOperation = CompletableFuture.completedFuture(null);
 
 		// make sure we cleanly shut down out JobManager services if initialization fails
 		try {
@@ -277,38 +285,72 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, A
 				return;
 			}
 
-			try {
-				verifyJobSchedulingStatusAndStartJobManager(leaderSessionID);
-			} catch (Exception e) {
-				handleJobManagerRunnerError(e);
-			}
+			leadershipOperation = leadershipOperation.thenCompose(
+				(ignored) -> {
+					synchronized (lock) {
+						return verifyJobSchedulingStatusAndStartJobManager(leaderSessionID);
+					}
+				});
+
+			handleException(leadershipOperation, "Could not start the job manager.");
 		}
 	}
 
-	private void verifyJobSchedulingStatusAndStartJobManager(UUID leaderSessionId) throws Exception {
-		final JobSchedulingStatus jobSchedulingStatus = runningJobsRegistry.getJobSchedulingStatus(jobGraph.getJobID());
+	private CompletableFuture<Void> verifyJobSchedulingStatusAndStartJobManager(UUID leaderSessionId) {
+		final CompletableFuture<JobSchedulingStatus> jobSchedulingStatusFuture = getJobSchedulingStatus();
+
+		return jobSchedulingStatusFuture.thenCompose(
+			jobSchedulingStatus -> {
+				if (jobSchedulingStatus == JobSchedulingStatus.DONE) {
+					return jobAlreadyDone();
+				} else {
+					return startJobMaster(leaderSessionId);
+				}
+			});
+	}
 
-		if (jobSchedulingStatus == JobSchedulingStatus.DONE) {
-			log.info("Granted leader ship but job {} has been finished. ", jobGraph.getJobID());
-			jobFinishedByOther();
-		} else {
-			log.info("JobManager runner for job {} ({}) was granted leadership with session id {} at {}.",
-				jobGraph.getName(), jobGraph.getJobID(), leaderSessionId, getAddress());
+	private CompletionStage<Void> startJobMaster(UUID leaderSessionId) {
+		log.info("JobManager runner for job {} ({}) was granted leadership with session id {} at {}.",
+			jobGraph.getName(), jobGraph.getJobID(), leaderSessionId, getAddress());
 
+		try {
 			runningJobsRegistry.setJobRunning(jobGraph.getJobID());
+		} catch (IOException e) {
+			return FutureUtils.completedExceptionally(
+				new FlinkException(
+					String.format("Failed to set the job %s to running in the running jobs registry.", jobGraph.getJobID()),
+					e));
+		}
 
-			final CompletableFuture<Acknowledge> startFuture = jobMasterService.start(new JobMasterId(leaderSessionId));
-			final CompletableFuture<JobMasterGateway> currentLeaderGatewayFuture = leaderGatewayFuture;
+		final CompletableFuture<Acknowledge> startFuture;
+		try {
+			startFuture = jobMasterService.start(new JobMasterId(leaderSessionId));
+		} catch (Exception e) {
+			return FutureUtils.completedExceptionally(new FlinkException("Failed to start the JobMaster.", e));
+		}
 
-			startFuture.whenCompleteAsync(
-				(Acknowledge ack, Throwable throwable) -> {
-					if (throwable != null) {
-						handleJobManagerRunnerError(new FlinkException("Could not start the job manager.", throwable));
-					} else {
-						confirmLeaderSessionIdIfStillLeader(leaderSessionId, currentLeaderGatewayFuture);
-					}
-				},
-				executor);
+		final CompletableFuture<JobMasterGateway> currentLeaderGatewayFuture = leaderGatewayFuture;
+		return startFuture.thenAcceptAsync(
+			(Acknowledge ack) -> confirmLeaderSessionIdIfStillLeader(leaderSessionId, currentLeaderGatewayFuture),
+			executor);
+	}
+
+	@Nonnull
+	private CompletionStage<Void> jobAlreadyDone() {
+		log.info("Granted leader ship but job {} has been finished. ", jobGraph.getJobID());
+		jobFinishedByOther();
+		return CompletableFuture.completedFuture(null);
+	}
+
+	private CompletableFuture<JobSchedulingStatus> getJobSchedulingStatus() {
+		try {
+			JobSchedulingStatus jobSchedulingStatus = runningJobsRegistry.getJobSchedulingStatus(jobGraph.getJobID());
+			return CompletableFuture.completedFuture(jobSchedulingStatus);
+		} catch (IOException e) {
+			return FutureUtils.completedExceptionally(
+				new FlinkException(
+					String.format("Could not retrieve the job scheduling status for job %s.", jobGraph.getJobID()),
+					e));
 		}
 	}
 
@@ -329,21 +371,35 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, A
 				return;
 			}
 
-			log.info("JobManager for job {} ({}) was revoked leadership at {}.",
-				jobGraph.getName(), jobGraph.getJobID(), getAddress());
+			leadershipOperation = leadershipOperation.thenCompose(
+				(ignored) -> {
+					synchronized (lock) {
+						return revokeJobMasterLeadership();
+					}
+				});
+
+			handleException(leadershipOperation, "Could not suspend the job manager.");
+		}
+	}
 
-			setNewLeaderGatewayFuture();
+	private CompletableFuture<Void> revokeJobMasterLeadership() {
+		log.info("JobManager for job {} ({}) was revoked leadership at {}.",
+			jobGraph.getName(), jobGraph.getJobID(), getAddress());
 
-			CompletableFuture<Acknowledge>  suspendFuture = jobMasterService.suspend(new FlinkException("JobManager is no longer the leader."));
+		setNewLeaderGatewayFuture();
 
-			suspendFuture.whenCompleteAsync(
-				(Acknowledge ack, Throwable throwable) -> {
-					if (throwable != null) {
-						handleJobManagerRunnerError(new FlinkException("Could not suspend the job manager.", throwable));
-					}
-				},
-				executor);
-		}
+		return jobMasterService
+			.suspend(new FlinkException("JobManager is no longer the leader."))
+			.thenApply(FunctionUtils.nullFn());
+	}
+
+	private void handleException(CompletableFuture<Void> leadershipOperation, String message) {
+		leadershipOperation.whenComplete(
+			(ignored, throwable) -> {
+				if (throwable != null) {
+					handleJobManagerRunnerError(new FlinkException(message, throwable));
+				}
+			});
 	}
 
 	private void setNewLeaderGatewayFuture() {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 62446b0..447b59a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -131,6 +131,7 @@ import java.util.concurrent.CompletionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeoutException;
+import java.util.function.Function;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
@@ -334,11 +335,12 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 	 * @return Future acknowledge indicating that the job has been suspended. Otherwise the future contains an exception
 	 */
 	public CompletableFuture<Acknowledge> suspend(final Exception cause) {
-		CompletableFuture<Acknowledge> suspendFuture = callAsyncWithoutFencing(() -> suspendExecution(cause), RpcUtils.INF_TIMEOUT);
+		CompletableFuture<Acknowledge> suspendFuture = callAsyncWithoutFencing(
+				() -> suspendExecution(cause),
+				RpcUtils.INF_TIMEOUT)
+			.thenCompose(Function.identity());
 
-		stop();
-
-		return suspendFuture;
+		return suspendFuture.whenComplete((acknowledge, throwable) -> stop());
 	}
 
 	/**
@@ -1061,12 +1063,12 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 	 *
 	 * @param cause The reason of why this job been suspended.
 	 */
-	private Acknowledge suspendExecution(final Exception cause) {
+	private CompletableFuture<Acknowledge> suspendExecution(final Exception cause) {
 		validateRunsInMainThread();
 
 		if (getFencingToken() == null) {
 			log.debug("Job has already been suspended or shutdown.");
-			return Acknowledge.get();
+			return CompletableFuture.completedFuture(null);
 		}
 
 		// not leader anymore --> set the JobMasterId to null
@@ -1081,12 +1083,12 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 		suspendAndClearExecutionGraphFields(cause);
 
 		// the slot pool stops receiving messages and clears its pooled slots
-		slotPoolGateway.suspend();
+		CompletableFuture<Acknowledge> slotPoolSuspendFuture = slotPoolGateway.suspend();
 
 		// disconnect from resource manager:
 		closeResourceManagerConnection(cause);
 
-		return Acknowledge.get();
+		return slotPoolSuspendFuture;
 	}
 
 	private void assignExecutionGraph(
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
index 34d5cdc..b3fb36c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
@@ -242,7 +242,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS
 	 * Suspends this pool, meaning it has lost its authority to accept and distribute slots.
 	 */
 	@Override
-	public void suspend() {
+	public CompletableFuture<Acknowledge> suspend() {
 		log.info("Suspending SlotPool.");
 
 		validateRunsInMainThread();
@@ -265,6 +265,8 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS
 		// Clear (but not release!) the available slots. The TaskManagers should re-register them
 		// at the new leader JobManager/SlotPool
 		clear();
+
+		return CompletableFuture.completedFuture(Acknowledge.get());
 	}
 
 	// ------------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolGateway.java
index 3e546ff..4aa8614 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolGateway.java
@@ -48,7 +48,7 @@ public interface SlotPoolGateway extends AllocatedSlotActions, RpcGateway {
 	//  shutdown
 	// ------------------------------------------------------------------------
 
-	void suspend();
+	CompletableFuture<Acknowledge> suspend();
 
 	// ------------------------------------------------------------------------
 	//  resource manager connection
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerTest.java
index 584307b..b560600 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerTest.java
@@ -31,9 +31,10 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobmaster.factories.JobMasterServiceFactory;
-import org.apache.flink.runtime.jobmaster.factories.TestingJobMasterFactory;
+import org.apache.flink.runtime.jobmaster.factories.TestingJobMasterServiceFactory;
 import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
 import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
+import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
@@ -51,8 +52,11 @@ import org.junit.rules.TemporaryFolder;
 
 import javax.annotation.Nonnull;
 
+import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.is;
@@ -73,10 +77,12 @@ public class JobManagerRunnerTest extends TestLogger {
 
 	private static LibraryCacheManager libraryCacheManager;
 
-	private static JobMasterServiceFactory jobMasterFactory;
+	private static JobMasterServiceFactory defaultJobMasterServiceFactory;
 
 	private TestingHighAvailabilityServices haServices;
 
+	private TestingLeaderElectionService leaderElectionService;
+
 	private TestingFatalErrorHandler fatalErrorHandler;
 
 	@BeforeClass
@@ -86,7 +92,7 @@ public class JobManagerRunnerTest extends TestLogger {
 			FlinkUserCodeClassLoaders.ResolveOrder.CHILD_FIRST,
 			new String[]{});
 
-		jobMasterFactory = TestingJobMasterFactory.INSTANCE;
+		defaultJobMasterServiceFactory = new TestingJobMasterServiceFactory();
 
 		final JobVertex jobVertex = new JobVertex("Test vertex");
 		jobVertex.setInvokableClass(NoOpInvokable.class);
@@ -100,8 +106,9 @@ public class JobManagerRunnerTest extends TestLogger {
 
 	@Before
 	public void setup() {
+		leaderElectionService = new TestingLeaderElectionService();
 		haServices = new TestingHighAvailabilityServices();
-		haServices.setJobMasterLeaderElectionService(jobGraph.getJobID(), new TestingLeaderElectionService());
+		haServices.setJobMasterLeaderElectionService(jobGraph.getJobID(), leaderElectionService);
 		haServices.setResourceManagerLeaderRetriever(new SettableLeaderRetrievalService());
 		haServices.setCheckpointRecoveryFactory(new StandaloneCheckpointRecoveryFactory());
 
@@ -209,19 +216,106 @@ public class JobManagerRunnerTest extends TestLogger {
 		}
 	}
 
+	/**
+	 * Tests that the {@link JobManagerRunner} always waits for the previous leadership operation
+	 * (granting or revoking leadership) to finish before starting a new leadership operation.
+	 */
+	@Test
+	public void testConcurrentLeadershipOperationsBlockingSuspend() throws Exception {
+		final CompletableFuture<Acknowledge> suspendedFuture = new CompletableFuture<>();
+
+		TestingJobMasterServiceFactory jobMasterServiceFactory = new TestingJobMasterServiceFactory(
+			() -> new TestingJobMasterService(
+				"localhost",
+				e -> suspendedFuture));
+		JobManagerRunner jobManagerRunner = createJobManagerRunner(jobMasterServiceFactory);
+
+		jobManagerRunner.start();
+
+		leaderElectionService.isLeader(UUID.randomUUID()).get();
+
+		leaderElectionService.notLeader();
+
+		final CompletableFuture<UUID> leaderFuture = leaderElectionService.isLeader(UUID.randomUUID());
+
+		// the new leadership should wait first for the suspension to happen
+		assertThat(leaderFuture.isDone(), is(false));
+
+		try {
+			leaderFuture.get(1L, TimeUnit.MILLISECONDS);
+			fail("Granted leadership even though the JobMaster has not been suspended.");
+		} catch (TimeoutException expected) {
+			// expected
+		}
+
+		suspendedFuture.complete(Acknowledge.get());
+
+		leaderFuture.get();
+	}
+
+	/**
+	 * Tests that the {@link JobManagerRunner} always waits for the previous leadership operation
+	 * (granting or revoking leadership) to finish before starting a new leadership operation.
+	 */
+	@Test
+	public void testConcurrentLeadershipOperationsBlockingGainLeadership() throws Exception {
+		final CompletableFuture<Exception> suspendFuture = new CompletableFuture<>();
+		final CompletableFuture<Acknowledge> startFuture = new CompletableFuture<>();
+
+		TestingJobMasterServiceFactory jobMasterServiceFactory = new TestingJobMasterServiceFactory(
+			() -> new TestingJobMasterService(
+				"localhost",
+				e -> {
+					suspendFuture.complete(e);
+					return CompletableFuture.completedFuture(Acknowledge.get());
+				},
+				ignored -> startFuture));
+		JobManagerRunner jobManagerRunner = createJobManagerRunner(jobMasterServiceFactory);
+
+		jobManagerRunner.start();
+
+		leaderElectionService.isLeader(UUID.randomUUID());
+
+		leaderElectionService.notLeader();
+
+		// suspending should wait for the start to happen first
+		assertThat(suspendFuture.isDone(), is(false));
+
+		try {
+			suspendFuture.get(1L, TimeUnit.MILLISECONDS);
+			fail("Suspended leadership even though the JobMaster has not been started.");
+		} catch (TimeoutException expected) {
+			// expected
+		}
+
+		startFuture.complete(Acknowledge.get());
+
+		suspendFuture.get();
+	}
+
 	@Nonnull
 	private JobManagerRunner createJobManagerRunner(LibraryCacheManager libraryCacheManager) throws Exception {
+		return createJobManagerRunner(defaultJobMasterServiceFactory, libraryCacheManager);
+	}
+
+	@Nonnull
+	private JobManagerRunner createJobManagerRunner() throws Exception {
+		return createJobManagerRunner(defaultJobMasterServiceFactory, libraryCacheManager);
+	}
+
+	@Nonnull
+	private JobManagerRunner createJobManagerRunner(JobMasterServiceFactory jobMasterServiceFactory) throws Exception {
+		return createJobManagerRunner(jobMasterServiceFactory, libraryCacheManager);
+	}
+
+	@Nonnull
+	private JobManagerRunner createJobManagerRunner(JobMasterServiceFactory jobMasterServiceFactory, LibraryCacheManager libraryCacheManager) throws Exception{
 		return new JobManagerRunner(
 			jobGraph,
-			jobMasterFactory,
+			jobMasterServiceFactory,
 			haServices,
 			libraryCacheManager,
 			TestingUtils.defaultExecutor(),
 			fatalErrorHandler);
 	}
-
-	@Nonnull
-	private JobManagerRunner createJobManagerRunner() throws Exception {
-		return createJobManagerRunner(libraryCacheManager);
-	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobMasterService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobMasterService.java
index 7e65da1..02ff8eb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobMasterService.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobMasterService.java
@@ -25,6 +25,7 @@ import org.apache.flink.util.Preconditions;
 import javax.annotation.Nonnull;
 
 import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
 
 /**
  * Implementation of the {@link JobMasterService} for testing purposes.
@@ -34,26 +35,43 @@ public class TestingJobMasterService implements JobMasterService {
 	@Nonnull
 	private final String address;
 
+	@Nonnull
+	private final Function<Exception, CompletableFuture<Acknowledge>> suspendFunction;
+
+	@Nonnull
+	private final Function<JobMasterId, CompletableFuture<Acknowledge>> startFunction;
+
 	private JobMasterGateway jobMasterGateway;
 
-	public TestingJobMasterService(@Nonnull String address) {
+	public TestingJobMasterService(@Nonnull String address, @Nonnull Function<Exception, CompletableFuture<Acknowledge>> suspendFunction) {
+		this(address, suspendFunction, ignored -> CompletableFuture.completedFuture(Acknowledge.get()));
+	}
+
+	public TestingJobMasterService(
+		@Nonnull String address,
+		@Nonnull Function<Exception, CompletableFuture<Acknowledge>> suspendFunction,
+		@Nonnull Function<JobMasterId, CompletableFuture<Acknowledge>> startFunction) {
 		this.address = address;
+		this.suspendFunction = suspendFunction;
+		this.startFunction = startFunction;
 	}
 
 	public TestingJobMasterService() {
-		this("localhost");
+		this(
+			"localhost",
+			e -> CompletableFuture.completedFuture(Acknowledge.get()));
 	}
 
 	@Override
 	public CompletableFuture<Acknowledge> start(JobMasterId jobMasterId) {
 			jobMasterGateway = new TestingJobMasterGatewayBuilder().build();
-			return CompletableFuture.completedFuture(Acknowledge.get());
+			return startFunction.apply(jobMasterId);
 	}
 
 	@Override
 	public CompletableFuture<Acknowledge> suspend(Exception cause) {
 		jobMasterGateway = null;
-		return CompletableFuture.completedFuture(Acknowledge.get());
+		return suspendFunction.apply(cause);
 	}
 
 	@Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/factories/TestingJobMasterFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/factories/TestingJobMasterServiceFactory.java
similarity index 75%
rename from flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/factories/TestingJobMasterFactory.java
rename to flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/factories/TestingJobMasterServiceFactory.java
index ba7f1c8..e2de73b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/factories/TestingJobMasterFactory.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/factories/TestingJobMasterServiceFactory.java
@@ -24,14 +24,25 @@ import org.apache.flink.runtime.jobmaster.JobMaster;
 import org.apache.flink.runtime.jobmaster.JobMasterService;
 import org.apache.flink.runtime.jobmaster.TestingJobMasterService;
 
+import java.util.function.Supplier;
+
 /**
  * Testing implementation of the {@link JobMasterServiceFactory} which returns a {@link JobMaster} mock.
  */
-public enum TestingJobMasterFactory implements JobMasterServiceFactory {
-	INSTANCE;
+public class TestingJobMasterServiceFactory implements JobMasterServiceFactory {
+
+	private final Supplier<JobMasterService> jobMasterServiceSupplier;
+
+	public TestingJobMasterServiceFactory(Supplier<JobMasterService> jobMasterServiceSupplier) {
+		this.jobMasterServiceSupplier = jobMasterServiceSupplier;
+	}
+
+	public TestingJobMasterServiceFactory() {
+		this(TestingJobMasterService::new);
+	}
 
 	@Override
 	public JobMasterService createJobMasterService(JobGraph jobGraph, OnCompletionActions jobCompletionActions, ClassLoader userCodeClassloader) {
-		return new TestingJobMasterService();
+		return jobMasterServiceSupplier.get();
 	}
 }
diff --git a/flink-runtime/src/test/resources/log4j-test.properties b/flink-runtime/src/test/resources/log4j-test.properties
index 2778b7f..bc7fb42 100644
--- a/flink-runtime/src/test/resources/log4j-test.properties
+++ b/flink-runtime/src/test/resources/log4j-test.properties
@@ -18,7 +18,7 @@
 
 # Set root logger level to OFF to not flood build logs
 # set manually to INFO for debugging purposes
-log4j.rootLogger=OFF, testlogger
+log4j.rootLogger=DEBUG, testlogger
 
 # testlogger is set to be a ConsoleAppender.
 log4j.appender.testlogger=org.apache.log4j.ConsoleAppender


[flink] 01/02: [FLINK-11415] Introduce JobMasterServiceFactory

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit cae914f82e6b115be5c362a874160ebb7cea8eec
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Tue Jan 22 16:36:23 2019 +0100

    [FLINK-11415] Introduce JobMasterServiceFactory
    
    The JobMasterServiceFactory controls how the JobManagerRunner constructs its
    JobMasterService. This makes the JobManagerRunner easier to test, because tests
    can plug in their own factory.
    
    This closes #7564.
---
 .../dispatcher/DefaultJobManagerRunnerFactory.java | 77 +++++++++++++++++++
 .../flink/runtime/dispatcher/Dispatcher.java       | 53 -------------
 .../runtime/dispatcher/JobDispatcherFactory.java   |  2 +-
 .../dispatcher/JobManagerRunnerFactory.java        | 46 +++++++++++
 .../dispatcher/SessionDispatcherFactory.java       |  2 +-
 .../flink/runtime/jobmaster/JobManagerRunner.java  | 51 ++++---------
 .../factories/DefaultJobMasterServiceFactory.java  | 89 ++++++++++++++++++++++
 .../factories/JobMasterServiceFactory.java         | 34 +++++++++
 .../flink/runtime/minicluster/MiniCluster.java     |  3 +-
 .../runtime/blob/FailingPermanentBlobService.java  | 41 ++++++++++
 .../flink/runtime/dispatcher/DispatcherHATest.java |  7 +-
 .../flink/runtime/dispatcher/DispatcherTest.java   | 17 ++---
 .../dispatcher/TestingJobManagerRunnerFactory.java |  6 +-
 .../dispatcher/ZooKeeperHADispatcherTest.java      |  2 +-
 .../runtime/jobmaster/JobManagerRunnerTest.java    | 71 +++++++----------
 .../runtime/jobmaster/TestingJobMasterService.java | 75 ++++++++++++++++++
 .../factories/TestingJobMasterFactory.java         | 37 +++++++++
 17 files changed, 455 insertions(+), 158 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerFactory.java
new file mode 100644
index 0000000..97afe24
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerFactory.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmaster.JobManagerRunner;
+import org.apache.flink.runtime.jobmaster.JobManagerSharedServices;
+import org.apache.flink.runtime.jobmaster.JobMasterConfiguration;
+import org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory;
+import org.apache.flink.runtime.jobmaster.factories.JobManagerJobMetricGroupFactory;
+import org.apache.flink.runtime.jobmaster.factories.JobMasterServiceFactory;
+import org.apache.flink.runtime.jobmaster.slotpool.DefaultSlotPoolFactory;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolFactory;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+
+/**
+ * Singleton default factory for {@link JobManagerRunner}.
+ */
+public enum DefaultJobManagerRunnerFactory implements JobManagerRunnerFactory {
+	INSTANCE;
+
+	@Override
+	public JobManagerRunner createJobManagerRunner(
+			JobGraph jobGraph,
+			Configuration configuration,
+			RpcService rpcService,
+			HighAvailabilityServices highAvailabilityServices,
+			HeartbeatServices heartbeatServices,
+			JobManagerSharedServices jobManagerServices,
+			JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory,
+			FatalErrorHandler fatalErrorHandler) throws Exception {
+
+		final JobMasterConfiguration jobMasterConfiguration = JobMasterConfiguration.fromConfiguration(configuration);
+
+		final SlotPoolFactory slotPoolFactory = DefaultSlotPoolFactory.fromConfiguration(
+			configuration,
+			rpcService);
+
+		final JobMasterServiceFactory jobMasterFactory = new DefaultJobMasterServiceFactory(
+			jobMasterConfiguration,
+			slotPoolFactory,
+			rpcService,
+			highAvailabilityServices,
+			jobManagerServices,
+			heartbeatServices,
+			jobManagerJobMetricGroupFactory,
+			fatalErrorHandler);
+
+		return new JobManagerRunner(
+			jobGraph,
+			jobMasterFactory,
+			highAvailabilityServices,
+			jobManagerServices.getLibraryCacheManager(),
+			jobManagerServices.getScheduledExecutorService(),
+			fatalErrorHandler);
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index 81b826e..a4651fb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -45,7 +45,6 @@ import org.apache.flink.runtime.jobmaster.JobNotFinishedException;
 import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.runtime.jobmaster.RescalingBehaviour;
 import org.apache.flink.runtime.jobmaster.factories.DefaultJobManagerJobMetricGroupFactory;
-import org.apache.flink.runtime.jobmaster.factories.JobManagerJobMetricGroupFactory;
 import org.apache.flink.runtime.leaderelection.LeaderContender;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.messages.Acknowledge;
@@ -308,7 +307,6 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 		final CompletableFuture<JobManagerRunner> jobManagerRunnerFuture = CompletableFuture.supplyAsync(
 			CheckedSupplier.unchecked(() ->
 				jobManagerRunnerFactory.createJobManagerRunner(
-					ResourceID.generate(),
 					jobGraph,
 					configuration,
 					rpcService,
@@ -1009,55 +1007,4 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 			}
 		});
 	}
-
-	//------------------------------------------------------
-	// Factories
-	//------------------------------------------------------
-
-	/**
-	 * Factory for a {@link JobManagerRunner}.
-	 */
-	@FunctionalInterface
-	public interface JobManagerRunnerFactory {
-		JobManagerRunner createJobManagerRunner(
-			ResourceID resourceId,
-			JobGraph jobGraph,
-			Configuration configuration,
-			RpcService rpcService,
-			HighAvailabilityServices highAvailabilityServices,
-			HeartbeatServices heartbeatServices,
-			JobManagerSharedServices jobManagerServices,
-			JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory,
-			FatalErrorHandler fatalErrorHandler) throws Exception;
-	}
-
-	/**
-	 * Singleton default factory for {@link JobManagerRunner}.
-	 */
-	public enum DefaultJobManagerRunnerFactory implements JobManagerRunnerFactory {
-		INSTANCE;
-
-		@Override
-		public JobManagerRunner createJobManagerRunner(
-				ResourceID resourceId,
-				JobGraph jobGraph,
-				Configuration configuration,
-				RpcService rpcService,
-				HighAvailabilityServices highAvailabilityServices,
-				HeartbeatServices heartbeatServices,
-				JobManagerSharedServices jobManagerServices,
-				JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory,
-				FatalErrorHandler fatalErrorHandler) throws Exception {
-			return new JobManagerRunner(
-				resourceId,
-				jobGraph,
-				configuration,
-				rpcService,
-				highAvailabilityServices,
-				heartbeatServices,
-				jobManagerServices,
-				jobManagerJobMetricGroupFactory,
-				fatalErrorHandler);
-		}
-	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobDispatcherFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobDispatcherFactory.java
index a2a6930..16f2e64 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobDispatcherFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobDispatcherFactory.java
@@ -75,7 +75,7 @@ public class JobDispatcherFactory implements DispatcherFactory<MiniDispatcher> {
 			jobManagerMetricGroup,
 			metricQueryServicePath,
 			archivedExecutionGraphStore,
-			Dispatcher.DefaultJobManagerRunnerFactory.INSTANCE,
+			DefaultJobManagerRunnerFactory.INSTANCE,
 			fatalErrorHandler,
 			historyServerArchivist,
 			jobGraph,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobManagerRunnerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobManagerRunnerFactory.java
new file mode 100644
index 0000000..9caf64d
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobManagerRunnerFactory.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmaster.JobManagerRunner;
+import org.apache.flink.runtime.jobmaster.JobManagerSharedServices;
+import org.apache.flink.runtime.jobmaster.factories.JobManagerJobMetricGroupFactory;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+
+/**
+ * Factory for a {@link JobManagerRunner}.
+ */
+@FunctionalInterface
+public interface JobManagerRunnerFactory {
+
+	JobManagerRunner createJobManagerRunner(
+		JobGraph jobGraph,
+		Configuration configuration,
+		RpcService rpcService,
+		HighAvailabilityServices highAvailabilityServices,
+		HeartbeatServices heartbeatServices,
+		JobManagerSharedServices jobManagerServices,
+		JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory,
+		FatalErrorHandler fatalErrorHandler) throws Exception;
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SessionDispatcherFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SessionDispatcherFactory.java
index 1bcf04e..4d0ec40 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SessionDispatcherFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SessionDispatcherFactory.java
@@ -60,7 +60,7 @@ public enum SessionDispatcherFactory implements DispatcherFactory<Dispatcher> {
 			jobManagerMetricGroup,
 			metricQueryServicePath,
 			archivedExecutionGraphStore,
-			Dispatcher.DefaultJobManagerRunnerFactory.INSTANCE,
+			DefaultJobManagerRunnerFactory.INSTANCE,
 			fatalErrorHandler,
 			historyServerArchivist);
 	}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
index 1ac2f80..ed79455 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
@@ -19,25 +19,19 @@
 package org.apache.flink.runtime.jobmaster;
 
 import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.client.JobExecutionException;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
-import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
 import org.apache.flink.runtime.highavailability.RunningJobsRegistry.JobSchedulingStatus;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmanager.OnCompletionActions;
-import org.apache.flink.runtime.jobmaster.factories.JobManagerJobMetricGroupFactory;
-import org.apache.flink.runtime.jobmaster.slotpool.DefaultSlotPoolFactory;
-import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolFactory;
+import org.apache.flink.runtime.jobmaster.factories.JobMasterServiceFactory;
 import org.apache.flink.runtime.leaderelection.LeaderContender;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
-import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.util.AutoCloseableAsync;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
@@ -48,6 +42,7 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -74,7 +69,9 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, A
 	/** Leader election for this job. */
 	private final LeaderElectionService leaderElectionService;
 
-	private final JobManagerSharedServices jobManagerSharedServices;
+	private final LibraryCacheManager libraryCacheManager;
+
+	private final Executor executor;
 
 	private final JobMasterService jobMasterService;
 
@@ -99,14 +96,11 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, A
 	 *                   required services could not be started, or the Job could not be initialized.
 	 */
 	public JobManagerRunner(
-			final ResourceID resourceId,
 			final JobGraph jobGraph,
-			final Configuration configuration,
-			final RpcService rpcService,
+			final JobMasterServiceFactory jobMasterFactory,
 			final HighAvailabilityServices haServices,
-			final HeartbeatServices heartbeatServices,
-			final JobManagerSharedServices jobManagerSharedServices,
-			final JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory,
+			final LibraryCacheManager libraryCacheManager,
+			final Executor executor,
 			final FatalErrorHandler fatalErrorHandler) throws Exception {
 
 		this.resultFuture = new CompletableFuture<>();
@@ -115,13 +109,13 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, A
 		// make sure we cleanly shut down out JobManager services if initialization fails
 		try {
 			this.jobGraph = checkNotNull(jobGraph);
-			this.jobManagerSharedServices = checkNotNull(jobManagerSharedServices);
+			this.libraryCacheManager = checkNotNull(libraryCacheManager);
+			this.executor = checkNotNull(executor);
 			this.fatalErrorHandler = checkNotNull(fatalErrorHandler);
 
 			checkArgument(jobGraph.getNumberOfVertices() > 0, "The given job is empty");
 
 			// libraries and class loader first
-			final LibraryCacheManager libraryCacheManager = jobManagerSharedServices.getLibraryCacheManager();
 			try {
 				libraryCacheManager.registerJob(
 						jobGraph.getJobID(), jobGraph.getUserJarBlobKeys(), jobGraph.getClasspaths());
@@ -138,28 +132,10 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, A
 			this.runningJobsRegistry = haServices.getRunningJobsRegistry();
 			this.leaderElectionService = haServices.getJobManagerLeaderElectionService(jobGraph.getJobID());
 
-			final JobMasterConfiguration jobMasterConfiguration = JobMasterConfiguration.fromConfiguration(configuration);
-
 			this.leaderGatewayFuture = new CompletableFuture<>();
 
-			final SlotPoolFactory slotPoolFactory = DefaultSlotPoolFactory.fromConfiguration(
-				configuration,
-				rpcService);
-
 			// now start the JobManager
-			this.jobMasterService = new JobMaster(
-				rpcService,
-				jobMasterConfiguration,
-				resourceId,
-				jobGraph,
-				haServices,
-				slotPoolFactory,
-				jobManagerSharedServices,
-				heartbeatServices,
-				jobManagerJobMetricGroupFactory,
-				this,
-				fatalErrorHandler,
-				userCodeLoader);
+			this.jobMasterService = jobMasterFactory.createJobMasterService(jobGraph, this, userCodeLoader);
 		}
 		catch (Throwable t) {
 			terminationFuture.completeExceptionally(t);
@@ -217,7 +193,6 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, A
 							throwable = ExceptionUtils.firstOrSuppressed(t, ExceptionUtils.stripCompletionException(throwable));
 						}
 
-						final LibraryCacheManager libraryCacheManager = jobManagerSharedServices.getLibraryCacheManager();
 						libraryCacheManager.unregisterJob(jobGraph.getJobID());
 
 						if (throwable != null) {
@@ -333,7 +308,7 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, A
 						confirmLeaderSessionIdIfStillLeader(leaderSessionId, currentLeaderGatewayFuture);
 					}
 				},
-				jobManagerSharedServices.getScheduledExecutorService());
+				executor);
 		}
 	}
 
@@ -367,7 +342,7 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, A
 						handleJobManagerRunnerError(new FlinkException("Could not suspend the job manager.", throwable));
 					}
 				},
-				jobManagerSharedServices.getScheduledExecutorService());
+				executor);
 		}
 	}
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/factories/DefaultJobMasterServiceFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/factories/DefaultJobMasterServiceFactory.java
new file mode 100644
index 0000000..58aa948
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/factories/DefaultJobMasterServiceFactory.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.factories;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmanager.OnCompletionActions;
+import org.apache.flink.runtime.jobmaster.JobManagerSharedServices;
+import org.apache.flink.runtime.jobmaster.JobMaster;
+import org.apache.flink.runtime.jobmaster.JobMasterConfiguration;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolFactory;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+
+/**
+ * Default implementation of the {@link JobMasterServiceFactory}.
+ */
+public class DefaultJobMasterServiceFactory implements JobMasterServiceFactory {
+
+	private final JobMasterConfiguration jobMasterConfiguration;
+
+	private final SlotPoolFactory slotPoolFactory;
+
+	private final RpcService rpcService;
+
+	private final HighAvailabilityServices haServices;
+
+	private final JobManagerSharedServices jobManagerSharedServices;
+
+	private final HeartbeatServices heartbeatServices;
+
+	private final JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory;
+
+	private final FatalErrorHandler fatalErrorHandler;
+
+	public DefaultJobMasterServiceFactory(
+			JobMasterConfiguration jobMasterConfiguration,
+			SlotPoolFactory slotPoolFactory,
+			RpcService rpcService,
+			HighAvailabilityServices haServices,
+			JobManagerSharedServices jobManagerSharedServices,
+			HeartbeatServices heartbeatServices,
+			JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory,
+			FatalErrorHandler fatalErrorHandler) {
+		this.jobMasterConfiguration = jobMasterConfiguration;
+		this.slotPoolFactory = slotPoolFactory;
+		this.rpcService = rpcService;
+		this.haServices = haServices;
+		this.jobManagerSharedServices = jobManagerSharedServices;
+		this.heartbeatServices = heartbeatServices;
+		this.jobManagerJobMetricGroupFactory = jobManagerJobMetricGroupFactory;
+		this.fatalErrorHandler = fatalErrorHandler;
+	}
+
+	@Override
+	public JobMaster createJobMasterService(JobGraph jobGraph, OnCompletionActions jobCompletionActions, ClassLoader userCodeClassloader) throws Exception {
+		return new JobMaster(
+			rpcService,
+			jobMasterConfiguration,
+			ResourceID.generate(),
+			jobGraph,
+			haServices,
+			slotPoolFactory,
+			jobManagerSharedServices,
+			heartbeatServices,
+			jobManagerJobMetricGroupFactory,
+			jobCompletionActions,
+			fatalErrorHandler,
+			userCodeClassloader);
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/factories/JobMasterServiceFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/factories/JobMasterServiceFactory.java
new file mode 100644
index 0000000..e51a889
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/factories/JobMasterServiceFactory.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.factories;
+
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmanager.OnCompletionActions;
+import org.apache.flink.runtime.jobmaster.JobMasterService;
+
+/**
+ * Factory for a {@link JobMasterService}.
+ */
+public interface JobMasterServiceFactory {
+
+	JobMasterService createJobMasterService(
+		JobGraph jobGraph,
+		OnCompletionActions jobCompletionActions,
+		ClassLoader userCodeClassloader) throws Exception;
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index f25c73c..b17432b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -37,6 +37,7 @@ import org.apache.flink.runtime.client.JobStatusMessage;
 import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory;
 import org.apache.flink.runtime.dispatcher.Dispatcher;
 import org.apache.flink.runtime.dispatcher.DispatcherGateway;
 import org.apache.flink.runtime.dispatcher.DispatcherId;
@@ -400,7 +401,7 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync {
 					jobManagerMetricGroup,
 					metricRegistry.getMetricQueryServicePath(),
 					new MemoryArchivedExecutionGraphStore(),
-					Dispatcher.DefaultJobManagerRunnerFactory.INSTANCE,
+					DefaultJobManagerRunnerFactory.INSTANCE,
 					new ShutDownFatalErrorHandler(),
 					historyServerArchivist);
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/FailingPermanentBlobService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/FailingPermanentBlobService.java
new file mode 100644
index 0000000..06326d5
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/FailingPermanentBlobService.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blob;
+
+import org.apache.flink.api.common.JobID;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+/**
+ * Testing implementation of {@link PermanentBlobService} which always fails the
+ * {@link #getFile(JobID, PermanentBlobKey)} call.
+ */
+public enum FailingPermanentBlobService implements PermanentBlobService {
+	INSTANCE;
+
+	@Override
+	public File getFile(JobID jobId, PermanentBlobKey key) throws IOException {
+		throw new FileNotFoundException(String.format("Could not find file for blob key %s belonging to job %s.", key, jobId));
+	}
+
+	@Override
+	public void close() {}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java
index b965e71..384704d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java
@@ -293,11 +293,6 @@ public class DispatcherHATest extends TestLogger {
 		return new TestingJobManagerRunnerFactory(new CompletableFuture<>(), new CompletableFuture<>(), CompletableFuture.completedFuture(null));
 	}
 
-	@Nonnull
-	private HATestingDispatcher createDispatcherWithJobManagerRunnerFactory(HighAvailabilityServices highAvailabilityServices, Dispatcher.JobManagerRunnerFactory jobManagerRunnerFactory) throws Exception {
-		return createDispatcher(highAvailabilityServices, null, jobManagerRunnerFactory);
-	}
-
 	private HATestingDispatcher createDispatcher(HighAvailabilityServices haServices) throws Exception {
 		return createDispatcher(
 			haServices,
@@ -309,7 +304,7 @@ public class DispatcherHATest extends TestLogger {
 	private HATestingDispatcher createDispatcher(
 		HighAvailabilityServices highAvailabilityServices,
 		@Nullable Queue<DispatcherId> fencingTokens,
-		Dispatcher.JobManagerRunnerFactory jobManagerRunnerFactory) throws Exception {
+		JobManagerRunnerFactory jobManagerRunnerFactory) throws Exception {
 		final Configuration configuration = new Configuration();
 
 		return new HATestingDispatcher(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
index e10b8bc..fa4705a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
@@ -28,7 +28,6 @@ import org.apache.flink.runtime.blob.VoidBlobStore;
 import org.apache.flink.runtime.checkpoint.Checkpoints;
 import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
 import org.apache.flink.runtime.checkpoint.savepoint.SavepointV2;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ErrorInfo;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
@@ -201,7 +200,7 @@ public class DispatcherTest extends TestLogger {
 	}
 
 	@Nonnull
-	private TestingDispatcher createAndStartDispatcher(HeartbeatServices heartbeatServices, TestingHighAvailabilityServices haServices, Dispatcher.JobManagerRunnerFactory jobManagerRunnerFactory) throws Exception {
+	private TestingDispatcher createAndStartDispatcher(HeartbeatServices heartbeatServices, TestingHighAvailabilityServices haServices, JobManagerRunnerFactory jobManagerRunnerFactory) throws Exception {
 		final TestingDispatcher dispatcher = createDispatcher(heartbeatServices, haServices, jobManagerRunnerFactory);
 		dispatcher.start();
 
@@ -209,7 +208,7 @@ public class DispatcherTest extends TestLogger {
 	}
 
 	@Nonnull
-	private TestingDispatcher createDispatcher(HeartbeatServices heartbeatServices, TestingHighAvailabilityServices haServices, Dispatcher.JobManagerRunnerFactory jobManagerRunnerFactory) throws Exception {
+	private TestingDispatcher createDispatcher(HeartbeatServices heartbeatServices, TestingHighAvailabilityServices haServices, JobManagerRunnerFactory jobManagerRunnerFactory) throws Exception {
 		return new TestingDispatcher(
 			rpcService,
 			Dispatcher.DISPATCHER_NAME + '_' + name.getMethodName(),
@@ -668,7 +667,7 @@ public class DispatcherTest extends TestLogger {
 		final InMemorySubmittedJobGraphStore submittedJobGraphStore = new InMemorySubmittedJobGraphStore();
 		haServices.setSubmittedJobGraphStore(submittedJobGraphStore);
 
-		dispatcher = createAndStartDispatcher(heartbeatServices, haServices, Dispatcher.DefaultJobManagerRunnerFactory.INSTANCE);
+		dispatcher = createAndStartDispatcher(heartbeatServices, haServices, DefaultJobManagerRunnerFactory.INSTANCE);
 
 		// grant leadership and submit a single job
 		final DispatcherId expectedDispatcherId = DispatcherId.generate();
@@ -697,10 +696,10 @@ public class DispatcherTest extends TestLogger {
 		}
 
 		@Override
-		public JobManagerRunner createJobManagerRunner(ResourceID resourceId, JobGraph jobGraph, Configuration configuration, RpcService rpcService, HighAvailabilityServices highAvailabilityServices, HeartbeatServices heartbeatServices, JobManagerSharedServices jobManagerSharedServices, JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory, FatalErrorHandler fatalErrorHandler) throws Exception {
+		public JobManagerRunner createJobManagerRunner(JobGraph jobGraph, Configuration configuration, RpcService rpcService, HighAvailabilityServices highAvailabilityServices, HeartbeatServices heartbeatServices, JobManagerSharedServices jobManagerSharedServices, JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory, FatalErrorHandler fatalErrorHandler) throws Exception {
 			jobManagerRunnerCreationLatch.run();
 
-			return super.createJobManagerRunner(resourceId, jobGraph, configuration, rpcService, highAvailabilityServices, heartbeatServices, jobManagerSharedServices, jobManagerJobMetricGroupFactory, fatalErrorHandler);
+			return super.createJobManagerRunner(jobGraph, configuration, rpcService, highAvailabilityServices, heartbeatServices, jobManagerSharedServices, jobManagerJobMetricGroupFactory, fatalErrorHandler);
 		}
 	}
 
@@ -735,7 +734,7 @@ public class DispatcherTest extends TestLogger {
 		}
 	}
 
-	private static final class ExpectedJobIdJobManagerRunnerFactory implements Dispatcher.JobManagerRunnerFactory {
+	private static final class ExpectedJobIdJobManagerRunnerFactory implements JobManagerRunnerFactory {
 
 		private final JobID expectedJobId;
 
@@ -748,7 +747,6 @@ public class DispatcherTest extends TestLogger {
 
 		@Override
 		public JobManagerRunner createJobManagerRunner(
-				ResourceID resourceId,
 				JobGraph jobGraph,
 				Configuration configuration,
 				RpcService rpcService,
@@ -761,8 +759,7 @@ public class DispatcherTest extends TestLogger {
 
 			createdJobManagerRunnerLatch.countDown();
 
-			return Dispatcher.DefaultJobManagerRunnerFactory.INSTANCE.createJobManagerRunner(
-				resourceId,
+			return DefaultJobManagerRunnerFactory.INSTANCE.createJobManagerRunner(
 				jobGraph,
 				configuration,
 				rpcService,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingJobManagerRunnerFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingJobManagerRunnerFactory.java
index 30e4af6..c19038c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingJobManagerRunnerFactory.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingJobManagerRunnerFactory.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.dispatcher;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
@@ -38,10 +37,10 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 /**
- * {@link org.apache.flink.runtime.dispatcher.Dispatcher.JobManagerRunnerFactory} implementation for
+ * {@link JobManagerRunnerFactory} implementation for
  * testing purposes.
  */
-class TestingJobManagerRunnerFactory implements Dispatcher.JobManagerRunnerFactory {
+class TestingJobManagerRunnerFactory implements JobManagerRunnerFactory {
 
 	private final CompletableFuture<JobGraph> jobGraphFuture;
 	private final CompletableFuture<ArchivedExecutionGraph> resultFuture;
@@ -68,7 +67,6 @@ class TestingJobManagerRunnerFactory implements Dispatcher.JobManagerRunnerFacto
 
 	@Override
 	public JobManagerRunner createJobManagerRunner(
-			ResourceID resourceId,
 			JobGraph jobGraph,
 			Configuration configuration,
 			RpcService rpcService,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/ZooKeeperHADispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/ZooKeeperHADispatcherTest.java
index 9c23f9d..d3d80d4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/ZooKeeperHADispatcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/ZooKeeperHADispatcherTest.java
@@ -325,7 +325,7 @@ public class ZooKeeperHADispatcherTest extends TestLogger {
 	}
 
 	@Nonnull
-	private TestingDispatcher createDispatcher(HighAvailabilityServices highAvailabilityServices, Dispatcher.JobManagerRunnerFactory jobManagerRunnerFactory) throws Exception {
+	private TestingDispatcher createDispatcher(HighAvailabilityServices highAvailabilityServices, JobManagerRunnerFactory jobManagerRunnerFactory) throws Exception {
 		return new TestingDispatcher(
 			rpcService,
 			Dispatcher.DISPATCHER_NAME + '_' + name.getMethodName() + UUID.randomUUID(),
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerTest.java
index da5a9ff..584307b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerTest.java
@@ -19,24 +19,23 @@
 package org.apache.flink.runtime.jobmaster;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.configuration.BlobServerOptions;
-import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.FailingPermanentBlobService;
 import org.apache.flink.runtime.blob.VoidPermanentBlobService;
 import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
 import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
+import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
-import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.runtime.jobmaster.factories.UnregisteredJobManagerJobMetricGroupFactory;
+import org.apache.flink.runtime.jobmaster.factories.JobMasterServiceFactory;
+import org.apache.flink.runtime.jobmaster.factories.TestingJobMasterFactory;
 import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
 import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
 import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
-import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
 import org.apache.flink.util.ExceptionUtils;
@@ -68,30 +67,26 @@ public class JobManagerRunnerTest extends TestLogger {
 	@ClassRule
 	public static TemporaryFolder temporaryFolder = new TemporaryFolder();
 
-	private static Configuration configuration;
-
-	private static TestingRpcService rpcService;
-
-	private static HeartbeatServices heartbeatServices = new HeartbeatServices(1000L, 1000L);
-
-	private static JobManagerSharedServices jobManagerSharedServices;
-
 	private static JobGraph jobGraph;
 
 	private static ArchivedExecutionGraph archivedExecutionGraph;
 
+	private static LibraryCacheManager libraryCacheManager;
+
+	private static JobMasterServiceFactory jobMasterFactory;
+
 	private TestingHighAvailabilityServices haServices;
 
 	private TestingFatalErrorHandler fatalErrorHandler;
 
 	@BeforeClass
-	public static void setupClass() throws Exception {
-		configuration = new Configuration();
-		rpcService = new TestingRpcService();
-
-		configuration.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath());
+	public static void setupClass() {
+		libraryCacheManager = new BlobLibraryCacheManager(
+			FailingPermanentBlobService.INSTANCE,
+			FlinkUserCodeClassLoaders.ResolveOrder.CHILD_FIRST,
+			new String[]{});
 
-		jobManagerSharedServices = new TestingJobManagerSharedServicesBuilder().build();
+		jobMasterFactory = TestingJobMasterFactory.INSTANCE;
 
 		final JobVertex jobVertex = new JobVertex("Test vertex");
 		jobVertex.setInvokableClass(NoOpInvokable.class);
@@ -119,13 +114,9 @@ public class JobManagerRunnerTest extends TestLogger {
 	}
 
 	@AfterClass
-	public static void tearDownClass() throws Exception {
-		if (jobManagerSharedServices != null) {
-			jobManagerSharedServices.shutdown();
-		}
-
-		if (rpcService != null) {
-			rpcService.stopService();
+	public static void tearDownClass() {
+		if (libraryCacheManager != null) {
+			libraryCacheManager.shutdown();
 		}
 	}
 
@@ -202,10 +193,7 @@ public class JobManagerRunnerTest extends TestLogger {
 			VoidPermanentBlobService.INSTANCE,
 			FlinkUserCodeClassLoaders.ResolveOrder.CHILD_FIRST,
 			new String[]{});
-		final JobManagerSharedServices jobManagerSharedServices = new TestingJobManagerSharedServicesBuilder()
-			.setLibraryCacheManager(libraryCacheManager)
-			.build();
-		final JobManagerRunner jobManagerRunner = createJobManagerRunner(jobManagerSharedServices);
+		final JobManagerRunner jobManagerRunner = createJobManagerRunner(libraryCacheManager);
 
 		try {
 			jobManagerRunner.start();
@@ -222,21 +210,18 @@ public class JobManagerRunnerTest extends TestLogger {
 	}
 
 	@Nonnull
-	private JobManagerRunner createJobManagerRunner() throws Exception {
-		return createJobManagerRunner(jobManagerSharedServices);
-	}
-
-	@Nonnull
-	private JobManagerRunner createJobManagerRunner(final JobManagerSharedServices jobManagerSharedServices) throws Exception {
+	private JobManagerRunner createJobManagerRunner(LibraryCacheManager libraryCacheManager) throws Exception {
 		return new JobManagerRunner(
-			ResourceID.generate(),
 			jobGraph,
-			configuration,
-			rpcService,
+			jobMasterFactory,
 			haServices,
-			heartbeatServices,
-			jobManagerSharedServices,
-			UnregisteredJobManagerJobMetricGroupFactory.INSTANCE,
+			libraryCacheManager,
+			TestingUtils.defaultExecutor(),
 			fatalErrorHandler);
 	}
+
+	@Nonnull
+	private JobManagerRunner createJobManagerRunner() throws Exception {
+		return createJobManagerRunner(libraryCacheManager);
+	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobMasterService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobMasterService.java
new file mode 100644
index 0000000..7e65da1
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobMasterService.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatewayBuilder;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Implementation of the {@link JobMasterService} for testing purposes.
+ *
+ * <p>{@link #start(JobMasterId)} installs a fresh gateway built by
+ * {@link TestingJobMasterGatewayBuilder}; {@link #suspend(Exception)} and
+ * {@link #closeAsync()} discard it again. All returned futures are already completed.
+ */
+public class TestingJobMasterService implements JobMasterService {
+
+	/** Address reported by {@link #getAddress()}. */
+	@Nonnull
+	private final String address;
+
+	/**
+	 * Gateway installed by {@link #start(JobMasterId)}; {@code null} before the service
+	 * has been started and after it has been suspended or closed.
+	 */
+	private JobMasterGateway jobMasterGateway;
+
+	/**
+	 * Creates a testing service which reports the given address.
+	 *
+	 * @param address address returned by {@link #getAddress()}
+	 */
+	public TestingJobMasterService(@Nonnull String address) {
+		this.address = address;
+	}
+
+	/**
+	 * Creates a testing service which reports {@code "localhost"} as its address.
+	 */
+	public TestingJobMasterService() {
+		this("localhost");
+	}
+
+	@Override
+	public CompletableFuture<Acknowledge> start(JobMasterId jobMasterId) {
+		// the given leader id is not used; every start installs a fresh default gateway
+		jobMasterGateway = new TestingJobMasterGatewayBuilder().build();
+		return CompletableFuture.completedFuture(Acknowledge.get());
+	}
+
+	@Override
+	public CompletableFuture<Acknowledge> suspend(Exception cause) {
+		jobMasterGateway = null;
+		return CompletableFuture.completedFuture(Acknowledge.get());
+	}
+
+	/**
+	 * Returns the gateway installed by the last {@link #start(JobMasterId)}.
+	 *
+	 * @throws NullPointerException if the service is not started, or has been suspended or closed
+	 */
+	@Override
+	public JobMasterGateway getGateway() {
+		Preconditions.checkNotNull(jobMasterGateway, "TestingJobMasterService has not been started yet.");
+		return jobMasterGateway;
+	}
+
+	@Override
+	public String getAddress() {
+		return address;
+	}
+
+	@Override
+	public CompletableFuture<Void> closeAsync() {
+		jobMasterGateway = null;
+		return CompletableFuture.completedFuture(null);
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/factories/TestingJobMasterFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/factories/TestingJobMasterFactory.java
new file mode 100644
index 0000000..ba7f1c8
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/factories/TestingJobMasterFactory.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.factories;
+
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmanager.OnCompletionActions;
+import org.apache.flink.runtime.jobmaster.JobMaster;
+import org.apache.flink.runtime.jobmaster.JobMasterService;
+import org.apache.flink.runtime.jobmaster.TestingJobMasterService;
+
+/**
+ * Testing implementation of the {@link JobMasterServiceFactory} which returns a
+ * {@link TestingJobMasterService} instead of a real {@link JobMaster}.
+ *
+ * <p>The arguments passed to {@link #createJobMasterService} are ignored; every call
+ * returns a new {@link TestingJobMasterService} created via its no-argument constructor.
+ */
+public enum TestingJobMasterFactory implements JobMasterServiceFactory {
+	INSTANCE;
+
+	@Override
+	public JobMasterService createJobMasterService(JobGraph jobGraph, OnCompletionActions jobCompletionActions, ClassLoader userCodeClassloader) {
+		return new TestingJobMasterService();
+	}
+}