You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2019/01/30 11:11:51 UTC

[flink] 01/02: [FLINK-11415] Introduce JobMasterServiceFactory

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit cae914f82e6b115be5c362a874160ebb7cea8eec
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Tue Jan 22 16:36:23 2019 +0100

    [FLINK-11415] Introduce JobMasterServiceFactory
    
    The JobMasterServiceFactory controls how the JobMasterService is constructed by
    the JobManagerRunner. This allows for an easier testing of this component.
    
    This closes #7564.
---
 .../dispatcher/DefaultJobManagerRunnerFactory.java | 77 +++++++++++++++++++
 .../flink/runtime/dispatcher/Dispatcher.java       | 53 -------------
 .../runtime/dispatcher/JobDispatcherFactory.java   |  2 +-
 .../dispatcher/JobManagerRunnerFactory.java        | 46 +++++++++++
 .../dispatcher/SessionDispatcherFactory.java       |  2 +-
 .../flink/runtime/jobmaster/JobManagerRunner.java  | 51 ++++---------
 .../factories/DefaultJobMasterServiceFactory.java  | 89 ++++++++++++++++++++++
 .../factories/JobMasterServiceFactory.java         | 34 +++++++++
 .../flink/runtime/minicluster/MiniCluster.java     |  3 +-
 .../runtime/blob/FailingPermanentBlobService.java  | 41 ++++++++++
 .../flink/runtime/dispatcher/DispatcherHATest.java |  7 +-
 .../flink/runtime/dispatcher/DispatcherTest.java   | 17 ++---
 .../dispatcher/TestingJobManagerRunnerFactory.java |  6 +-
 .../dispatcher/ZooKeeperHADispatcherTest.java      |  2 +-
 .../runtime/jobmaster/JobManagerRunnerTest.java    | 71 +++++++----------
 .../runtime/jobmaster/TestingJobMasterService.java | 75 ++++++++++++++++++
 .../factories/TestingJobMasterFactory.java         | 37 +++++++++
 17 files changed, 455 insertions(+), 158 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerFactory.java
