You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/06/27 13:10:42 UTC

[2/4] flink git commit: [hotfix] Introduce SlotPoolFactory to make SlotPool instantiation configurable

[hotfix] Introduce SlotPoolFactory to make SlotPool instantiation configurable


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/748610b0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/748610b0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/748610b0

Branch: refs/heads/master
Commit: 748610b036a11d36dbed7a319dc190e72409a9b1
Parents: 25fbcf8
Author: Till Rohrmann <tr...@apache.org>
Authored: Mon Jun 25 10:34:30 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Jun 27 15:03:31 2018 +0200

----------------------------------------------------------------------
 .../runtime/jobmaster/JobManagerRunner.java     |  7 ++
 .../flink/runtime/jobmaster/JobMaster.java      | 10 +--
 .../jobmaster/JobMasterConfiguration.java       |  9 +--
 .../slotpool/DefaultSlotPoolFactory.java        | 84 ++++++++++++++++++++
 .../jobmaster/slotpool/SlotPoolFactory.java     | 32 ++++++++
 .../runtime/rpc/akka/AkkaRpcServiceUtils.java   |  9 +--
 .../apache/flink/runtime/akka/AkkaUtils.scala   |  9 ++-
 .../flink/runtime/jobmaster/JobMasterTest.java  | 48 ++++++-----
 8 files changed, 168 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/748610b0/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
index d867ab3..78671bc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
@@ -33,6 +33,8 @@ import org.apache.flink.runtime.highavailability.RunningJobsRegistry.JobScheduli
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmanager.OnCompletionActions;
 import org.apache.flink.runtime.jobmaster.factories.JobManagerJobMetricGroupFactory;
+import org.apache.flink.runtime.jobmaster.slotpool.DefaultSlotPoolFactory;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolFactory;
 import org.apache.flink.runtime.leaderelection.LeaderContender;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.messages.Acknowledge;
@@ -147,6 +149,10 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, A
 
 			this.leaderGatewayFuture = new CompletableFuture<>();
 
