You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2019/01/30 11:11:51 UTC
[flink] 01/02: [FLINK-11415] Introduce JobMasterServiceFactory
This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit cae914f82e6b115be5c362a874160ebb7cea8eec
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Tue Jan 22 16:36:23 2019 +0100
[FLINK-11415] Introduce JobMasterServiceFactory
The JobMasterServiceFactory controls how the JobMasterService is constructed by
the JobManagerRunner. This allows for easier testing of this component.
This closes #7564.
---
.../dispatcher/DefaultJobManagerRunnerFactory.java | 77 +++++++++++++++++++
.../flink/runtime/dispatcher/Dispatcher.java | 53 -------------
.../runtime/dispatcher/JobDispatcherFactory.java | 2 +-
.../dispatcher/JobManagerRunnerFactory.java | 46 +++++++++++
.../dispatcher/SessionDispatcherFactory.java | 2 +-
.../flink/runtime/jobmaster/JobManagerRunner.java | 51 ++++---------
.../factories/DefaultJobMasterServiceFactory.java | 89 ++++++++++++++++++++++
.../factories/JobMasterServiceFactory.java | 34 +++++++++
.../flink/runtime/minicluster/MiniCluster.java | 3 +-
.../runtime/blob/FailingPermanentBlobService.java | 41 ++++++++++
.../flink/runtime/dispatcher/DispatcherHATest.java | 7 +-
.../flink/runtime/dispatcher/DispatcherTest.java | 17 ++---
.../dispatcher/TestingJobManagerRunnerFactory.java | 6 +-
.../dispatcher/ZooKeeperHADispatcherTest.java | 2 +-
.../runtime/jobmaster/JobManagerRunnerTest.java | 71 +++++++----------
.../runtime/jobmaster/TestingJobMasterService.java | 75 ++++++++++++++++++
.../factories/TestingJobMasterFactory.java | 37 +++++++++
17 files changed, 455 insertions(+), 158 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerFactory.java
new file mode 100644
index 0000000..97afe24
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerFactory.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmaster.JobManagerRunner;
+import org.apache.flink.runtime.jobmaster.JobManagerSharedServices;
+import org.apache.flink.runtime.jobmaster.JobMasterConfiguration;
+import org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory;
+import org.apache.flink.runtime.jobmaster.factories.JobManagerJobMetricGroupFactory;
+import org.apache.flink.runtime.jobmaster.factories.JobMasterServiceFactory;
+import org.apache.flink.runtime.jobmaster.slotpool.DefaultSlotPoolFactory;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolFactory;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+
+/**
+ * Singleton default factory for {@link JobManagerRunner}.
+ */
+public enum DefaultJobManagerRunnerFactory implements JobManagerRunnerFactory {
+ INSTANCE;
+
+ @Override
+ public JobManagerRunner createJobManagerRunner(
+ JobGraph jobGraph,
+ Configuration configuration,
+ RpcService rpcService,
+ HighAvailabilityServices highAvailabilityServices,
+ HeartbeatServices heartbeatServices,
+ JobManagerSharedServices jobManagerServices,
+ JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory,
+ FatalErrorHandler fatalErrorHandler) throws Exception {
+
+ final JobMasterConfiguration jobMasterConfiguration = JobMasterConfiguration.fromConfiguration(configuration);
+
+ final SlotPoolFactory slotPoolFactory = DefaultSlotPoolFactory.fromConfiguration(
+ configuration,
+ rpcService);
+
+ final JobMasterServiceFactory jobMasterFactory = new DefaultJobMasterServiceFactory(
+ jobMasterConfiguration,
+ slotPoolFactory,
+ rpcService,
+ highAvailabilityServices,
+ jobManagerServices,
+ heartbeatServices,
+ jobManagerJobMetricGroupFactory,
+ fatalErrorHandler);
+
+ return new JobManagerRunner(
+ jobGraph,
+ jobMasterFactory,
+ highAvailabilityServices,
+ jobManagerServices.getLibraryCacheManager(),
+ jobManagerServices.getScheduledExecutorService(),
+ fatalErrorHandler);
+ }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index 81b826e..a4651fb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -45,7 +45,6 @@ import org.apache.flink.runtime.jobmaster.JobNotFinishedException;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.jobmaster.RescalingBehaviour;
import org.apache.flink.runtime.jobmaster.factories.DefaultJobManagerJobMetricGroupFactory;
-import org.apache.flink.runtime.jobmaster.factories.JobManagerJobMetricGroupFactory;
import org.apache.flink.runtime.leaderelection.LeaderContender;
import org.apache.flink.runtime.leaderelection.LeaderElectionService;
import org.apache.flink.runtime.messages.Acknowledge;
@@ -308,7 +307,6 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
final CompletableFuture<JobManagerRunner> jobManagerRunnerFuture = CompletableFuture.supplyAsync(
CheckedSupplier.unchecked(() ->
jobManagerRunnerFactory.createJobManagerRunner(
- ResourceID.generate(),
jobGraph,
configuration,
rpcService,
@@ -1009,55 +1007,4 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
}
});
}
-
- //------------------------------------------------------
- // Factories
- //------------------------------------------------------
-
- /**
- * Factory for a {@link JobManagerRunner}.
- */
- @FunctionalInterface
- public interface JobManagerRunnerFactory {
- JobManagerRunner createJobManagerRunner(
- ResourceID resourceId,
- JobGraph jobGraph,
- Configuration configuration,
- RpcService rpcService,
- HighAvailabilityServices highAvailabilityServices,
- HeartbeatServices heartbeatServices,
- JobManagerSharedServices jobManagerServices,
- JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory,
- FatalErrorHandler fatalErrorHandler) throws Exception;
- }
-
- /**
- * Singleton default factory for {@link JobManagerRunner}.
- */
- public enum DefaultJobManagerRunnerFactory implements JobManagerRunnerFactory {
- INSTANCE;
-
- @Override
- public JobManagerRunner createJobManagerRunner(
- ResourceID resourceId,
- JobGraph jobGraph,
- Configuration configuration,
- RpcService rpcService,
- HighAvailabilityServices highAvailabilityServices,
- HeartbeatServices heartbeatServices,
- JobManagerSharedServices jobManagerServices,
- JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory,
- FatalErrorHandler fatalErrorHandler) throws Exception {
- return new JobManagerRunner(
- resourceId,
- jobGraph,
- configuration,
- rpcService,
- highAvailabilityServices,
- heartbeatServices,
- jobManagerServices,
- jobManagerJobMetricGroupFactory,
- fatalErrorHandler);
- }
- }
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobDispatcherFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobDispatcherFactory.java
index a2a6930..16f2e64 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobDispatcherFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobDispatcherFactory.java
@@ -75,7 +75,7 @@ public class JobDispatcherFactory implements DispatcherFactory<MiniDispatcher> {
jobManagerMetricGroup,
metricQueryServicePath,
archivedExecutionGraphStore,
- Dispatcher.DefaultJobManagerRunnerFactory.INSTANCE,
+ DefaultJobManagerRunnerFactory.INSTANCE,
fatalErrorHandler,
historyServerArchivist,
jobGraph,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobManagerRunnerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobManagerRunnerFactory.java
new file mode 100644
index 0000000..9caf64d
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobManagerRunnerFactory.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmaster.JobManagerRunner;
+import org.apache.flink.runtime.jobmaster.JobManagerSharedServices;
+import org.apache.flink.runtime.jobmaster.factories.JobManagerJobMetricGroupFactory;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+
+/**
+ * Factory for a {@link JobManagerRunner}.
+ */
+@FunctionalInterface
+public interface JobManagerRunnerFactory {
+
+ JobManagerRunner createJobManagerRunner(
+ JobGraph jobGraph,
+ Configuration configuration,
+ RpcService rpcService,
+ HighAvailabilityServices highAvailabilityServices,
+ HeartbeatServices heartbeatServices,
+ JobManagerSharedServices jobManagerServices,
+ JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory,
+ FatalErrorHandler fatalErrorHandler) throws Exception;
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SessionDispatcherFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SessionDispatcherFactory.java
index 1bcf04e..4d0ec40 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SessionDispatcherFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SessionDispatcherFactory.java
@@ -60,7 +60,7 @@ public enum SessionDispatcherFactory implements DispatcherFactory<Dispatcher> {
jobManagerMetricGroup,
metricQueryServicePath,
archivedExecutionGraphStore,
- Dispatcher.DefaultJobManagerRunnerFactory.INSTANCE,
+ DefaultJobManagerRunnerFactory.INSTANCE,
fatalErrorHandler,
historyServerArchivist);
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
index 1ac2f80..ed79455 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
@@ -19,25 +19,19 @@
package org.apache.flink.runtime.jobmaster;
import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.client.JobExecutionException;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
-import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
import org.apache.flink.runtime.highavailability.RunningJobsRegistry.JobSchedulingStatus;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmanager.OnCompletionActions;
-import org.apache.flink.runtime.jobmaster.factories.JobManagerJobMetricGroupFactory;
-import org.apache.flink.runtime.jobmaster.slotpool.DefaultSlotPoolFactory;
-import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolFactory;
+import org.apache.flink.runtime.jobmaster.factories.JobMasterServiceFactory;
import org.apache.flink.runtime.leaderelection.LeaderContender;
import org.apache.flink.runtime.leaderelection.LeaderElectionService;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
-import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.util.AutoCloseableAsync;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
@@ -48,6 +42,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -74,7 +69,9 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, A
/** Leader election for this job. */
private final LeaderElectionService leaderElectionService;
- private final JobManagerSharedServices jobManagerSharedServices;
+ private final LibraryCacheManager libraryCacheManager;
+
+ private final Executor executor;
private final JobMasterService jobMasterService;
@@ -99,14 +96,11 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, A
* required services could not be started, or the Job could not be initialized.
*/
public JobManagerRunner(
- final ResourceID resourceId,
final JobGraph jobGraph,
- final Configuration configuration,
- final RpcService rpcService,
+ final JobMasterServiceFactory jobMasterFactory,
final HighAvailabilityServices haServices,
- final HeartbeatServices heartbeatServices,
- final JobManagerSharedServices jobManagerSharedServices,
- final JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory,
+ final LibraryCacheManager libraryCacheManager,
+ final Executor executor,
final FatalErrorHandler fatalErrorHandler) throws Exception {
this.resultFuture = new CompletableFuture<>();
@@ -115,13 +109,13 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, A
// make sure we cleanly shut down out JobManager services if initialization fails
try {
this.jobGraph = checkNotNull(jobGraph);
- this.jobManagerSharedServices = checkNotNull(jobManagerSharedServices);
+ this.libraryCacheManager = checkNotNull(libraryCacheManager);
+ this.executor = checkNotNull(executor);
this.fatalErrorHandler = checkNotNull(fatalErrorHandler);
checkArgument(jobGraph.getNumberOfVertices() > 0, "The given job is empty");
// libraries and class loader first
- final LibraryCacheManager libraryCacheManager = jobManagerSharedServices.getLibraryCacheManager();
try {
libraryCacheManager.registerJob(
jobGraph.getJobID(), jobGraph.getUserJarBlobKeys(), jobGraph.getClasspaths());
@@ -138,28 +132,10 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, A
this.runningJobsRegistry = haServices.getRunningJobsRegistry();
this.leaderElectionService = haServices.getJobManagerLeaderElectionService(jobGraph.getJobID());
- final JobMasterConfiguration jobMasterConfiguration = JobMasterConfiguration.fromConfiguration(configuration);
-
this.leaderGatewayFuture = new CompletableFuture<>();
- final SlotPoolFactory slotPoolFactory = DefaultSlotPoolFactory.fromConfiguration(
- configuration,
- rpcService);
-
// now start the JobManager
- this.jobMasterService = new JobMaster(
- rpcService,
- jobMasterConfiguration,
- resourceId,
- jobGraph,
- haServices,
- slotPoolFactory,
- jobManagerSharedServices,
- heartbeatServices,
- jobManagerJobMetricGroupFactory,
- this,
- fatalErrorHandler,
- userCodeLoader);
+ this.jobMasterService = jobMasterFactory.createJobMasterService(jobGraph, this, userCodeLoader);
}
catch (Throwable t) {
terminationFuture.completeExceptionally(t);
@@ -217,7 +193,6 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, A
throwable = ExceptionUtils.firstOrSuppressed(t, ExceptionUtils.stripCompletionException(throwable));
}
- final LibraryCacheManager libraryCacheManager = jobManagerSharedServices.getLibraryCacheManager();
libraryCacheManager.unregisterJob(jobGraph.getJobID());
if (throwable != null) {
@@ -333,7 +308,7 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, A
confirmLeaderSessionIdIfStillLeader(leaderSessionId, currentLeaderGatewayFuture);
}
},
- jobManagerSharedServices.getScheduledExecutorService());
+ executor);
}
}
@@ -367,7 +342,7 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, A
handleJobManagerRunnerError(new FlinkException("Could not suspend the job manager.", throwable));
}
},
- jobManagerSharedServices.getScheduledExecutorService());
+ executor);
}
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/factories/DefaultJobMasterServiceFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/factories/DefaultJobMasterServiceFactory.java
new file mode 100644
index 0000000..58aa948
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/factories/DefaultJobMasterServiceFactory.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.factories;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmanager.OnCompletionActions;
+import org.apache.flink.runtime.jobmaster.JobManagerSharedServices;
+import org.apache.flink.runtime.jobmaster.JobMaster;
+import org.apache.flink.runtime.jobmaster.JobMasterConfiguration;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolFactory;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+
+/**
+ * Default implementation of the {@link JobMasterServiceFactory}.
+ */
+public class DefaultJobMasterServiceFactory implements JobMasterServiceFactory {
+
+ private final JobMasterConfiguration jobMasterConfiguration;
+
+ private final SlotPoolFactory slotPoolFactory;
+
+ private final RpcService rpcService;
+
+ private final HighAvailabilityServices haServices;
+
+ private final JobManagerSharedServices jobManagerSharedServices;
+
+ private final HeartbeatServices heartbeatServices;
+
+ private final JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory;
+
+ private final FatalErrorHandler fatalErrorHandler;
+
+ public DefaultJobMasterServiceFactory(
+ JobMasterConfiguration jobMasterConfiguration,
+ SlotPoolFactory slotPoolFactory,
+ RpcService rpcService,
+ HighAvailabilityServices haServices,
+ JobManagerSharedServices jobManagerSharedServices,
+ HeartbeatServices heartbeatServices,
+ JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory,
+ FatalErrorHandler fatalErrorHandler) {
+ this.jobMasterConfiguration = jobMasterConfiguration;
+ this.slotPoolFactory = slotPoolFactory;
+ this.rpcService = rpcService;
+ this.haServices = haServices;
+ this.jobManagerSharedServices = jobManagerSharedServices;
+ this.heartbeatServices = heartbeatServices;
+ this.jobManagerJobMetricGroupFactory = jobManagerJobMetricGroupFactory;
+ this.fatalErrorHandler = fatalErrorHandler;
+ }
+
+ @Override
+ public JobMaster createJobMasterService(JobGraph jobGraph, OnCompletionActions jobCompletionActions, ClassLoader userCodeClassloader) throws Exception {
+ return new JobMaster(
+ rpcService,
+ jobMasterConfiguration,
+ ResourceID.generate(),
+ jobGraph,
+ haServices,
+ slotPoolFactory,
+ jobManagerSharedServices,
+ heartbeatServices,
+ jobManagerJobMetricGroupFactory,
+ jobCompletionActions,
+ fatalErrorHandler,
+ userCodeClassloader);
+ }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/factories/JobMasterServiceFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/factories/JobMasterServiceFactory.java
new file mode 100644
index 0000000..e51a889
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/factories/JobMasterServiceFactory.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.factories;
+
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmanager.OnCompletionActions;
+import org.apache.flink.runtime.jobmaster.JobMasterService;
+
+/**
+ * Factory for a {@link JobMasterService}.
+ */
+public interface JobMasterServiceFactory {
+
+ JobMasterService createJobMasterService(
+ JobGraph jobGraph,
+ OnCompletionActions jobCompletionActions,
+ ClassLoader userCodeClassloader) throws Exception;
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index f25c73c..b17432b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -37,6 +37,7 @@ import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory;
import org.apache.flink.runtime.dispatcher.Dispatcher;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.dispatcher.DispatcherId;
@@ -400,7 +401,7 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync {
jobManagerMetricGroup,
metricRegistry.getMetricQueryServicePath(),
new MemoryArchivedExecutionGraphStore(),
- Dispatcher.DefaultJobManagerRunnerFactory.INSTANCE,
+ DefaultJobManagerRunnerFactory.INSTANCE,
new ShutDownFatalErrorHandler(),
historyServerArchivist);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/FailingPermanentBlobService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/FailingPermanentBlobService.java
new file mode 100644
index 0000000..06326d5
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/FailingPermanentBlobService.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blob;
+
+import org.apache.flink.api.common.JobID;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+/**
+ * Testing implementation of {@link PermanentBlobService} which always fails the
+ * {@link #getFile(JobID, PermanentBlobKey)} call.
+ */
+public enum FailingPermanentBlobService implements PermanentBlobService {
+ INSTANCE;
+
+ @Override
+ public File getFile(JobID jobId, PermanentBlobKey key) throws IOException {
+ throw new FileNotFoundException(String.format("Could not find file for blob key %s belonging to job %s.", key, jobId));
+ }
+
+ @Override
+ public void close() {}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java
index b965e71..384704d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java
@@ -293,11 +293,6 @@ public class DispatcherHATest extends TestLogger {
return new TestingJobManagerRunnerFactory(new CompletableFuture<>(), new CompletableFuture<>(), CompletableFuture.completedFuture(null));
}
- @Nonnull
- private HATestingDispatcher createDispatcherWithJobManagerRunnerFactory(HighAvailabilityServices highAvailabilityServices, Dispatcher.JobManagerRunnerFactory jobManagerRunnerFactory) throws Exception {
- return createDispatcher(highAvailabilityServices, null, jobManagerRunnerFactory);
- }
-
private HATestingDispatcher createDispatcher(HighAvailabilityServices haServices) throws Exception {
return createDispatcher(
haServices,
@@ -309,7 +304,7 @@ public class DispatcherHATest extends TestLogger {
private HATestingDispatcher createDispatcher(
HighAvailabilityServices highAvailabilityServices,
@Nullable Queue<DispatcherId> fencingTokens,
- Dispatcher.JobManagerRunnerFactory jobManagerRunnerFactory) throws Exception {
+ JobManagerRunnerFactory jobManagerRunnerFactory) throws Exception {
final Configuration configuration = new Configuration();
return new HATestingDispatcher(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
index e10b8bc..fa4705a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
@@ -28,7 +28,6 @@ import org.apache.flink.runtime.blob.VoidBlobStore;
import org.apache.flink.runtime.checkpoint.Checkpoints;
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.savepoint.SavepointV2;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.executiongraph.ErrorInfo;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
@@ -201,7 +200,7 @@ public class DispatcherTest extends TestLogger {
}
@Nonnull
- private TestingDispatcher createAndStartDispatcher(HeartbeatServices heartbeatServices, TestingHighAvailabilityServices haServices, Dispatcher.JobManagerRunnerFactory jobManagerRunnerFactory) throws Exception {
+ private TestingDispatcher createAndStartDispatcher(HeartbeatServices heartbeatServices, TestingHighAvailabilityServices haServices, JobManagerRunnerFactory jobManagerRunnerFactory) throws Exception {
final TestingDispatcher dispatcher = createDispatcher(heartbeatServices, haServices, jobManagerRunnerFactory);
dispatcher.start();
@@ -209,7 +208,7 @@ public class DispatcherTest extends TestLogger {
}
@Nonnull
- private TestingDispatcher createDispatcher(HeartbeatServices heartbeatServices, TestingHighAvailabilityServices haServices, Dispatcher.JobManagerRunnerFactory jobManagerRunnerFactory) throws Exception {
+ private TestingDispatcher createDispatcher(HeartbeatServices heartbeatServices, TestingHighAvailabilityServices haServices, JobManagerRunnerFactory jobManagerRunnerFactory) throws Exception {
return new TestingDispatcher(
rpcService,
Dispatcher.DISPATCHER_NAME + '_' + name.getMethodName(),
@@ -668,7 +667,7 @@ public class DispatcherTest extends TestLogger {
final InMemorySubmittedJobGraphStore submittedJobGraphStore = new InMemorySubmittedJobGraphStore();
haServices.setSubmittedJobGraphStore(submittedJobGraphStore);
- dispatcher = createAndStartDispatcher(heartbeatServices, haServices, Dispatcher.DefaultJobManagerRunnerFactory.INSTANCE);
+ dispatcher = createAndStartDispatcher(heartbeatServices, haServices, DefaultJobManagerRunnerFactory.INSTANCE);
// grant leadership and submit a single job
final DispatcherId expectedDispatcherId = DispatcherId.generate();
@@ -697,10 +696,10 @@ public class DispatcherTest extends TestLogger {
}
@Override
- public JobManagerRunner createJobManagerRunner(ResourceID resourceId, JobGraph jobGraph, Configuration configuration, RpcService rpcService, HighAvailabilityServices highAvailabilityServices, HeartbeatServices heartbeatServices, JobManagerSharedServices jobManagerSharedServices, JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory, FatalErrorHandler fatalErrorHandler) throws Exception {
+ public JobManagerRunner createJobManagerRunner(JobGraph jobGraph, Configuration configuration, RpcService rpcService, HighAvailabilityServices highAvailabilityServices, HeartbeatServices heartbeatServices, JobManagerSharedServices jobManagerSharedServices, JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory, FatalErrorHandler fatalErrorHandler) throws Exception {
jobManagerRunnerCreationLatch.run();
- return super.createJobManagerRunner(resourceId, jobGraph, configuration, rpcService, highAvailabilityServices, heartbeatServices, jobManagerSharedServices, jobManagerJobMetricGroupFactory, fatalErrorHandler);
+ return super.createJobManagerRunner(jobGraph, configuration, rpcService, highAvailabilityServices, heartbeatServices, jobManagerSharedServices, jobManagerJobMetricGroupFactory, fatalErrorHandler);
}
}
@@ -735,7 +734,7 @@ public class DispatcherTest extends TestLogger {
}
}
- private static final class ExpectedJobIdJobManagerRunnerFactory implements Dispatcher.JobManagerRunnerFactory {
+ private static final class ExpectedJobIdJobManagerRunnerFactory implements JobManagerRunnerFactory {
private final JobID expectedJobId;
@@ -748,7 +747,6 @@ public class DispatcherTest extends TestLogger {
@Override
public JobManagerRunner createJobManagerRunner(
- ResourceID resourceId,
JobGraph jobGraph,
Configuration configuration,
RpcService rpcService,
@@ -761,8 +759,7 @@ public class DispatcherTest extends TestLogger {
createdJobManagerRunnerLatch.countDown();
- return Dispatcher.DefaultJobManagerRunnerFactory.INSTANCE.createJobManagerRunner(
- resourceId,
+ return DefaultJobManagerRunnerFactory.INSTANCE.createJobManagerRunner(
jobGraph,
configuration,
rpcService,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingJobManagerRunnerFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingJobManagerRunnerFactory.java
index 30e4af6..c19038c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingJobManagerRunnerFactory.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingJobManagerRunnerFactory.java
@@ -19,7 +19,6 @@
package org.apache.flink.runtime.dispatcher;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
@@ -38,10 +37,10 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
/**
- * {@link org.apache.flink.runtime.dispatcher.Dispatcher.JobManagerRunnerFactory} implementation for
+ * {@link JobManagerRunnerFactory} implementation for
* testing purposes.
*/
-class TestingJobManagerRunnerFactory implements Dispatcher.JobManagerRunnerFactory {
+class TestingJobManagerRunnerFactory implements JobManagerRunnerFactory {
private final CompletableFuture<JobGraph> jobGraphFuture;
private final CompletableFuture<ArchivedExecutionGraph> resultFuture;
@@ -68,7 +67,6 @@ class TestingJobManagerRunnerFactory implements Dispatcher.JobManagerRunnerFacto
@Override
public JobManagerRunner createJobManagerRunner(
- ResourceID resourceId,
JobGraph jobGraph,
Configuration configuration,
RpcService rpcService,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/ZooKeeperHADispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/ZooKeeperHADispatcherTest.java
index 9c23f9d..d3d80d4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/ZooKeeperHADispatcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/ZooKeeperHADispatcherTest.java
@@ -325,7 +325,7 @@ public class ZooKeeperHADispatcherTest extends TestLogger {
}
@Nonnull
- private TestingDispatcher createDispatcher(HighAvailabilityServices highAvailabilityServices, Dispatcher.JobManagerRunnerFactory jobManagerRunnerFactory) throws Exception {
+ private TestingDispatcher createDispatcher(HighAvailabilityServices highAvailabilityServices, JobManagerRunnerFactory jobManagerRunnerFactory) throws Exception {
return new TestingDispatcher(
rpcService,
Dispatcher.DISPATCHER_NAME + '_' + name.getMethodName() + UUID.randomUUID(),
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerTest.java
index da5a9ff..584307b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerTest.java
@@ -19,24 +19,23 @@
package org.apache.flink.runtime.jobmaster;
import org.apache.flink.api.common.JobID;
-import org.apache.flink.configuration.BlobServerOptions;
-import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.FailingPermanentBlobService;
import org.apache.flink.runtime.blob.VoidPermanentBlobService;
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
+import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
-import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.runtime.jobmaster.factories.UnregisteredJobManagerJobMetricGroupFactory;
+import org.apache.flink.runtime.jobmaster.factories.JobMasterServiceFactory;
+import org.apache.flink.runtime.jobmaster.factories.TestingJobMasterFactory;
import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
-import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.apache.flink.util.ExceptionUtils;
@@ -68,30 +67,26 @@ public class JobManagerRunnerTest extends TestLogger {
@ClassRule
public static TemporaryFolder temporaryFolder = new TemporaryFolder();
- private static Configuration configuration;
-
- private static TestingRpcService rpcService;
-
- private static HeartbeatServices heartbeatServices = new HeartbeatServices(1000L, 1000L);
-
- private static JobManagerSharedServices jobManagerSharedServices;
-
private static JobGraph jobGraph;
private static ArchivedExecutionGraph archivedExecutionGraph;
+ private static LibraryCacheManager libraryCacheManager;
+
+ private static JobMasterServiceFactory jobMasterFactory;
+
private TestingHighAvailabilityServices haServices;
private TestingFatalErrorHandler fatalErrorHandler;
@BeforeClass
- public static void setupClass() throws Exception {
- configuration = new Configuration();
- rpcService = new TestingRpcService();
-
- configuration.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath());
+ public static void setupClass() {
+ libraryCacheManager = new BlobLibraryCacheManager(
+ FailingPermanentBlobService.INSTANCE,
+ FlinkUserCodeClassLoaders.ResolveOrder.CHILD_FIRST,
+ new String[]{});
- jobManagerSharedServices = new TestingJobManagerSharedServicesBuilder().build();
+ jobMasterFactory = TestingJobMasterFactory.INSTANCE;
final JobVertex jobVertex = new JobVertex("Test vertex");
jobVertex.setInvokableClass(NoOpInvokable.class);
@@ -119,13 +114,9 @@ public class JobManagerRunnerTest extends TestLogger {
}
@AfterClass
- public static void tearDownClass() throws Exception {
- if (jobManagerSharedServices != null) {
- jobManagerSharedServices.shutdown();
- }
-
- if (rpcService != null) {
- rpcService.stopService();
+ public static void tearDownClass() {
+ if (libraryCacheManager != null) {
+ libraryCacheManager.shutdown();
}
}
@@ -202,10 +193,7 @@ public class JobManagerRunnerTest extends TestLogger {
VoidPermanentBlobService.INSTANCE,
FlinkUserCodeClassLoaders.ResolveOrder.CHILD_FIRST,
new String[]{});
- final JobManagerSharedServices jobManagerSharedServices = new TestingJobManagerSharedServicesBuilder()
- .setLibraryCacheManager(libraryCacheManager)
- .build();
- final JobManagerRunner jobManagerRunner = createJobManagerRunner(jobManagerSharedServices);
+ final JobManagerRunner jobManagerRunner = createJobManagerRunner(libraryCacheManager);
try {
jobManagerRunner.start();
@@ -222,21 +210,18 @@ public class JobManagerRunnerTest extends TestLogger {
}
@Nonnull
- private JobManagerRunner createJobManagerRunner() throws Exception {
- return createJobManagerRunner(jobManagerSharedServices);
- }
-
- @Nonnull
- private JobManagerRunner createJobManagerRunner(final JobManagerSharedServices jobManagerSharedServices) throws Exception {
+ private JobManagerRunner createJobManagerRunner(LibraryCacheManager libraryCacheManager) throws Exception {
return new JobManagerRunner(
- ResourceID.generate(),
jobGraph,
- configuration,
- rpcService,
+ jobMasterFactory,
haServices,
- heartbeatServices,
- jobManagerSharedServices,
- UnregisteredJobManagerJobMetricGroupFactory.INSTANCE,
+ libraryCacheManager,
+ TestingUtils.defaultExecutor(),
fatalErrorHandler);
}
+
+ @Nonnull
+ private JobManagerRunner createJobManagerRunner() throws Exception {
+ return createJobManagerRunner(libraryCacheManager);
+ }
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobMasterService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobMasterService.java
new file mode 100644
index 0000000..7e65da1
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobMasterService.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatewayBuilder;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Implementation of the {@link JobMasterService} for testing purposes.
+ */
+public class TestingJobMasterService implements JobMasterService {
+
+ @Nonnull
+ private final String address;
+
+ private JobMasterGateway jobMasterGateway;
+
+ public TestingJobMasterService(@Nonnull String address) {
+ this.address = address;
+ }
+
+ public TestingJobMasterService() {
+ this("localhost");
+ }
+
+ @Override
+ public CompletableFuture<Acknowledge> start(JobMasterId jobMasterId) {
+ jobMasterGateway = new TestingJobMasterGatewayBuilder().build();
+ return CompletableFuture.completedFuture(Acknowledge.get());
+ }
+
+ @Override
+ public CompletableFuture<Acknowledge> suspend(Exception cause) {
+ jobMasterGateway = null;
+ return CompletableFuture.completedFuture(Acknowledge.get());
+ }
+
+ @Override
+ public JobMasterGateway getGateway() {
+ Preconditions.checkNotNull(jobMasterGateway, "TestingJobMasterService has not been started yet.");
+ return jobMasterGateway;
+ }
+
+ @Override
+ public String getAddress() {
+ return address;
+ }
+
+ @Override
+ public CompletableFuture<Void> closeAsync() {
+ jobMasterGateway = null;
+ return CompletableFuture.completedFuture(null);
+ }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/factories/TestingJobMasterFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/factories/TestingJobMasterFactory.java
new file mode 100644
index 0000000..ba7f1c8
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/factories/TestingJobMasterFactory.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.factories;
+
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmanager.OnCompletionActions;
+import org.apache.flink.runtime.jobmaster.JobMaster;
+import org.apache.flink.runtime.jobmaster.JobMasterService;
+import org.apache.flink.runtime.jobmaster.TestingJobMasterService;
+
+/**
+ * Testing implementation of the {@link JobMasterServiceFactory} which returns a {@link TestingJobMasterService} instead of a real {@link JobMaster}.
+ */
+public enum TestingJobMasterFactory implements JobMasterServiceFactory {
+ INSTANCE;
+
+ @Override
+ public JobMasterService createJobMasterService(JobGraph jobGraph, OnCompletionActions jobCompletionActions, ClassLoader userCodeClassloader) {
+ return new TestingJobMasterService();
+ }
+}