new file mode 100644
index 0000000..97afe24
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerFactory.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmaster.JobManagerRunner;
+import org.apache.flink.runtime.jobmaster.JobManagerSharedServices;
+import org.apache.flink.runtime.jobmaster.JobMasterConfiguration;
+import org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory;
+import org.apache.flink.runtime.jobmaster.factories.JobManagerJobMetricGroupFactory;
+import org.apache.flink.runtime.jobmaster.factories.JobMasterServiceFactory;
+import org.apache.flink.runtime.jobmaster.slotpool.DefaultSlotPoolFactory;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolFactory;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+
+/**
+ * Singleton default factory for {@link JobManagerRunner}.
+ */
+public enum DefaultJobManagerRunnerFactory implements JobManagerRunnerFactory {
+	INSTANCE;
+
+	@Override
+	public JobManagerRunner createJobManagerRunner(
+			JobGraph jobGraph,
+			Configuration configuration,
+			RpcService rpcService,
+			HighAvailabilityServices highAvailabilityServices,
+			HeartbeatServices heartbeatServices,
+			JobManagerSharedServices jobManagerServices,
+			JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory,
+			FatalErrorHandler fatalErrorHandler) throws Exception {
+
+		final JobMasterConfiguration jobMasterConfiguration = JobMasterConfiguration.fromConfiguration(configuration);
+
+		final SlotPoolFactory slotPoolFactory = DefaultSlotPoolFactory.fromConfiguration(
+			configuration,
+			rpcService);
+
+		final JobMasterServiceFactory jobMasterFactory = new DefaultJobMasterServiceFactory(
+			jobMasterConfiguration,
+			slotPoolFactory,
+			rpcService,
+			highAvailabilityServices,
+			jobManagerServices,
+			heartbeatServices,
+			jobManagerJobMetricGroupFactory,
+			fatalErrorHandler);
+
+		return new JobManagerRunner(
+			jobGraph,
+			jobMasterFactory,
+			highAvailabilityServices,
+			jobManagerServices.getLibraryCacheManager(),
+			jobManagerServices.getScheduledExecutorService(),
+			fatalErrorHandler);
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index 81b826e..a4651fb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -45,7 +45,6 @@ import org.apache.flink.runtime.jobmaster.JobNotFinishedException;
 import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.runtime.jobmaster.RescalingBehaviour;
 import org.apache.flink.runtime.jobmaster.factories.DefaultJobManagerJobMetricGroupFactory;
-import org.apache.flink.runtime.jobmaster.factories.JobManagerJobMetricGroupFactory;
 import org.apache.flink.runtime.leaderelection.LeaderContender;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.messages.Acknowledge;
@@ -308,7 +307,6 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 		final CompletableFuture<JobManagerRunner> jobManagerRunnerFuture = CompletableFuture.supplyAsync(
 			CheckedSupplier.unchecked(() ->
 				jobManagerRunnerFactory.createJobManagerRunner(
-					ResourceID.generate(),
 					jobGraph,
 					configuration,
 					rpcService,
@@ -1009,55 +1007,4 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 			}
 		});
 	}
-
-	//------------------------------------------------------
-	// Factories
-	//------------------------------------------------------
-
-	/**
-	 * Factory for a {@link JobManagerRunner}.
-	 */
-	@FunctionalInterface
-	public interface JobManagerRunnerFactory {
-		JobManagerRunner createJobManagerRunner(
-			ResourceID resourceId,
-			JobGraph jobGraph,
-			Configuration configuration,
-			RpcService rpcService,
-			HighAvailabilityServices highAvailabilityServices,
-			HeartbeatServices heartbeatServices,
-			JobManagerSharedServices jobManagerServices,
-			JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory,
-			FatalErrorHandler fatalErrorHandler) throws Exception;
-	}
-
-	/**
-	 * Singleton default factory for {@link JobManagerRunner}.
-	 */
-	public enum DefaultJobManagerRunnerFactory implements JobManagerRunnerFactory {
-		INSTANCE;
-
-		@Override
-		public JobManagerRunner createJobManagerRunner(
-				ResourceID resourceId,
-				JobGraph jobGraph,
-				Configuration configuration,
-				RpcService rpcService,
-				HighAvailabilityServices highAvailabilityServices,
-				HeartbeatServices heartbeatServices,
-				JobManagerSharedServices jobManagerServices,
-				JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory,
-				FatalErrorHandler fatalErrorHandler) throws Exception {
-			return new JobManagerRunner(
-				resourceId,
-				jobGraph,
-				configuration,
-				rpcService,
-				highAvailabilityServices,
-				heartbeatServices,
-				jobManagerServices,
-				jobManagerJobMetricGroupFactory,
-				fatalErrorHandler);
-		}
-	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobDispatcherFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobDispatcherFactory.java
index a2a6930..16f2e64 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobDispatcherFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobDispatcherFactory.java
@@ -75,7 +75,7 @@ public class JobDispatcherFactory implements DispatcherFactory<MiniDispatcher> {
 			jobManagerMetricGroup,
 			metricQueryServicePath,
 			archivedExecutionGraphStore,
-			Dispatcher.DefaultJobManagerRunnerFactory.INSTANCE,
+			DefaultJobManagerRunnerFactory.INSTANCE,
 			fatalErrorHandler,
 			historyServerArchivist,
 			jobGraph,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobManagerRunnerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobManagerRunnerFactory.java
new file mode 100644
index 0000000..9caf64d
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobManagerRunnerFactory.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmaster.JobManagerRunner;
+import org.apache.flink.runtime.jobmaster.JobManagerSharedServices;
+import org.apache.flink.runtime.jobmaster.factories.JobManagerJobMetricGroupFactory;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+
+/**
+ * Factory for a {@link JobManagerRunner}.
+ */
+@FunctionalInterface
+public interface JobManagerRunnerFactory {
+
+	JobManagerRunner createJobManagerRunner(
+		JobGraph jobGraph,
+		Configuration configuration,
+		RpcService rpcService,
+		HighAvailabilityServices highAvailabilityServices,
+		HeartbeatServices heartbeatServices,
+		JobManagerSharedServices jobManagerServices,
+		JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory,
+		FatalErrorHandler fatalErrorHandler) throws Exception;
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SessionDispatcherFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SessionDispatcherFactory.java
index 1bcf04e..4d0ec40 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SessionDispatcherFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SessionDispatcherFactory.java
@@ -60,7 +60,7 @@ public enum SessionDispatcherFactory implements DispatcherFactory<Dispatcher> {
 			jobManagerMetricGroup,
 			metricQueryServicePath,
 			archivedExecutionGraphStore,
-			Dispatcher.DefaultJobManagerRunnerFactory.INSTANCE,
+			DefaultJobManagerRunnerFactory.INSTANCE,
 			fatalErrorHandler,
 			historyServerArchivist);
 	}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
index 1ac2f80..ed79455 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
@@ -19,25 +19,19 @@
 package org.apache.flink.runtime.jobmaster;
 
 import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.client.JobExecutionException;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
-import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
 import org.apache.flink.runtime.highavailability.RunningJobsRegistry.JobSchedulingStatus;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmanager.OnCompletionActions;
-import org.apache.flink.runtime.jobmaster.factories.JobManagerJobMetricGroupFactory;
-import org.apache.flink.runtime.jobmaster.slotpool.DefaultSlotPoolFactory;
-import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolFactory;
+import org.apache.flink.runtime.jobmaster.factories.JobMasterServiceFactory;
 import org.apache.flink.runtime.leaderelection.LeaderContender;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
-import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.util.AutoCloseableAsync;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
@@ -48,6 +42,7 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -74,7 +69,9 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, A
 	/** Leader election for this job. */
 	private final LeaderElectionService leaderElectionService;
 
-	private final JobManagerSharedServices jobManagerSharedServices;
+	private final LibraryCacheManager libraryCacheManager;
+
+	private final Executor executor;
 
 	private final JobMasterService jobMasterService;
 
@@ -99,14 +96,11 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, A
 	 *                   required services could not be started, or the Job could not be initialized.
 	 */
 	public JobManagerRunner(
-			final ResourceID resourceId,
 			final JobGraph jobGraph,
-			final Configuration configuration,
-			final RpcService rpcService,
+			final JobMasterServiceFactory jobMasterFactory,
 			final HighAvailabilityServices haServices,
-			final HeartbeatServices heartbeatServices,
-			final JobManagerSharedServices jobManagerSharedServices,
-			final JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory,
+			final LibraryCacheManager libraryCacheManager,
+			final Executor executor,
 			final FatalErrorHandler fatalErrorHandler) throws Exception {
 
 		this.resultFuture = new CompletableFuture<>();
@@ -115,13 +109,13 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, A
 		// make sure we cleanly shut down out JobManager services if initialization fails
 		try {
 			this.jobGraph = checkNotNull(jobGraph);
-			this.jobManagerSharedServices = checkNotNull(jobManagerSharedServices);
+			this.libraryCacheManager = checkNotNull(libraryCacheManager);
+			this.executor = checkNotNull(executor);
 			this.fatalErrorHandler = checkNotNull(fatalErrorHandler);
 
 			checkArgument(jobGraph.getNumberOfVertices() > 0, "The given job is empty");
 
 			// libraries and class loader first
-			final LibraryCacheManager libraryCacheManager = jobManagerSharedServices.getLibraryCacheManager();
 			try {
 				libraryCacheManager.registerJob(
 						jobGraph.getJobID(), jobGraph.getUserJarBlobKeys(), jobGraph.getClasspaths());
@@ -138,28 +132,10 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, A
 			this.runningJobsRegistry = haServices.getRunningJobsRegistry();
 			this.leaderElectionService = haServices.getJobManagerLeaderElectionService(jobGraph.getJobID());
 
-			final JobMasterConfiguration jobMasterConfiguration = JobMasterConfiguration.fromConfiguration(configuration);
-
 			this.leaderGatewayFuture = new CompletableFuture<>();
 
-			final SlotPoolFactory slotPoolFactory = DefaultSlotPoolFactory.fromConfiguration(
-				configuration,
-				rpcService);
-
 			// now start the JobManager
-			this.jobMasterService = new JobMaster(
-				rpcService,
-				jobMasterConfiguration,
-				resourceId,
-				jobGraph,
-				haServices,
-				slotPoolFactory,
-				jobManagerSharedServices,
-				heartbeatServices,
-				jobManagerJobMetricGroupFactory,
-				this,
-				fatalErrorHandler,
-				userCodeLoader);
+			this.jobMasterService = jobMasterFactory.createJobMasterService(jobGraph, this, userCodeLoader);
 		}
 		catch (Throwable t) {
 			terminationFuture.completeExceptionally(t);
@@ -217,7 +193,6 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, A
 							throwable = ExceptionUtils.firstOrSuppressed(t, ExceptionUtils.stripCompletionException(throwable));
 						}
 
-						final LibraryCacheManager libraryCacheManager = jobManagerSharedServices.getLibraryCacheManager();
 						libraryCacheManager.unregisterJob(jobGraph.getJobID());
 
 						if (throwable != null) {
@@ -333,7 +308,7 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, A
 						confirmLeaderSessionIdIfStillLeader(leaderSessionId, currentLeaderGatewayFuture);
 					}
 				},
-				jobManagerSharedServices.getScheduledExecutorService());
+				executor);
 		}
 	}
 
