You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/10/26 08:22:57 UTC

flink git commit: [FLINK-7920] Make MiniClusterConfiguration immutable

Repository: flink
Updated Branches:
  refs/heads/master 3039df809 -> a9743eb68


[FLINK-7920] Make MiniClusterConfiguration immutable

This closes #4905.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a9743eb6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a9743eb6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a9743eb6

Branch: refs/heads/master
Commit: a9743eb6850809784930c1199741aa83ae54e99a
Parents: 3039df8
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed Oct 25 17:31:45 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Oct 26 10:22:22 2017 +0200

----------------------------------------------------------------------
 .../flink/runtime/minicluster/MiniCluster.java  |  84 ++-----
 .../minicluster/MiniClusterConfiguration.java   | 227 +++++++++----------
 .../runtime/minicluster/MiniClusterITCase.java  |  20 +-
 .../Flip6LocalStreamEnvironment.java            |   8 +-
 4 files changed, 134 insertions(+), 205 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a9743eb6/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index f293a01..dd352bb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -18,12 +18,9 @@
 
 package org.apache.flink.runtime.minicluster;
 
-import akka.actor.ActorSystem;
-
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.UnmodifiableConfiguration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.client.JobExecutionException;
@@ -47,6 +44,7 @@ import org.apache.flink.runtime.taskexecutor.TaskExecutor;
 import org.apache.flink.runtime.taskexecutor.TaskManagerRunner;
 import org.apache.flink.util.ExceptionUtils;
 