+			final SlotPoolFactory slotPoolFactory = DefaultSlotPoolFactory.fromConfiguration(
+				configuration,
+				rpcService);
+
 			// now start the JobManager
 			this.jobMaster = new JobMaster(
 				rpcService,
@@ -154,6 +160,7 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, A
 				resourceId,
 				jobGraph,
 				haServices,
+				slotPoolFactory,
 				jobManagerSharedServices,
 				heartbeatServices,
 				blobServer,

http://git-wip-us.apache.org/repos/asf/flink/blob/748610b0/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index aaa2f0e..e4a1b6a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -70,6 +70,7 @@ import org.apache.flink.runtime.jobmaster.exceptions.JobModificationException;
 import org.apache.flink.runtime.jobmaster.factories.JobManagerJobMetricGroupFactory;
 import org.apache.flink.runtime.jobmaster.message.ClassloadingProps;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotPool;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolFactory;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolGateway;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
@@ -101,7 +102,6 @@ import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
-import org.apache.flink.runtime.util.clock.SystemClock;
 import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
@@ -226,6 +226,7 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 			ResourceID resourceId,
 			JobGraph jobGraph,
 			HighAvailabilityServices highAvailabilityService,
+			SlotPoolFactory slotPoolFactory,
 			JobManagerSharedServices jobManagerSharedServices,
 			HeartbeatServices heartbeatServices,
 			BlobServer blobServer,
@@ -280,12 +281,7 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 
 		resourceManagerLeaderRetriever = highAvailabilityServices.getResourceManagerLeaderRetriever();
 
-		this.slotPool = new SlotPool(
-			rpcService,
-			jobGraph.getJobID(),
-			SystemClock.getInstance(),
-			rpcTimeout,
-			jobMasterConfiguration.getSlotIdleTimeout());
+		this.slotPool = checkNotNull(slotPoolFactory).createSlotPool(jobGraph.getJobID());
 
 		this.slotPoolGateway = slotPool.getSelfGateway(SlotPoolGateway.class);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/748610b0/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterConfiguration.java
index 5a4e3b3..8b9bfc4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterConfiguration.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.jobmaster;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ConfigurationUtils;
-import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.util.Preconditions;
@@ -76,13 +75,7 @@ public class JobMasterConfiguration {
 
 	public static JobMasterConfiguration fromConfiguration(Configuration configuration) {
 
-		final Time rpcTimeout;
-
-		try {
-			rpcTimeout = Time.milliseconds(AkkaUtils.getTimeout(configuration).toMillis());
-		} catch (NumberFormatException e) {
-			throw new IllegalConfigurationException(AkkaUtils.formatDurationParsingErrorMessage());
-		}
+		final Time rpcTimeout = AkkaUtils.getTimeoutAsTime(configuration);
 
 		final Time slotRequestTimeout = Time.milliseconds(configuration.getLong(JobManagerOptions.SLOT_REQUEST_TIMEOUT));
 		final Time slotIdleTimeout = Time.milliseconds(configuration.getLong(JobManagerOptions.SLOT_IDLE_TIMEOUT));

http://git-wip-us.apache.org/repos/asf/flink/blob/748610b0/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultSlotPoolFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultSlotPoolFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultSlotPoolFactory.java
new file mode 100644
index 0000000..2d082c2
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultSlotPoolFactory.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.util.clock.Clock;
+import org.apache.flink.runtime.util.clock.SystemClock;
+
+import javax.annotation.Nonnull;
+
+/**
+ * Default slot pool factory.
+ */
+public class DefaultSlotPoolFactory implements SlotPoolFactory {
+
+	@Nonnull
+	private final RpcService rpcService;
+
+	@Nonnull
+	private final Clock clock;
+
+	@Nonnull
+	private final Time rpcTimeout;
+
+	@Nonnull
+	private final Time slotIdleTimeout;
+
+	public DefaultSlotPoolFactory(
+			@Nonnull RpcService rpcService,
+			@Nonnull Clock clock,
+			@Nonnull Time rpcTimeout,
+			@Nonnull Time slotIdleTimeout) {
+		this.rpcService = rpcService;
+		this.clock = clock;
+		this.rpcTimeout = rpcTimeout;
+		this.slotIdleTimeout = slotIdleTimeout;
+	}
+
+	@Override
+	@Nonnull
+	public SlotPool createSlotPool(@Nonnull JobID jobId) {
+		return new SlotPool(
+			rpcService,
+			jobId,
+			clock,
+			rpcTimeout,
+			slotIdleTimeout);
+	}
+
+	public static DefaultSlotPoolFactory fromConfiguration(
+			@Nonnull Configuration configuration,
+			@Nonnull RpcService rpcService) {
+
+		final Time rpcTimeout = AkkaUtils.getTimeoutAsTime(configuration);
+		final Time slotIdleTimeout = Time.milliseconds(configuration.getLong(JobManagerOptions.SLOT_IDLE_TIMEOUT));
+
+		return new DefaultSlotPoolFactory(
+			rpcService,
+			SystemClock.getInstance(),
+			rpcTimeout,
+			slotIdleTimeout);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/748610b0/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolFactory.java
new file mode 100644
index 0000000..2255c09
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolFactory.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.api.common.JobID;
+
+import javax.annotation.Nonnull;
+
+/**
+ * Factory interface for {@link SlotPool}.
+ */
+public interface SlotPoolFactory {
+
+	@Nonnull
+	SlotPool createSlotPool(@Nonnull JobID jobId);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/748610b0/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
index d3472ee..6ae142b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.runtime.rpc.akka;
 
-import akka.actor.ActorSystem;
-import com.typesafe.config.Config;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.Configuration;
@@ -29,10 +27,11 @@ import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.A
 import org.apache.flink.runtime.net.SSLUtils;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.util.NetUtils;
-
 import org.apache.flink.util.Preconditions;
-import org.jboss.netty.channel.ChannelException;
 
+import akka.actor.ActorSystem;
+import com.typesafe.config.Config;
+import org.jboss.netty.channel.ChannelException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -102,7 +101,7 @@ public class AkkaRpcServiceUtils {
 			throw new Exception("Could not create TaskManager actor system", t);
 		}
 
-		final Time timeout = Time.milliseconds(AkkaUtils.getTimeout(configuration).toMillis());
+		final Time timeout = AkkaUtils.getTimeoutAsTime(configuration);
 		return new AkkaRpcService(actorSystem, timeout);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/748610b0/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
index da700c0..57ca9d4 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
@@ -609,9 +609,14 @@ object AkkaUtils {
   }
 
   def getTimeoutAsTime(config: Configuration): Time = {
-    val duration = Duration(config.getString(AkkaOptions.ASK_TIMEOUT))
+    try {
+      val duration = Duration(config.getString(AkkaOptions.ASK_TIMEOUT))
 
-    Time.milliseconds(duration.toMillis)
+      Time.milliseconds(duration.toMillis)
+    } catch {
+      case _: NumberFormatException =>
+        throw new IllegalConfigurationException(AkkaUtils.formatDurationParsingErrorMessage)
+    }
   }
 
   def getDefaultTimeout: Time = {

http://git-wip-us.apache.org/repos/asf/flink/blob/748610b0/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index 552c0f0..d7dc017 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -61,6 +61,7 @@ import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguratio
 import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
 import org.apache.flink.runtime.jobmanager.OnCompletionActions;
 import org.apache.flink.runtime.jobmaster.factories.UnregisteredJobManagerJobMetricGroupFactory;
+import org.apache.flink.runtime.jobmaster.slotpool.DefaultSlotPoolFactory;
 import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.registration.RegistrationResponse;
@@ -212,9 +213,12 @@ public class JobMasterTest extends TestLogger {
 		rpcService.registerGateway(taskExecutorGateway.getAddress(), taskExecutorGateway);
 
 		final JobManagerSharedServices jobManagerSharedServices = new TestingJobManagerSharedServicesBuilder().build();
-		final JobMasterConfiguration jobMasterConfiguration = JobMasterConfiguration.fromConfiguration(configuration);
 
-		final JobMaster jobMaster = createJobMaster(jobMasterConfiguration, jobGraph, haServices, jobManagerSharedServices);
+		final JobMaster jobMaster = createJobMaster(
+			configuration,
+			jobGraph,
+			haServices,
+			jobManagerSharedServices);
 
 		CompletableFuture<Acknowledge> startFuture = jobMaster.start(jobMasterId, testingTimeout);
 
@@ -277,9 +281,12 @@ public class JobMasterTest extends TestLogger {
 		rpcService.registerGateway(resourceManagerAddress, resourceManagerGateway);
 
 		final JobManagerSharedServices jobManagerSharedServices = new TestingJobManagerSharedServicesBuilder().build();
-		final JobMasterConfiguration jobMasterConfiguration = JobMasterConfiguration.fromConfiguration(configuration);
 
-		final JobMaster jobMaster = createJobMaster(jobMasterConfiguration, jobGraph, haServices, jobManagerSharedServices);
+		final JobMaster jobMaster = createJobMaster(
+			configuration,
+			jobGraph,
+			haServices,
+			jobManagerSharedServices);
 
 		CompletableFuture<Acknowledge> startFuture = jobMaster.start(jobMasterId, testingTimeout);
 
@@ -333,7 +340,7 @@ public class JobMasterTest extends TestLogger {
 		final TestingCheckpointRecoveryFactory testingCheckpointRecoveryFactory = new TestingCheckpointRecoveryFactory(completedCheckpointStore, new StandaloneCheckpointIDCounter());
 		haServices.setCheckpointRecoveryFactory(testingCheckpointRecoveryFactory);
 		final JobMaster jobMaster = createJobMaster(
-			JobMasterConfiguration.fromConfiguration(configuration),
+			configuration,
 			jobGraph,
 			haServices,
 			new TestingJobManagerSharedServicesBuilder().build());
@@ -382,8 +389,9 @@ public class JobMasterTest extends TestLogger {
 		completedCheckpointStore.addCheckpoint(completedCheckpoint);
 		final TestingCheckpointRecoveryFactory testingCheckpointRecoveryFactory = new TestingCheckpointRecoveryFactory(completedCheckpointStore, new StandaloneCheckpointIDCounter());
 		haServices.setCheckpointRecoveryFactory(testingCheckpointRecoveryFactory);
+
 		final JobMaster jobMaster = createJobMaster(
-			JobMasterConfiguration.fromConfiguration(configuration),
+			configuration,
 			jobGraph,
 			haServices,
 			new TestingJobManagerSharedServicesBuilder().build());
@@ -412,7 +420,7 @@ public class JobMasterTest extends TestLogger {
 		configuration.setLong(JobManagerOptions.SLOT_REQUEST_TIMEOUT, slotRequestTimeout);
 
 		final JobMaster jobMaster = createJobMaster(
-			JobMasterConfiguration.fromConfiguration(configuration),
+			configuration,
 			restartingJobGraph,
 			haServices,
 			new TestingJobManagerSharedServicesBuilder().build(),
@@ -496,7 +504,7 @@ public class JobMasterTest extends TestLogger {
 	@Test
 	public void testCloseUnestablishedResourceManagerConnection() throws Exception {
 		final JobMaster jobMaster = createJobMaster(
-			JobMasterConfiguration.fromConfiguration(configuration),
+			configuration,
 			jobGraph,
 			haServices,
 			new TestingJobManagerSharedServicesBuilder().build());
@@ -544,7 +552,7 @@ public class JobMasterTest extends TestLogger {
 	@Test
 	public void testReconnectionAfterDisconnect() throws Exception {
 		final JobMaster jobMaster = createJobMaster(
-			JobMasterConfiguration.fromConfiguration(configuration),
+			configuration,
 			jobGraph,
 			haServices,
 			new TestingJobManagerSharedServicesBuilder().build());
@@ -588,7 +596,7 @@ public class JobMasterTest extends TestLogger {
 	@Test
 	public void testResourceManagerConnectionAfterRegainingLeadership() throws Exception {
 		final JobMaster jobMaster = createJobMaster(
-			JobMasterConfiguration.fromConfiguration(configuration),
+			configuration,
 			jobGraph,
 			haServices,
 			new TestingJobManagerSharedServicesBuilder().build());
@@ -633,7 +641,7 @@ public class JobMasterTest extends TestLogger {
 	public void testRequestPartitionState() throws Exception {
 		final JobGraph producerConsumerJobGraph = producerConsumerJobGraph();
 		final JobMaster jobMaster = createJobMaster(
-			JobMasterConfiguration.fromConfiguration(configuration),
+			configuration,
 			producerConsumerJobGraph,
 			haServices,
 			new TestingJobManagerSharedServicesBuilder().build(),
@@ -746,12 +754,12 @@ public class JobMasterTest extends TestLogger {
 
 	@Nonnull
 	private JobMaster createJobMaster(
-			JobMasterConfiguration jobMasterConfiguration,
+			Configuration configuration,
 			JobGraph jobGraph,
 			HighAvailabilityServices highAvailabilityServices,
 			JobManagerSharedServices jobManagerSharedServices) throws Exception {
 		return createJobMaster(
-			jobMasterConfiguration,
+			configuration,
 			jobGraph,
 			highAvailabilityServices,
 			jobManagerSharedServices,
@@ -760,17 +768,21 @@ public class JobMasterTest extends TestLogger {
 
 	@Nonnull
 	private JobMaster createJobMaster(
-		JobMasterConfiguration jobMasterConfiguration,
-		JobGraph jobGraph,
-		HighAvailabilityServices highAvailabilityServices,
-		JobManagerSharedServices jobManagerSharedServices,
-		HeartbeatServices heartbeatServices) throws Exception {
+			Configuration configuration,
+			JobGraph jobGraph,
+			HighAvailabilityServices highAvailabilityServices,
+			JobManagerSharedServices jobManagerSharedServices,
+			HeartbeatServices heartbeatServices) throws Exception {
+
+		final JobMasterConfiguration jobMasterConfiguration = JobMasterConfiguration.fromConfiguration(configuration);
+
 		return new JobMaster(
 			rpcService,
 			jobMasterConfiguration,
 			jmResourceId,
 			jobGraph,
 			highAvailabilityServices,
+			DefaultSlotPoolFactory.fromConfiguration(configuration, rpcService),
 			jobManagerSharedServices,
 			heartbeatServices,
 			blobServer,