@@ -367,7 +342,7 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, A
 						handleJobManagerRunnerError(new FlinkException("Could not suspend the job manager.", throwable));
 					}
 				},
-				jobManagerSharedServices.getScheduledExecutorService());
+				executor);
 		}
 	}
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/factories/DefaultJobMasterServiceFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/factories/DefaultJobMasterServiceFactory.java
new file mode 100644
index 0000000..58aa948
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/factories/DefaultJobMasterServiceFactory.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.factories;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmanager.OnCompletionActions;
+import org.apache.flink.runtime.jobmaster.JobManagerSharedServices;
+import org.apache.flink.runtime.jobmaster.JobMaster;
+import org.apache.flink.runtime.jobmaster.JobMasterConfiguration;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolFactory;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+
+/**
+ * Default implementation of the {@link JobMasterServiceFactory}.
+ */
+public class DefaultJobMasterServiceFactory implements JobMasterServiceFactory {
+
+	private final JobMasterConfiguration jobMasterConfiguration;
+
+	private final SlotPoolFactory slotPoolFactory;
+
+	private final RpcService rpcService;
+
+	private final HighAvailabilityServices haServices;
+
+	private final JobManagerSharedServices jobManagerSharedServices;
+
+	private final HeartbeatServices heartbeatServices;
+
+	private final JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory;
+
+	private final FatalErrorHandler fatalErrorHandler;
+
+	public DefaultJobMasterServiceFactory(
+			JobMasterConfiguration jobMasterConfiguration,
+			SlotPoolFactory slotPoolFactory,
+			RpcService rpcService,
+			HighAvailabilityServices haServices,
+			JobManagerSharedServices jobManagerSharedServices,
+			HeartbeatServices heartbeatServices,
+			JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory,
+			FatalErrorHandler fatalErrorHandler) {
+		this.jobMasterConfiguration = jobMasterConfiguration;
+		this.slotPoolFactory = slotPoolFactory;
+		this.rpcService = rpcService;
+		this.haServices = haServices;
+		this.jobManagerSharedServices = jobManagerSharedServices;
+		this.heartbeatServices = heartbeatServices;
+		this.jobManagerJobMetricGroupFactory = jobManagerJobMetricGroupFactory;
+		this.fatalErrorHandler = fatalErrorHandler;
+	}
+
+	@Override
+	public JobMaster createJobMasterService(JobGraph jobGraph, OnCompletionActions jobCompletionActions, ClassLoader userCodeClassloader) throws Exception {
+		return new JobMaster(
+			rpcService,
+			jobMasterConfiguration,
+			ResourceID.generate(),
+			jobGraph,
+			haServices,
+			slotPoolFactory,
+			jobManagerSharedServices,
+			heartbeatServices,
+			jobManagerJobMetricGroupFactory,
+			jobCompletionActions,
+			fatalErrorHandler,
+			userCodeClassloader);
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/factories/JobMasterServiceFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/factories/JobMasterServiceFactory.java
new file mode 100644
index 0000000..e51a889
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/factories/JobMasterServiceFactory.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.factories;
+
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmanager.OnCompletionActions;
+import org.apache.flink.runtime.jobmaster.JobMasterService;
+
+/**
+ * Factory for a {@link JobMasterService}.
+ */
+public interface JobMasterServiceFactory {
+
+	JobMasterService createJobMasterService(
+		JobGraph jobGraph,
+		OnCompletionActions jobCompletionActions,
+		ClassLoader userCodeClassloader) throws Exception;
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index f25c73c..b17432b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -37,6 +37,7 @@ import org.apache.flink.runtime.client.JobStatusMessage;
 import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory;
 import org.apache.flink.runtime.dispatcher.Dispatcher;
 import org.apache.flink.runtime.dispatcher.DispatcherGateway;
 import org.apache.flink.runtime.dispatcher.DispatcherId;
@@ -400,7 +401,7 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync {
 					jobManagerMetricGroup,
 					metricRegistry.getMetricQueryServicePath(),
 					new MemoryArchivedExecutionGraphStore(),
-					Dispatcher.DefaultJobManagerRunnerFactory.INSTANCE,
+					DefaultJobManagerRunnerFactory.INSTANCE,
 					new ShutDownFatalErrorHandler(),
 					historyServerArchivist);
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/FailingPermanentBlobService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/FailingPermanentBlobService.java
new file mode 100644
index 0000000..06326d5
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/FailingPermanentBlobService.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blob;
+
+import org.apache.flink.api.common.JobID;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+/**
+ * Testing implementation of {@link PermanentBlobService} which always fails the
+ * {@link #getFile(JobID, PermanentBlobKey)} call.
+ */
+public enum FailingPermanentBlobService implements PermanentBlobService {
+	INSTANCE;
+
+	@Override
+	public File getFile(JobID jobId, PermanentBlobKey key) throws IOException {
+		throw new FileNotFoundException(String.format("Could not find file for blob key %s belonging to job %s.", key, jobId));
+	}
+
+	@Override
+	public void close() {}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java
index b965e71..384704d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java
@@ -293,11 +293,6 @@ public class DispatcherHATest extends TestLogger {
 		return new TestingJobManagerRunnerFactory(new CompletableFuture<>(), new CompletableFuture<>(), CompletableFuture.completedFuture(null));
 	}
 
-	@Nonnull
-	private HATestingDispatcher createDispatcherWithJobManagerRunnerFactory(HighAvailabilityServices highAvailabilityServices, Dispatcher.JobManagerRunnerFactory jobManagerRunnerFactory) throws Exception {
-		return createDispatcher(highAvailabilityServices, null, jobManagerRunnerFactory);
-	}
-
 	private HATestingDispatcher createDispatcher(HighAvailabilityServices haServices) throws Exception {
 		return createDispatcher(
 			haServices,
@@ -309,7 +304,7 @@ public class DispatcherHATest extends TestLogger {
 	private HATestingDispatcher createDispatcher(
 		HighAvailabilityServices highAvailabilityServices,
 		@Nullable Queue<DispatcherId> fencingTokens,
-		Dispatcher.JobManagerRunnerFactory jobManagerRunnerFactory) throws Exception {
+		JobManagerRunnerFactory jobManagerRunnerFactory) throws Exception {
 		final Configuration configuration = new Configuration();
 
 		return new HATestingDispatcher(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
index e10b8bc..fa4705a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
@@ -28,7 +28,6 @@ import org.apache.flink.runtime.blob.VoidBlobStore;
 import org.apache.flink.runtime.checkpoint.Checkpoints;
 import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
 import org.apache.flink.runtime.checkpoint.savepoint.SavepointV2;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ErrorInfo;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
@@ -201,7 +200,7 @@ public class DispatcherTest extends TestLogger {
 	}
 
 	@Nonnull
-	private TestingDispatcher createAndStartDispatcher(HeartbeatServices heartbeatServices, TestingHighAvailabilityServices haServices, Dispatcher.JobManagerRunnerFactory jobManagerRunnerFactory) throws Exception {
+	private TestingDispatcher createAndStartDispatcher(HeartbeatServices heartbeatServices, TestingHighAvailabilityServices haServices, JobManagerRunnerFactory jobManagerRunnerFactory) throws Exception {
 		final TestingDispatcher dispatcher = createDispatcher(heartbeatServices, haServices, jobManagerRunnerFactory);
 		dispatcher.start();
 
@@ -209,7 +208,7 @@ public class DispatcherTest extends TestLogger {
 	}
 
 	@Nonnull
-	private TestingDispatcher createDispatcher(HeartbeatServices heartbeatServices, TestingHighAvailabilityServices haServices, Dispatcher.JobManagerRunnerFactory jobManagerRunnerFactory) throws Exception {
+	private TestingDispatcher createDispatcher(HeartbeatServices heartbeatServices, TestingHighAvailabilityServices haServices, JobManagerRunnerFactory jobManagerRunnerFactory) throws Exception {
 		return new TestingDispatcher(
 			rpcService,
 			Dispatcher.DISPATCHER_NAME + '_' + name.getMethodName(),
@@ -668,7 +667,7 @@ public class DispatcherTest extends TestLogger {
 		final InMemorySubmittedJobGraphStore submittedJobGraphStore = new InMemorySubmittedJobGraphStore();
 		haServices.setSubmittedJobGraphStore(submittedJobGraphStore);
 
-		dispatcher = createAndStartDispatcher(heartbeatServices, haServices, Dispatcher.DefaultJobManagerRunnerFactory.INSTANCE);
+		dispatcher = createAndStartDispatcher(heartbeatServices, haServices, DefaultJobManagerRunnerFactory.INSTANCE);
 
 		// grant leadership and submit a single job
 		final DispatcherId expectedDispatcherId = DispatcherId.generate();
@@ -697,10 +696,10 @@ public class DispatcherTest extends TestLogger {
 		}
 
 		@Override
-		public JobManagerRunner createJobManagerRunner(ResourceID resourceId, JobGraph jobGraph, Configuration configuration, RpcService rpcService, HighAvailabilityServices highAvailabilityServices, HeartbeatServices heartbeatServices, JobManagerSharedServices jobManagerSharedServices, JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory, FatalErrorHandler fatalErrorHandler) throws Exception {
+		public JobManagerRunner createJobManagerRunner(JobGraph jobGraph, Configuration configuration, RpcService rpcService, HighAvailabilityServices highAvailabilityServices, HeartbeatServices heartbeatServices, JobManagerSharedServices jobManagerSharedServices, JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory, FatalErrorHandler fatalErrorHandler) throws Exception {
 			jobManagerRunnerCreationLatch.run();
 
-			return super.createJobManagerRunner(resourceId, jobGraph, configuration, rpcService, highAvailabilityServices, heartbeatServices, jobManagerSharedServices, jobManagerJobMetricGroupFactory, fatalErrorHandler);
+			return super.createJobManagerRunner(jobGraph, configuration, rpcService, highAvailabilityServices, heartbeatServices, jobManagerSharedServices, jobManagerJobMetricGroupFactory, fatalErrorHandler);
 		}
 	}
 
@@ -735,7 +734,7 @@ public class DispatcherTest extends TestLogger {
 		}
 	}
 
-	private static final class ExpectedJobIdJobManagerRunnerFactory implements Dispatcher.JobManagerRunnerFactory {
+	private static final class ExpectedJobIdJobManagerRunnerFactory implements JobManagerRunnerFactory {
 
 		private final JobID expectedJobId;
 
@@ -748,7 +747,6 @@ public class DispatcherTest extends TestLogger {
 
 		@Override
 		public JobManagerRunner createJobManagerRunner(
-				ResourceID resourceId,
 				JobGraph jobGraph,
 				Configuration configuration,
 				RpcService rpcService,
@@ -761,8 +759,7 @@ public class DispatcherTest extends TestLogger {
 
 			createdJobManagerRunnerLatch.countDown();
 
-			return Dispatcher.DefaultJobManagerRunnerFactory.INSTANCE.createJobManagerRunner(
-				resourceId,
+			return DefaultJobManagerRunnerFactory.INSTANCE.createJobManagerRunner(
 				jobGraph,
 				configuration,
 				rpcService,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingJobManagerRunnerFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingJobManagerRunnerFactory.java
index 30e4af6..c19038c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingJobManagerRunnerFactory.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingJobManagerRunnerFactory.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.dispatcher;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
@@ -38,10 +37,10 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 /**
- * {@link org.apache.flink.runtime.dispatcher.Dispatcher.JobManagerRunnerFactory} implementation for
+ * {@link JobManagerRunnerFactory} implementation for
  * testing purposes.
  */
-class TestingJobManagerRunnerFactory implements Dispatcher.JobManagerRunnerFactory {
+class TestingJobManagerRunnerFactory implements JobManagerRunnerFactory {
 
 	private final CompletableFuture<JobGraph> jobGraphFuture;
 	private final CompletableFuture<ArchivedExecutionGraph> resultFuture;
@@ -68,7 +67,6 @@ class TestingJobManagerRunnerFactory implements Dispatcher.JobManagerRunnerFacto
 
 	@Override
 	public JobManagerRunner createJobManagerRunner(
-			ResourceID resourceId,
 			JobGraph jobGraph,
 			Configuration configuration,
 			RpcService rpcService,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/ZooKeeperHADispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/ZooKeeperHADispatcherTest.java
index 9c23f9d..d3d80d4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/ZooKeeperHADispatcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/ZooKeeperHADispatcherTest.java
@@ -325,7 +325,7 @@ public class ZooKeeperHADispatcherTest extends TestLogger {
 	}
 
 	@Nonnull
-	private TestingDispatcher createDispatcher(HighAvailabilityServices highAvailabilityServices, Dispatcher.JobManagerRunnerFactory jobManagerRunnerFactory) throws Exception {
+	private TestingDispatcher createDispatcher(HighAvailabilityServices highAvailabilityServices, JobManagerRunnerFactory jobManagerRunnerFactory) throws Exception {
 		return new TestingDispatcher(
 			rpcService,
 			Dispatcher.DISPATCHER_NAME + '_' + name.getMethodName() + UUID.randomUUID(),
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerTest.java
index da5a9ff..584307b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerTest.java
@@ -19,24 +19,23 @@
 package org.apache.flink.runtime.jobmaster;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.configuration.BlobServerOptions;
-import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.FailingPermanentBlobService;
 import org.apache.flink.runtime.blob.VoidPermanentBlobService;
 import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
 import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
+import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
-import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.runtime.jobmaster.factories.UnregisteredJobManagerJobMetricGroupFactory;
+import org.apache.flink.runtime.jobmaster.factories.JobMasterServiceFactory;
+import org.apache.flink.runtime.jobmaster.factories.TestingJobMasterFactory;
 import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
 import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
 import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
-import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
 import org.apache.flink.util.ExceptionUtils;
@@ -68,30 +67,26 @@ public class JobManagerRunnerTest extends TestLogger {
 	@ClassRule
 	public static TemporaryFolder temporaryFolder = new TemporaryFolder();
 
-	private static Configuration configuration;
-
-	private static TestingRpcService rpcService;
-
-	private static HeartbeatServices heartbeatServices = new HeartbeatServices(1000L, 1000L);
-
-	private static JobManagerSharedServices jobManagerSharedServices;
-
 	private static JobGraph jobGraph;
 
 	private static ArchivedExecutionGraph archivedExecutionGraph;
 
+	private static LibraryCacheManager libraryCacheManager;
+
+	private static JobMasterServiceFactory jobMasterFactory;
+
 	private TestingHighAvailabilityServices haServices;
 
 	private TestingFatalErrorHandler fatalErrorHandler;
 
 	@BeforeClass
-	public static void setupClass() throws Exception {
-		configuration = new Configuration();
-		rpcService = new TestingRpcService();
-
-		configuration.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath());
+	public static void setupClass() {
+		libraryCacheManager = new BlobLibraryCacheManager(
+			FailingPermanentBlobService.INSTANCE,
+			FlinkUserCodeClassLoaders.ResolveOrder.CHILD_FIRST,
+			new String[]{});
 
-		jobManagerSharedServices = new TestingJobManagerSharedServicesBuilder().build();
+		jobMasterFactory = TestingJobMasterFactory.INSTANCE;
 
 		final JobVertex jobVertex = new JobVertex("Test vertex");
 		jobVertex.setInvokableClass(NoOpInvokable.class);
@@ -119,13 +114,9 @@ public class JobManagerRunnerTest extends TestLogger {
 	}
 
 	@AfterClass
-	public static void tearDownClass() throws Exception {
-		if (jobManagerSharedServices != null) {
-			jobManagerSharedServices.shutdown();
-		}
-
-		if (rpcService != null) {
-			rpcService.stopService();
+	public static void tearDownClass() {
+		if (libraryCacheManager != null) {
+			libraryCacheManager.shutdown();
 		}
 	}
 
@@ -202,10 +193,7 @@ public class JobManagerRunnerTest extends TestLogger {
 			VoidPermanentBlobService.INSTANCE,
 			FlinkUserCodeClassLoaders.ResolveOrder.CHILD_FIRST,
 			new String[]{});
-		final JobManagerSharedServices jobManagerSharedServices = new TestingJobManagerSharedServicesBuilder()
-			.setLibraryCacheManager(libraryCacheManager)
-			.build();
-		final JobManagerRunner jobManagerRunner = createJobManagerRunner(jobManagerSharedServices);
+		final JobManagerRunner jobManagerRunner = createJobManagerRunner(libraryCacheManager);
 
 		try {
 			jobManagerRunner.start();
@@ -222,21 +210,18 @@ public class JobManagerRunnerTest extends TestLogger {
 	}
 
 	@Nonnull
-	private JobManagerRunner createJobManagerRunner() throws Exception {
-		return createJobManagerRunner(jobManagerSharedServices);
-	}
-
-	@Nonnull
-	private JobManagerRunner createJobManagerRunner(final JobManagerSharedServices jobManagerSharedServices) throws Exception {
+	private JobManagerRunner createJobManagerRunner(LibraryCacheManager libraryCacheManager) throws Exception {
 		return new JobManagerRunner(
-			ResourceID.generate(),
 			jobGraph,
-			configuration,
-			rpcService,
+			jobMasterFactory,
 			haServices,
-			heartbeatServices,
-			jobManagerSharedServices,
-			UnregisteredJobManagerJobMetricGroupFactory.INSTANCE,
+			libraryCacheManager,
+			TestingUtils.defaultExecutor(),
 			fatalErrorHandler);
 	}
+
+	@Nonnull
+	private JobManagerRunner createJobManagerRunner() throws Exception {
+		return createJobManagerRunner(libraryCacheManager);
+	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobMasterService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobMasterService.java
new file mode 100644
index 0000000..7e65da1
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobMasterService.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatewayBuilder;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Implementation of the {@link JobMasterService} for testing purposes.
+ */
+public class TestingJobMasterService implements JobMasterService {
+
+	@Nonnull
+	private final String address;
+
+	private JobMasterGateway jobMasterGateway;
+
+	public TestingJobMasterService(@Nonnull String address) {
+		this.address = address;
+	}
+
+	public TestingJobMasterService() {
+		this("localhost");
+	}
+
+	@Override
+	public CompletableFuture<Acknowledge> start(JobMasterId jobMasterId) {
+			jobMasterGateway = new TestingJobMasterGatewayBuilder().build();
+			return CompletableFuture.completedFuture(Acknowledge.get());
+	}
+
+	@Override
+	public CompletableFuture<Acknowledge> suspend(Exception cause) {
+		jobMasterGateway = null;
+		return CompletableFuture.completedFuture(Acknowledge.get());
+	}
+
+	@Override
+	public JobMasterGateway getGateway() {
+		Preconditions.checkNotNull(jobMasterGateway, "TestingJobMasterService has not been started yet.");
+		return jobMasterGateway;
+	}
+
+	@Override
+	public String getAddress() {
+		return address;
+	}
+
+	@Override
+	public CompletableFuture<Void> closeAsync() {
+		jobMasterGateway = null;
+		return CompletableFuture.completedFuture(null);
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/factories/TestingJobMasterFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/factories/TestingJobMasterFactory.java
new file mode 100644
index 0000000..ba7f1c8
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/factories/TestingJobMasterFactory.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.factories;
+
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmanager.OnCompletionActions;
+import org.apache.flink.runtime.jobmaster.JobMaster;
+import org.apache.flink.runtime.jobmaster.JobMasterService;
+import org.apache.flink.runtime.jobmaster.TestingJobMasterService;
+
+/**
+ * Testing implementation of the {@link JobMasterServiceFactory} which returns a {@link JobMaster} mock.
+ */
+public enum TestingJobMasterFactory implements JobMasterServiceFactory {
+	INSTANCE;
+
+	@Override
+	public JobMasterService createJobMasterService(JobGraph jobGraph, OnCompletionActions jobCompletionActions, ClassLoader userCodeClassloader) {
+		return new TestingJobMasterService();
+	}
+}