+import akka.actor.ActorSystem;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -67,7 +65,7 @@ public class MiniCluster {
 	private final Object lock = new Object();
 
 	/** The configuration for this mini cluster */
-	private final MiniClusterConfiguration config;
+	private final MiniClusterConfiguration miniClusterConfiguration;
 
 	@GuardedBy("lock") 
 	private MetricRegistry metricRegistry;
@@ -107,48 +105,12 @@ public class MiniCluster {
 	// ------------------------------------------------------------------------
 
 	/**
-	 * Creates a new mini cluster with the default configuration:
-	 * <ul>
-	 *     <li>One JobManager</li>
-	 *     <li>One TaskManager</li>
-	 *     <li>One task slot in the TaskManager</li>
-	 *     <li>All components share the same RPC subsystem (minimizes communication overhead)</li>
-	 * </ul>
-	 */
-	public MiniCluster() {
-		this(new MiniClusterConfiguration());
-	}
-
-	/**
 	 * Creates a new Flink mini cluster based on the given configuration.
 	 * 
-	 * @param config The configuration for the mini cluster
+	 * @param miniClusterConfiguration The configuration for the mini cluster
 	 */
-	public MiniCluster(MiniClusterConfiguration config) {
-		this.config = checkNotNull(config, "config may not be null");
-	}
-
-	/**
-	 * Creates a mini cluster based on the given configuration.
-	 * 
-	 * @deprecated Use {@link #MiniCluster(MiniClusterConfiguration)} instead. 
-	 * @see #MiniCluster(MiniClusterConfiguration)
-	 */
-	@Deprecated
-	public MiniCluster(Configuration config) {
-		this(createConfig(config, true));
-	}
-
-	/**
-	 * Creates a mini cluster based on the given configuration, starting one or more
-	 * RPC services, depending on the given flag.
-	 *
-	 * @deprecated Use {@link #MiniCluster(MiniClusterConfiguration)} instead. 
-	 * @see #MiniCluster(MiniClusterConfiguration)
-	 */
-	@Deprecated
-	public MiniCluster(Configuration config, boolean singleRpcService) {
-		this(createConfig(config, singleRpcService));
+	public MiniCluster(MiniClusterConfiguration miniClusterConfiguration) {
+		this.miniClusterConfiguration = checkNotNull(miniClusterConfiguration, "config may not be null");
 
 		running = false;
 	}
@@ -175,14 +137,14 @@ public class MiniCluster {
 			checkState(!running, "FlinkMiniCluster is already running");
 
 			LOG.info("Starting Flink Mini Cluster");
-			LOG.debug("Using configuration {}", config);
+			LOG.debug("Using configuration {}", miniClusterConfiguration);
 
-			final Configuration configuration = new UnmodifiableConfiguration(config.generateConfiguration());
-			final Time rpcTimeout = config.getRpcTimeout();
-			final int numJobManagers = config.getNumJobManagers();
-			final int numTaskManagers = config.getNumTaskManagers();
-			final int numResourceManagers = config.getNumResourceManagers();
-			final boolean singleRpc = config.getUseSingleRpcSystem();
+			final Configuration configuration = miniClusterConfiguration.getConfiguration();
+			final Time rpcTimeout = miniClusterConfiguration.getRpcTimeout();
+			final int numJobManagers = miniClusterConfiguration.getNumJobManagers();
+			final int numTaskManagers = miniClusterConfiguration.getNumTaskManagers();
+			final int numResourceManagers = miniClusterConfiguration.getNumResourceManagers();
+			final boolean useSingleRpcService = miniClusterConfiguration.getRpcServiceSharing() == MiniClusterConfiguration.RpcServiceSharing.SHARED;
 
 			try {
 				LOG.info("Starting Metrics Registry");
@@ -198,7 +160,7 @@ public class MiniCluster {
 				// we always need the 'commonRpcService' for auxiliary calls
 				commonRpcService = createRpcService(configuration, rpcTimeout, false, null);
 
-				if (singleRpc) {
+				if (useSingleRpcService) {
 					// set that same RPC service for all JobManagers and TaskManagers
 					for (int i = 0; i < numJobManagers; i++) {
 						jobManagerRpcServices[i] = commonRpcService;
@@ -216,9 +178,9 @@ public class MiniCluster {
 				}
 				else {
 					// start a new service per component, possibly with custom bind addresses
-					final String jobManagerBindAddress = config.getJobManagerBindAddress();
-					final String taskManagerBindAddress = config.getTaskManagerBindAddress();
-					final String resourceManagerBindAddress = config.getResourceManagerBindAddress();
+					final String jobManagerBindAddress = miniClusterConfiguration.getJobManagerBindAddress();
+					final String taskManagerBindAddress = miniClusterConfiguration.getTaskManagerBindAddress();
+					final String resourceManagerBindAddress = miniClusterConfiguration.getResourceManagerBindAddress();
 
 					for (int i = 0; i < numJobManagers; i++) {
 						jobManagerRpcServices[i] = createRpcService(
@@ -625,20 +587,6 @@ public class MiniCluster {
 		return priorException;
 	}
 
-	private static MiniClusterConfiguration createConfig(Configuration cfg, boolean singleRpcService) {
-		MiniClusterConfiguration config = cfg == null ?
-				new MiniClusterConfiguration() :
-				new MiniClusterConfiguration(cfg);
-
-		if (singleRpcService) {
-			config.setUseSingleRpcService();
-		} else {
-			config.setUseRpcServicePerComponent();
-		}
-
-		return config;
-	}
-
 	private class TerminatingFatalErrorHandler implements FatalErrorHandler {
 
 		private final int index;

http://git-wip-us.apache.org/repos/asf/flink/blob/a9743eb6/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
index aa9b0c2..52e037c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
@@ -22,97 +22,60 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
-import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.configuration.UnmodifiableConfiguration;
 import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.taskexecutor.TaskManagerServices;
-import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
 import scala.concurrent.duration.FiniteDuration;
 
-import static org.apache.flink.util.Preconditions.checkArgument;
-import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.runtime.minicluster.MiniClusterConfiguration.RpcServiceSharing.SHARED;
 
+/**
+ * Configuration object for the {@link MiniCluster}.
+ */
 public class MiniClusterConfiguration {
 
-	private final Configuration config;
+	private final UnmodifiableConfiguration configuration;
 
-	private boolean singleRpcService = true;
+	private final int numJobManagers;
 
-	private int numJobManagers = 1;
+	private final int numTaskManagers;
 
-	private int numTaskManagers = 1;
+	private final int numResourceManagers;
 
-	private int numResourceManagers = 1;
+	private final RpcServiceSharing rpcServiceSharing;
 
-	private String commonBindAddress;
-
-	private long managedMemoryPerTaskManager = -1;
+	@Nullable
+	private final String commonBindAddress;
 
 	// ------------------------------------------------------------------------
 	//  Construction
 	// ------------------------------------------------------------------------
 
-	public MiniClusterConfiguration() {
-		this.config = new Configuration();
-	}
-
-	public MiniClusterConfiguration(Configuration config) {
-		checkNotNull(config);
-		this.config = new Configuration(config);
-	}
-
-	// ------------------------------------------------------------------------
-	//  setters
-	// ------------------------------------------------------------------------
-
-	public void addConfiguration(Configuration config) {
-		checkNotNull(config, "configuration must not be null");
-		this.config.addAll(config);
-	}
+	public MiniClusterConfiguration(
+			Configuration configuration,
+			int numJobManagers,
+			int numTaskManagers,
+			int numResourceManagers,
+			RpcServiceSharing rpcServiceSharing,
+			@Nullable String commonBindAddress) {
 
-	public void setUseSingleRpcService() {
-		this.singleRpcService = true;
-	}
-
-	public void setUseRpcServicePerComponent() {
-		this.singleRpcService = false;
-	}
-
-	public void setNumJobManagers(int numJobManagers) {
-		checkArgument(numJobManagers >= 1, "must have at least one JobManager");
+		this.configuration = new UnmodifiableConfiguration(Preconditions.checkNotNull(configuration));
 		this.numJobManagers = numJobManagers;
-	}
-
-	public void setNumTaskManagers(int numTaskManagers) {
-		checkArgument(numTaskManagers >= 1, "must have at least one TaskManager");
 		this.numTaskManagers = numTaskManagers;
-	}
-
-	public void setNumResourceManagers(int numResourceManagers) {
-		checkArgument(numResourceManagers >= 1, "must have at least one ResourceManager");
 		this.numResourceManagers = numResourceManagers;
-	}
-
-	public void setNumTaskManagerSlots(int numTaskSlots) {
-		checkArgument(numTaskSlots >= 1, "must have at least one task slot per TaskManager");
-		this.config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numTaskSlots);
-	}
-
-	public void setCommonRpcBindAddress(String bindAddress) {
-		checkNotNull(bindAddress, "bind address must not be null");
-		this.commonBindAddress = bindAddress;
-	}
-
-	public void setManagedMemoryPerTaskManager(long managedMemoryPerTaskManager) {
-		checkArgument(managedMemoryPerTaskManager > 0, "must have more than 0 MB of memory for the TaskManager.");
-		this.managedMemoryPerTaskManager = managedMemoryPerTaskManager;
+		this.rpcServiceSharing = Preconditions.checkNotNull(rpcServiceSharing);
+		this.commonBindAddress = commonBindAddress;
 	}
 
 	// ------------------------------------------------------------------------
 	//  getters
 	// ------------------------------------------------------------------------
 
-	public boolean getUseSingleRpcSystem() {
-		return singleRpcService;
+	public RpcServiceSharing getRpcServiceSharing() {
+		return rpcServiceSharing;
 	}
 
 	public int getNumJobManagers() {
@@ -127,107 +90,121 @@ public class MiniClusterConfiguration {
 		return numResourceManagers;
 	}
 
-	public int getNumSlotsPerTaskManager() {
-		return config.getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);
-	}
-
 	public String getJobManagerBindAddress() {
 		return commonBindAddress != null ?
 				commonBindAddress :
-				config.getString(JobManagerOptions.ADDRESS, "localhost");
+				configuration.getString(JobManagerOptions.ADDRESS, "localhost");
 	}
 
 	public String getTaskManagerBindAddress() {
 		return commonBindAddress != null ?
 				commonBindAddress :
-				config.getString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, "localhost");
+				configuration.getString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, "localhost");
 	}
 
 	public String getResourceManagerBindAddress() {
 		return commonBindAddress != null ?
 			commonBindAddress :
-			config.getString(JobManagerOptions.ADDRESS, "localhost"); // TODO: Introduce proper configuration constant for the resource manager hostname
+			configuration.getString(JobManagerOptions.ADDRESS, "localhost"); // TODO: Introduce proper configuration constant for the resource manager hostname
 	}
 
 	public Time getRpcTimeout() {
-		FiniteDuration duration = AkkaUtils.getTimeout(config);
+		FiniteDuration duration = AkkaUtils.getTimeout(configuration);
 		return Time.of(duration.length(), duration.unit());
 	}
 
-	public long getManagedMemoryPerTaskManager() {
-		return getOrCalculateManagedMemoryPerTaskManager();
-	}
-
-	// ------------------------------------------------------------------------
-	//  utils
-	// ------------------------------------------------------------------------
-
-	public Configuration generateConfiguration() {
-		Configuration newConfiguration = new Configuration(config);
-		// set the memory
-		long memory = getOrCalculateManagedMemoryPerTaskManager();
-		newConfiguration.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, memory);
-
-		return newConfiguration;
+	public UnmodifiableConfiguration getConfiguration() {
+		return configuration;
 	}
 
 	@Override
 	public String toString() {
 		return "MiniClusterConfiguration {" +
-				"singleRpcService=" + singleRpcService +
+				"singleRpcService=" + rpcServiceSharing +
 				", numJobManagers=" + numJobManagers +
 				", numTaskManagers=" + numTaskManagers +
 				", numResourceManagers=" + numResourceManagers +
 				", commonBindAddress='" + commonBindAddress + '\'' +
-				", config=" + config +
+				", config=" + configuration +
 				'}';
 	}
 
+	// ----------------------------------------------------------------------------------
+	// Enums
+	// ----------------------------------------------------------------------------------
+
 	/**
-	 * Get or calculate the managed memory per task manager. The memory is calculated in the
-	 * following order:
-	 *
-	 * 1. Return {@link #managedMemoryPerTaskManager} if set
-	 * 2. Return config.getInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY) if set
-	 * 3. Distribute the available free memory equally among all components (JMs, RMs and TMs) and
-	 * calculate the managed memory from the share of memory for a single task manager.
-	 *
-	 * @return size of managed memory per task manager (in megabytes)
+	 * Enum which defines whether the mini cluster components use a shared RpcService
+	 * or whether every component gets its own dedicated RpcService started.
 	 */
-	private long getOrCalculateManagedMemoryPerTaskManager() {
-		if (managedMemoryPerTaskManager == -1) {
-			// no memory set in the mini cluster configuration
+	public enum RpcServiceSharing {
+		SHARED, // a single shared rpc service
+		DEDICATED // every component gets his own dedicated rpc service
+	}
 
-			long memorySize = config.getLong(TaskManagerOptions.MANAGED_MEMORY_SIZE);
+	// ----------------------------------------------------------------------------------
+	// Builder
+	// ----------------------------------------------------------------------------------
 
-			// we could probably use config.contains() but the previous implementation compared to
-			// the default (-1) thus allowing the user to explicitly specify this as well
-			// -> don't change this behaviour now
-			if (memorySize == TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue()) {
-				// no memory set in the flink configuration
-				// share the available memory among all running components
+	/**
+	 * Builder for the MiniClusterConfiguration.
+	 */
+	public static class Builder {
+		private Configuration configuration = new Configuration();
+		private int numJobManagers = 1;
+		private int numTaskManagers = 1;
+		private int numSlotsPerTaskManager = 1;
+		private int numResourceManagers = 1;
+		private RpcServiceSharing rpcServiceSharing = SHARED;
+		@Nullable
+		private String commonBindAddress = null;
+
+		public Builder setConfiguration(Configuration configuration1) {
+			this.configuration = Preconditions.checkNotNull(configuration1);
+			return this;
+		}
 
-				float memoryFraction = config.getFloat(TaskManagerOptions.MANAGED_MEMORY_FRACTION);
+		public Builder setNumJobManagers(int numJobManagers) {
+			this.numJobManagers = numJobManagers;
+			return this;
+		}
 
-				long freeMemory = EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag();
+		public Builder setNumTaskManagers(int numTaskManagers) {
+			this.numTaskManagers = numTaskManagers;
+			return this;
+		}
 
-				// we assign each component the same amount of free memory
-				// (might be a bit of overkill for the JMs and RMs)
-				long memoryPerComponent = freeMemory / (numTaskManagers + numResourceManagers + numJobManagers);
+		public Builder setNumSlotsPerTaskManager(int numSlotsPerTaskManager) {
+			this.numSlotsPerTaskManager = numSlotsPerTaskManager;
+			return this;
+		}
 
-				// subtract the network buffer memory
-				long networkBuffersMemory = TaskManagerServices.calculateNetworkBufferMemory(memoryPerComponent, config);
-				long memoryMinusNetworkBuffers = memoryPerComponent - networkBuffersMemory;
+		public Builder setNumResourceManagers(int numResourceManagers) {
+			this.numResourceManagers = numResourceManagers;
+			return this;
+		}
 
-				// calculate the managed memory size
-				long managedMemoryBytes = (long) (memoryMinusNetworkBuffers * memoryFraction);
+		public Builder setRpcServiceSharing(RpcServiceSharing rpcServiceSharing) {
+			this.rpcServiceSharing = Preconditions.checkNotNull(rpcServiceSharing);
+			return this;
+		}
+
+		public Builder setCommonBindAddress(String commonBindAddress) {
+			this.commonBindAddress = commonBindAddress;
+			return this;
+		}
 
-				return managedMemoryBytes >> 20; // bytes to megabytes
-			} else {
-				return memorySize;
-			}
-		} else {
-			return managedMemoryPerTaskManager;
+		public MiniClusterConfiguration build() {
+			final Configuration modifiedConfiguration = new Configuration(configuration);
+			modifiedConfiguration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlotsPerTaskManager);
+
+			return new MiniClusterConfiguration(
+				modifiedConfiguration,
+				numJobManagers,
+				numTaskManagers,
+				numResourceManagers,
+				rpcServiceSharing,
+				commonBindAddress);
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a9743eb6/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
index f90367c..8ca1329 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
@@ -40,13 +40,13 @@ public class MiniClusterITCase extends TestLogger {
 	//  Simple Job Running Tests
 	// ------------------------------------------------------------------------
 
+	private static final MiniClusterConfiguration defaultConfiguration = null;
+
 	@Test
 	public void runJobWithSingleRpcService() throws Exception {
-		MiniClusterConfiguration cfg = new MiniClusterConfiguration();
-
-		// should be the default, but set anyways to make sure the test
-		// stays valid when the default changes
-		cfg.setUseSingleRpcService();
+		MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder()
+			.setRpcServiceSharing(MiniClusterConfiguration.RpcServiceSharing.SHARED)
+			.build();
 
 		MiniCluster miniCluster = new MiniCluster(cfg);
 		try {
@@ -60,8 +60,9 @@ public class MiniClusterITCase extends TestLogger {
 
 	@Test
 	public void runJobWithMultipleRpcServices() throws Exception {
-		MiniClusterConfiguration cfg = new MiniClusterConfiguration();
-		cfg.setUseRpcServicePerComponent();
+		MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder()
+			.setRpcServiceSharing(MiniClusterConfiguration.RpcServiceSharing.DEDICATED)
+			.build();
 
 		MiniCluster miniCluster = new MiniCluster(cfg);
 		try {
@@ -75,8 +76,9 @@ public class MiniClusterITCase extends TestLogger {
 
 	@Test
 	public void runJobWithMultipleJobManagers() throws Exception {
-		MiniClusterConfiguration cfg = new MiniClusterConfiguration();
-		cfg.setNumJobManagers(3);
+		MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder()
+		.setNumJobManagers(3)
+		.build();
 
 		MiniCluster miniCluster = new MiniCluster(cfg);
 		try {

http://git-wip-us.apache.org/repos/asf/flink/blob/a9743eb6/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java
index cebd15f..e276ac7 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java
@@ -99,15 +99,17 @@ public class Flip6LocalStreamEnvironment extends StreamExecutionEnvironment {
 		// add (and override) the settings with what the user defined
 		configuration.addAll(this.conf);
 
-		MiniClusterConfiguration cfg = new MiniClusterConfiguration(configuration);
-
 		// Currently we do not reuse slot anymore,
 		// so we need to sum up the parallelism of all vertices
 		int slotsCount = 0;
 		for (JobVertex jobVertex : jobGraph.getVertices()) {
 			slotsCount += jobVertex.getParallelism();
 		}
-		cfg.setNumTaskManagerSlots(slotsCount);
+
+		MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder()
+			.setConfiguration(configuration)
+			.setNumSlotsPerTaskManager(slotsCount)
+			.build();
 
 		if (LOG.isInfoEnabled()) {
 			LOG.info("Running job on local embedded Flink mini cluster");