You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/06/27 13:10:42 UTC
[2/4] flink git commit: [hotfix] Introduce SlotPoolFactory to make
SlotPool instantiation configurable
[hotfix] Introduce SlotPoolFactory to make SlotPool instantiation configurable
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/748610b0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/748610b0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/748610b0
Branch: refs/heads/master
Commit: 748610b036a11d36dbed7a319dc190e72409a9b1
Parents: 25fbcf8
Author: Till Rohrmann <tr...@apache.org>
Authored: Mon Jun 25 10:34:30 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Jun 27 15:03:31 2018 +0200
----------------------------------------------------------------------
.../runtime/jobmaster/JobManagerRunner.java | 7 ++
.../flink/runtime/jobmaster/JobMaster.java | 10 +--
.../jobmaster/JobMasterConfiguration.java | 9 +--
.../slotpool/DefaultSlotPoolFactory.java | 84 ++++++++++++++++++++
.../jobmaster/slotpool/SlotPoolFactory.java | 32 ++++++++
.../runtime/rpc/akka/AkkaRpcServiceUtils.java | 9 +--
.../apache/flink/runtime/akka/AkkaUtils.scala | 9 ++-
.../flink/runtime/jobmaster/JobMasterTest.java | 48 ++++++-----
8 files changed, 168 insertions(+), 40 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/748610b0/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
index d867ab3..78671bc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
@@ -33,6 +33,8 @@ import org.apache.flink.runtime.highavailability.RunningJobsRegistry.JobScheduli
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmanager.OnCompletionActions;
import org.apache.flink.runtime.jobmaster.factories.JobManagerJobMetricGroupFactory;
+import org.apache.flink.runtime.jobmaster.slotpool.DefaultSlotPoolFactory;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolFactory;
import org.apache.flink.runtime.leaderelection.LeaderContender;
import org.apache.flink.runtime.leaderelection.LeaderElectionService;
import org.apache.flink.runtime.messages.Acknowledge;
@@ -147,6 +149,10 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, A
this.leaderGatewayFuture = new CompletableFuture<>();
+ final SlotPoolFactory slotPoolFactory = DefaultSlotPoolFactory.fromConfiguration(
+ configuration,
+ rpcService);
+
// now start the JobManager
this.jobMaster = new JobMaster(
rpcService,
@@ -154,6 +160,7 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, A
resourceId,
jobGraph,
haServices,
+ slotPoolFactory,
jobManagerSharedServices,
heartbeatServices,
blobServer,
http://git-wip-us.apache.org/repos/asf/flink/blob/748610b0/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index aaa2f0e..e4a1b6a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -70,6 +70,7 @@ import org.apache.flink.runtime.jobmaster.exceptions.JobModificationException;
import org.apache.flink.runtime.jobmaster.factories.JobManagerJobMetricGroupFactory;
import org.apache.flink.runtime.jobmaster.message.ClassloadingProps;
import org.apache.flink.runtime.jobmaster.slotpool.SlotPool;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolFactory;
import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolGateway;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
@@ -101,7 +102,6 @@ import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
-import org.apache.flink.runtime.util.clock.SystemClock;
import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
@@ -226,6 +226,7 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
ResourceID resourceId,
JobGraph jobGraph,
HighAvailabilityServices highAvailabilityService,
+ SlotPoolFactory slotPoolFactory,
JobManagerSharedServices jobManagerSharedServices,
HeartbeatServices heartbeatServices,
BlobServer blobServer,
@@ -280,12 +281,7 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
resourceManagerLeaderRetriever = highAvailabilityServices.getResourceManagerLeaderRetriever();
- this.slotPool = new SlotPool(
- rpcService,
- jobGraph.getJobID(),
- SystemClock.getInstance(),
- rpcTimeout,
- jobMasterConfiguration.getSlotIdleTimeout());
+ this.slotPool = checkNotNull(slotPoolFactory).createSlotPool(jobGraph.getJobID());
this.slotPoolGateway = slotPool.getSelfGateway(SlotPoolGateway.class);
http://git-wip-us.apache.org/repos/asf/flink/blob/748610b0/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterConfiguration.java
index 5a4e3b3..8b9bfc4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterConfiguration.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.jobmaster;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ConfigurationUtils;
-import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.util.Preconditions;
@@ -76,13 +75,7 @@ public class JobMasterConfiguration {
public static JobMasterConfiguration fromConfiguration(Configuration configuration) {
- final Time rpcTimeout;
-
- try {
- rpcTimeout = Time.milliseconds(AkkaUtils.getTimeout(configuration).toMillis());
- } catch (NumberFormatException e) {
- throw new IllegalConfigurationException(AkkaUtils.formatDurationParsingErrorMessage());
- }
+ final Time rpcTimeout = AkkaUtils.getTimeoutAsTime(configuration);
final Time slotRequestTimeout = Time.milliseconds(configuration.getLong(JobManagerOptions.SLOT_REQUEST_TIMEOUT));
final Time slotIdleTimeout = Time.milliseconds(configuration.getLong(JobManagerOptions.SLOT_IDLE_TIMEOUT));
http://git-wip-us.apache.org/repos/asf/flink/blob/748610b0/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultSlotPoolFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultSlotPoolFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultSlotPoolFactory.java
new file mode 100644
index 0000000..2d082c2
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultSlotPoolFactory.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.util.clock.Clock;
+import org.apache.flink.runtime.util.clock.SystemClock;
+
+import javax.annotation.Nonnull;
+
+/**
+ * Default slot pool factory.
+ */
+public class DefaultSlotPoolFactory implements SlotPoolFactory {
+
+ @Nonnull
+ private final RpcService rpcService;
+
+ @Nonnull
+ private final Clock clock;
+
+ @Nonnull
+ private final Time rpcTimeout;
+
+ @Nonnull
+ private final Time slotIdleTimeout;
+
+ public DefaultSlotPoolFactory(
+ @Nonnull RpcService rpcService,
+ @Nonnull Clock clock,
+ @Nonnull Time rpcTimeout,
+ @Nonnull Time slotIdleTimeout) {
+ this.rpcService = rpcService;
+ this.clock = clock;
+ this.rpcTimeout = rpcTimeout;
+ this.slotIdleTimeout = slotIdleTimeout;
+ }
+
+ @Override
+ @Nonnull
+ public SlotPool createSlotPool(@Nonnull JobID jobId) {
+ return new SlotPool(
+ rpcService,
+ jobId,
+ clock,
+ rpcTimeout,
+ slotIdleTimeout);
+ }
+
+ public static DefaultSlotPoolFactory fromConfiguration(
+ @Nonnull Configuration configuration,
+ @Nonnull RpcService rpcService) {
+
+ final Time rpcTimeout = AkkaUtils.getTimeoutAsTime(configuration);
+ final Time slotIdleTimeout = Time.milliseconds(configuration.getLong(JobManagerOptions.SLOT_IDLE_TIMEOUT));
+
+ return new DefaultSlotPoolFactory(
+ rpcService,
+ SystemClock.getInstance(),
+ rpcTimeout,
+ slotIdleTimeout);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/748610b0/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolFactory.java
new file mode 100644
index 0000000..2255c09
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolFactory.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.api.common.JobID;
+
+import javax.annotation.Nonnull;
+
+/**
+ * Factory interface for {@link SlotPool}.
+ */
+public interface SlotPoolFactory {
+
+ @Nonnull
+ SlotPool createSlotPool(@Nonnull JobID jobId);
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/748610b0/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
index d3472ee..6ae142b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
@@ -18,8 +18,6 @@
package org.apache.flink.runtime.rpc.akka;
-import akka.actor.ActorSystem;
-import com.typesafe.config.Config;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
@@ -29,10 +27,11 @@ import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.A
import org.apache.flink.runtime.net.SSLUtils;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.util.NetUtils;
-
import org.apache.flink.util.Preconditions;
-import org.jboss.netty.channel.ChannelException;
+import akka.actor.ActorSystem;
+import com.typesafe.config.Config;
+import org.jboss.netty.channel.ChannelException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -102,7 +101,7 @@ public class AkkaRpcServiceUtils {
throw new Exception("Could not create TaskManager actor system", t);
}
- final Time timeout = Time.milliseconds(AkkaUtils.getTimeout(configuration).toMillis());
+ final Time timeout = AkkaUtils.getTimeoutAsTime(configuration);
return new AkkaRpcService(actorSystem, timeout);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/748610b0/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
index da700c0..57ca9d4 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
@@ -609,9 +609,14 @@ object AkkaUtils {
}
def getTimeoutAsTime(config: Configuration): Time = {
- val duration = Duration(config.getString(AkkaOptions.ASK_TIMEOUT))
+ try {
+ val duration = Duration(config.getString(AkkaOptions.ASK_TIMEOUT))
- Time.milliseconds(duration.toMillis)
+ Time.milliseconds(duration.toMillis)
+ } catch {
+ case _: NumberFormatException =>
+ throw new IllegalConfigurationException(AkkaUtils.formatDurationParsingErrorMessage)
+ }
}
def getDefaultTimeout: Time = {
http://git-wip-us.apache.org/repos/asf/flink/blob/748610b0/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index 552c0f0..d7dc017 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -61,6 +61,7 @@ import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguratio
import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
import org.apache.flink.runtime.jobmanager.OnCompletionActions;
import org.apache.flink.runtime.jobmaster.factories.UnregisteredJobManagerJobMetricGroupFactory;
+import org.apache.flink.runtime.jobmaster.slotpool.DefaultSlotPoolFactory;
import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.registration.RegistrationResponse;
@@ -212,9 +213,12 @@ public class JobMasterTest extends TestLogger {
rpcService.registerGateway(taskExecutorGateway.getAddress(), taskExecutorGateway);
final JobManagerSharedServices jobManagerSharedServices = new TestingJobManagerSharedServicesBuilder().build();
- final JobMasterConfiguration jobMasterConfiguration = JobMasterConfiguration.fromConfiguration(configuration);
- final JobMaster jobMaster = createJobMaster(jobMasterConfiguration, jobGraph, haServices, jobManagerSharedServices);
+ final JobMaster jobMaster = createJobMaster(
+ configuration,
+ jobGraph,
+ haServices,
+ jobManagerSharedServices);
CompletableFuture<Acknowledge> startFuture = jobMaster.start(jobMasterId, testingTimeout);
@@ -277,9 +281,12 @@ public class JobMasterTest extends TestLogger {
rpcService.registerGateway(resourceManagerAddress, resourceManagerGateway);
final JobManagerSharedServices jobManagerSharedServices = new TestingJobManagerSharedServicesBuilder().build();
- final JobMasterConfiguration jobMasterConfiguration = JobMasterConfiguration.fromConfiguration(configuration);
- final JobMaster jobMaster = createJobMaster(jobMasterConfiguration, jobGraph, haServices, jobManagerSharedServices);
+ final JobMaster jobMaster = createJobMaster(
+ configuration,
+ jobGraph,
+ haServices,
+ jobManagerSharedServices);
CompletableFuture<Acknowledge> startFuture = jobMaster.start(jobMasterId, testingTimeout);
@@ -333,7 +340,7 @@ public class JobMasterTest extends TestLogger {
final TestingCheckpointRecoveryFactory testingCheckpointRecoveryFactory = new TestingCheckpointRecoveryFactory(completedCheckpointStore, new StandaloneCheckpointIDCounter());
haServices.setCheckpointRecoveryFactory(testingCheckpointRecoveryFactory);
final JobMaster jobMaster = createJobMaster(
- JobMasterConfiguration.fromConfiguration(configuration),
+ configuration,
jobGraph,
haServices,
new TestingJobManagerSharedServicesBuilder().build());
@@ -382,8 +389,9 @@ public class JobMasterTest extends TestLogger {
completedCheckpointStore.addCheckpoint(completedCheckpoint);
final TestingCheckpointRecoveryFactory testingCheckpointRecoveryFactory = new TestingCheckpointRecoveryFactory(completedCheckpointStore, new StandaloneCheckpointIDCounter());
haServices.setCheckpointRecoveryFactory(testingCheckpointRecoveryFactory);
+
final JobMaster jobMaster = createJobMaster(
- JobMasterConfiguration.fromConfiguration(configuration),
+ configuration,
jobGraph,
haServices,
new TestingJobManagerSharedServicesBuilder().build());
@@ -412,7 +420,7 @@ public class JobMasterTest extends TestLogger {
configuration.setLong(JobManagerOptions.SLOT_REQUEST_TIMEOUT, slotRequestTimeout);
final JobMaster jobMaster = createJobMaster(
- JobMasterConfiguration.fromConfiguration(configuration),
+ configuration,
restartingJobGraph,
haServices,
new TestingJobManagerSharedServicesBuilder().build(),
@@ -496,7 +504,7 @@ public class JobMasterTest extends TestLogger {
@Test
public void testCloseUnestablishedResourceManagerConnection() throws Exception {
final JobMaster jobMaster = createJobMaster(
- JobMasterConfiguration.fromConfiguration(configuration),
+ configuration,
jobGraph,
haServices,
new TestingJobManagerSharedServicesBuilder().build());
@@ -544,7 +552,7 @@ public class JobMasterTest extends TestLogger {
@Test
public void testReconnectionAfterDisconnect() throws Exception {
final JobMaster jobMaster = createJobMaster(
- JobMasterConfiguration.fromConfiguration(configuration),
+ configuration,
jobGraph,
haServices,
new TestingJobManagerSharedServicesBuilder().build());
@@ -588,7 +596,7 @@ public class JobMasterTest extends TestLogger {
@Test
public void testResourceManagerConnectionAfterRegainingLeadership() throws Exception {
final JobMaster jobMaster = createJobMaster(
- JobMasterConfiguration.fromConfiguration(configuration),
+ configuration,
jobGraph,
haServices,
new TestingJobManagerSharedServicesBuilder().build());
@@ -633,7 +641,7 @@ public class JobMasterTest extends TestLogger {
public void testRequestPartitionState() throws Exception {
final JobGraph producerConsumerJobGraph = producerConsumerJobGraph();
final JobMaster jobMaster = createJobMaster(
- JobMasterConfiguration.fromConfiguration(configuration),
+ configuration,
producerConsumerJobGraph,
haServices,
new TestingJobManagerSharedServicesBuilder().build(),
@@ -746,12 +754,12 @@ public class JobMasterTest extends TestLogger {
@Nonnull
private JobMaster createJobMaster(
- JobMasterConfiguration jobMasterConfiguration,
+ Configuration configuration,
JobGraph jobGraph,
HighAvailabilityServices highAvailabilityServices,
JobManagerSharedServices jobManagerSharedServices) throws Exception {
return createJobMaster(
- jobMasterConfiguration,
+ configuration,
jobGraph,
highAvailabilityServices,
jobManagerSharedServices,
@@ -760,17 +768,21 @@ public class JobMasterTest extends TestLogger {
@Nonnull
private JobMaster createJobMaster(
- JobMasterConfiguration jobMasterConfiguration,
- JobGraph jobGraph,
- HighAvailabilityServices highAvailabilityServices,
- JobManagerSharedServices jobManagerSharedServices,
- HeartbeatServices heartbeatServices) throws Exception {
+ Configuration configuration,
+ JobGraph jobGraph,
+ HighAvailabilityServices highAvailabilityServices,
+ JobManagerSharedServices jobManagerSharedServices,
+ HeartbeatServices heartbeatServices) throws Exception {
+
+ final JobMasterConfiguration jobMasterConfiguration = JobMasterConfiguration.fromConfiguration(configuration);
+
return new JobMaster(
rpcService,
jobMasterConfiguration,
jmResourceId,
jobGraph,
highAvailabilityServices,
+ DefaultSlotPoolFactory.fromConfiguration(configuration, rpcService),
jobManagerSharedServices,
heartbeatServices,
blobServer,