You are viewing a plain-text version of this content. The canonical link for it was a hyperlink ("here") that is not preserved in this plain-text version.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/10/26 08:22:57 UTC
flink git commit: [FLINK-7920] Make MiniClusterConfiguration immutable
Repository: flink
Updated Branches:
refs/heads/master 3039df809 -> a9743eb68
[FLINK-7920] Make MiniClusterConfiguration immutable
This closes #4905.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a9743eb6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a9743eb6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a9743eb6
Branch: refs/heads/master
Commit: a9743eb6850809784930c1199741aa83ae54e99a
Parents: 3039df8
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed Oct 25 17:31:45 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Oct 26 10:22:22 2017 +0200
----------------------------------------------------------------------
.../flink/runtime/minicluster/MiniCluster.java | 84 ++-----
.../minicluster/MiniClusterConfiguration.java | 227 +++++++++----------
.../runtime/minicluster/MiniClusterITCase.java | 20 +-
.../Flip6LocalStreamEnvironment.java | 8 +-
4 files changed, 134 insertions(+), 205 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/a9743eb6/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index f293a01..dd352bb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -18,12 +18,9 @@
package org.apache.flink.runtime.minicluster;
-import akka.actor.ActorSystem;
-
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.UnmodifiableConfiguration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.client.JobExecutionException;
@@ -47,6 +44,7 @@ import org.apache.flink.runtime.taskexecutor.TaskExecutor;
import org.apache.flink.runtime.taskexecutor.TaskManagerRunner;
import org.apache.flink.util.ExceptionUtils;
+import akka.actor.ActorSystem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -67,7 +65,7 @@ public class MiniCluster {
private final Object lock = new Object();
/** The configuration for this mini cluster */
- private final MiniClusterConfiguration config;
+ private final MiniClusterConfiguration miniClusterConfiguration;
@GuardedBy("lock")
private MetricRegistry metricRegistry;
@@ -107,48 +105,12 @@ public class MiniCluster {
// ------------------------------------------------------------------------
/**
- * Creates a new mini cluster with the default configuration:
- * <ul>
- * <li>One JobManager</li>
- * <li>One TaskManager</li>
- * <li>One task slot in the TaskManager</li>
- * <li>All components share the same RPC subsystem (minimizes communication overhead)</li>
- * </ul>
- */
- public MiniCluster() {
- this(new MiniClusterConfiguration());
- }
-
- /**
* Creates a new Flink mini cluster based on the given configuration.
*
- * @param config The configuration for the mini cluster
+ * @param miniClusterConfiguration The configuration for the mini cluster
*/
- public MiniCluster(MiniClusterConfiguration config) {
- this.config = checkNotNull(config, "config may not be null");
- }
-
- /**
- * Creates a mini cluster based on the given configuration.
- *
- * @deprecated Use {@link #MiniCluster(MiniClusterConfiguration)} instead.
- * @see #MiniCluster(MiniClusterConfiguration)
- */
- @Deprecated
- public MiniCluster(Configuration config) {
- this(createConfig(config, true));
- }
-
- /**
- * Creates a mini cluster based on the given configuration, starting one or more
- * RPC services, depending on the given flag.
- *
- * @deprecated Use {@link #MiniCluster(MiniClusterConfiguration)} instead.
- * @see #MiniCluster(MiniClusterConfiguration)
- */
- @Deprecated
- public MiniCluster(Configuration config, boolean singleRpcService) {
- this(createConfig(config, singleRpcService));
+ public MiniCluster(MiniClusterConfiguration miniClusterConfiguration) {
+ this.miniClusterConfiguration = checkNotNull(miniClusterConfiguration, "config may not be null");
running = false;
}
@@ -175,14 +137,14 @@ public class MiniCluster {
checkState(!running, "FlinkMiniCluster is already running");
LOG.info("Starting Flink Mini Cluster");
- LOG.debug("Using configuration {}", config);
+ LOG.debug("Using configuration {}", miniClusterConfiguration);
- final Configuration configuration = new UnmodifiableConfiguration(config.generateConfiguration());
- final Time rpcTimeout = config.getRpcTimeout();
- final int numJobManagers = config.getNumJobManagers();
- final int numTaskManagers = config.getNumTaskManagers();
- final int numResourceManagers = config.getNumResourceManagers();
- final boolean singleRpc = config.getUseSingleRpcSystem();
+ final Configuration configuration = miniClusterConfiguration.getConfiguration();
+ final Time rpcTimeout = miniClusterConfiguration.getRpcTimeout();
+ final int numJobManagers = miniClusterConfiguration.getNumJobManagers();
+ final int numTaskManagers = miniClusterConfiguration.getNumTaskManagers();
+ final int numResourceManagers = miniClusterConfiguration.getNumResourceManagers();
+ final boolean useSingleRpcService = miniClusterConfiguration.getRpcServiceSharing() == MiniClusterConfiguration.RpcServiceSharing.SHARED;
try {
LOG.info("Starting Metrics Registry");
@@ -198,7 +160,7 @@ public class MiniCluster {
// we always need the 'commonRpcService' for auxiliary calls
commonRpcService = createRpcService(configuration, rpcTimeout, false, null);
- if (singleRpc) {
+ if (useSingleRpcService) {
// set that same RPC service for all JobManagers and TaskManagers
for (int i = 0; i < numJobManagers; i++) {
jobManagerRpcServices[i] = commonRpcService;
@@ -216,9 +178,9 @@ public class MiniCluster {
}
else {
// start a new service per component, possibly with custom bind addresses
- final String jobManagerBindAddress = config.getJobManagerBindAddress();
- final String taskManagerBindAddress = config.getTaskManagerBindAddress();
- final String resourceManagerBindAddress = config.getResourceManagerBindAddress();
+ final String jobManagerBindAddress = miniClusterConfiguration.getJobManagerBindAddress();
+ final String taskManagerBindAddress = miniClusterConfiguration.getTaskManagerBindAddress();
+ final String resourceManagerBindAddress = miniClusterConfiguration.getResourceManagerBindAddress();
for (int i = 0; i < numJobManagers; i++) {
jobManagerRpcServices[i] = createRpcService(
@@ -625,20 +587,6 @@ public class MiniCluster {
return priorException;
}
- private static MiniClusterConfiguration createConfig(Configuration cfg, boolean singleRpcService) {
- MiniClusterConfiguration config = cfg == null ?
- new MiniClusterConfiguration() :
- new MiniClusterConfiguration(cfg);
-
- if (singleRpcService) {
- config.setUseSingleRpcService();
- } else {
- config.setUseRpcServicePerComponent();
- }
-
- return config;
- }
-
private class TerminatingFatalErrorHandler implements FatalErrorHandler {
private final int index;
http://git-wip-us.apache.org/repos/asf/flink/blob/a9743eb6/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
index aa9b0c2..52e037c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
@@ -22,97 +22,60 @@ import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
-import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.configuration.UnmodifiableConfiguration;
import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.taskexecutor.TaskManagerServices;
-import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
import scala.concurrent.duration.FiniteDuration;
-import static org.apache.flink.util.Preconditions.checkArgument;
-import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.runtime.minicluster.MiniClusterConfiguration.RpcServiceSharing.SHARED;
+/**
+ * Configuration object for the {@link MiniCluster}.
+ */
public class MiniClusterConfiguration {
- private final Configuration config;
+ private final UnmodifiableConfiguration configuration;
- private boolean singleRpcService = true;
+ private final int numJobManagers;
- private int numJobManagers = 1;
+ private final int numTaskManagers;
- private int numTaskManagers = 1;
+ private final int numResourceManagers;
- private int numResourceManagers = 1;
+ private final RpcServiceSharing rpcServiceSharing;
- private String commonBindAddress;
-
- private long managedMemoryPerTaskManager = -1;
+ @Nullable
+ private final String commonBindAddress;
// ------------------------------------------------------------------------
// Construction
// ------------------------------------------------------------------------
- public MiniClusterConfiguration() {
- this.config = new Configuration();
- }
-
- public MiniClusterConfiguration(Configuration config) {
- checkNotNull(config);
- this.config = new Configuration(config);
- }
-
- // ------------------------------------------------------------------------
- // setters
- // ------------------------------------------------------------------------
-
- public void addConfiguration(Configuration config) {
- checkNotNull(config, "configuration must not be null");
- this.config.addAll(config);
- }
+ public MiniClusterConfiguration(
+ Configuration configuration,
+ int numJobManagers,
+ int numTaskManagers,
+ int numResourceManagers,
+ RpcServiceSharing rpcServiceSharing,
+ @Nullable String commonBindAddress) {
- public void setUseSingleRpcService() {
- this.singleRpcService = true;
- }
-
- public void setUseRpcServicePerComponent() {
- this.singleRpcService = false;
- }
-
- public void setNumJobManagers(int numJobManagers) {
- checkArgument(numJobManagers >= 1, "must have at least one JobManager");
+ this.configuration = new UnmodifiableConfiguration(Preconditions.checkNotNull(configuration));
this.numJobManagers = numJobManagers;
- }
-
- public void setNumTaskManagers(int numTaskManagers) {
- checkArgument(numTaskManagers >= 1, "must have at least one TaskManager");
this.numTaskManagers = numTaskManagers;
- }
-
- public void setNumResourceManagers(int numResourceManagers) {
- checkArgument(numResourceManagers >= 1, "must have at least one ResourceManager");
this.numResourceManagers = numResourceManagers;
- }
-
- public void setNumTaskManagerSlots(int numTaskSlots) {
- checkArgument(numTaskSlots >= 1, "must have at least one task slot per TaskManager");
- this.config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numTaskSlots);
- }
-
- public void setCommonRpcBindAddress(String bindAddress) {
- checkNotNull(bindAddress, "bind address must not be null");
- this.commonBindAddress = bindAddress;
- }
-
- public void setManagedMemoryPerTaskManager(long managedMemoryPerTaskManager) {
- checkArgument(managedMemoryPerTaskManager > 0, "must have more than 0 MB of memory for the TaskManager.");
- this.managedMemoryPerTaskManager = managedMemoryPerTaskManager;
+ this.rpcServiceSharing = Preconditions.checkNotNull(rpcServiceSharing);
+ this.commonBindAddress = commonBindAddress;
}
// ------------------------------------------------------------------------
// getters
// ------------------------------------------------------------------------
- public boolean getUseSingleRpcSystem() {
- return singleRpcService;
+ public RpcServiceSharing getRpcServiceSharing() {
+ return rpcServiceSharing;
}
public int getNumJobManagers() {
@@ -127,107 +90,121 @@ public class MiniClusterConfiguration {
return numResourceManagers;
}
- public int getNumSlotsPerTaskManager() {
- return config.getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);
- }
-
public String getJobManagerBindAddress() {
return commonBindAddress != null ?
commonBindAddress :
- config.getString(JobManagerOptions.ADDRESS, "localhost");
+ configuration.getString(JobManagerOptions.ADDRESS, "localhost");
}
public String getTaskManagerBindAddress() {
return commonBindAddress != null ?
commonBindAddress :
- config.getString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, "localhost");
+ configuration.getString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, "localhost");
}
public String getResourceManagerBindAddress() {
return commonBindAddress != null ?
commonBindAddress :
- config.getString(JobManagerOptions.ADDRESS, "localhost"); // TODO: Introduce proper configuration constant for the resource manager hostname
+ configuration.getString(JobManagerOptions.ADDRESS, "localhost"); // TODO: Introduce proper configuration constant for the resource manager hostname
}
public Time getRpcTimeout() {
- FiniteDuration duration = AkkaUtils.getTimeout(config);
+ FiniteDuration duration = AkkaUtils.getTimeout(configuration);
return Time.of(duration.length(), duration.unit());
}
- public long getManagedMemoryPerTaskManager() {
- return getOrCalculateManagedMemoryPerTaskManager();
- }
-
- // ------------------------------------------------------------------------
- // utils
- // ------------------------------------------------------------------------
-
- public Configuration generateConfiguration() {
- Configuration newConfiguration = new Configuration(config);
- // set the memory
- long memory = getOrCalculateManagedMemoryPerTaskManager();
- newConfiguration.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, memory);
-
- return newConfiguration;
+ public UnmodifiableConfiguration getConfiguration() {
+ return configuration;
}
@Override
public String toString() {
return "MiniClusterConfiguration {" +
- "singleRpcService=" + singleRpcService +
+ "singleRpcService=" + rpcServiceSharing +
", numJobManagers=" + numJobManagers +
", numTaskManagers=" + numTaskManagers +
", numResourceManagers=" + numResourceManagers +
", commonBindAddress='" + commonBindAddress + '\'' +
- ", config=" + config +
+ ", config=" + configuration +
'}';
}
+ // ----------------------------------------------------------------------------------
+ // Enums
+ // ----------------------------------------------------------------------------------
+
/**
- * Get or calculate the managed memory per task manager. The memory is calculated in the
- * following order:
- *
- * 1. Return {@link #managedMemoryPerTaskManager} if set
- * 2. Return config.getInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY) if set
- * 3. Distribute the available free memory equally among all components (JMs, RMs and TMs) and
- * calculate the managed memory from the share of memory for a single task manager.
- *
- * @return size of managed memory per task manager (in megabytes)
+ * Enum which defines whether the mini cluster components use a shared RpcService
+ * or whether every component gets its own dedicated RpcService started.
*/
- private long getOrCalculateManagedMemoryPerTaskManager() {
- if (managedMemoryPerTaskManager == -1) {
- // no memory set in the mini cluster configuration
+ public enum RpcServiceSharing {
+ SHARED, // a single shared rpc service
+ DEDICATED // every component gets his own dedicated rpc service
+ }
- long memorySize = config.getLong(TaskManagerOptions.MANAGED_MEMORY_SIZE);
+ // ----------------------------------------------------------------------------------
+ // Builder
+ // ----------------------------------------------------------------------------------
- // we could probably use config.contains() but the previous implementation compared to
- // the default (-1) thus allowing the user to explicitly specify this as well
- // -> don't change this behaviour now
- if (memorySize == TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue()) {
- // no memory set in the flink configuration
- // share the available memory among all running components
+ /**
+ * Builder for the MiniClusterConfiguration.
+ */
+ public static class Builder {
+ private Configuration configuration = new Configuration();
+ private int numJobManagers = 1;
+ private int numTaskManagers = 1;
+ private int numSlotsPerTaskManager = 1;
+ private int numResourceManagers = 1;
+ private RpcServiceSharing rpcServiceSharing = SHARED;
+ @Nullable
+ private String commonBindAddress = null;
+
+ public Builder setConfiguration(Configuration configuration1) {
+ this.configuration = Preconditions.checkNotNull(configuration1);
+ return this;
+ }
- float memoryFraction = config.getFloat(TaskManagerOptions.MANAGED_MEMORY_FRACTION);
+ public Builder setNumJobManagers(int numJobManagers) {
+ this.numJobManagers = numJobManagers;
+ return this;
+ }
- long freeMemory = EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag();
+ public Builder setNumTaskManagers(int numTaskManagers) {
+ this.numTaskManagers = numTaskManagers;
+ return this;
+ }
- // we assign each component the same amount of free memory
- // (might be a bit of overkill for the JMs and RMs)
- long memoryPerComponent = freeMemory / (numTaskManagers + numResourceManagers + numJobManagers);
+ public Builder setNumSlotsPerTaskManager(int numSlotsPerTaskManager) {
+ this.numSlotsPerTaskManager = numSlotsPerTaskManager;
+ return this;
+ }
- // subtract the network buffer memory
- long networkBuffersMemory = TaskManagerServices.calculateNetworkBufferMemory(memoryPerComponent, config);
- long memoryMinusNetworkBuffers = memoryPerComponent - networkBuffersMemory;
+ public Builder setNumResourceManagers(int numResourceManagers) {
+ this.numResourceManagers = numResourceManagers;
+ return this;
+ }
- // calculate the managed memory size
- long managedMemoryBytes = (long) (memoryMinusNetworkBuffers * memoryFraction);
+ public Builder setRpcServiceSharing(RpcServiceSharing rpcServiceSharing) {
+ this.rpcServiceSharing = Preconditions.checkNotNull(rpcServiceSharing);
+ return this;
+ }
+
+ public Builder setCommonBindAddress(String commonBindAddress) {
+ this.commonBindAddress = commonBindAddress;
+ return this;
+ }
- return managedMemoryBytes >> 20; // bytes to megabytes
- } else {
- return memorySize;
- }
- } else {
- return managedMemoryPerTaskManager;
+ public MiniClusterConfiguration build() {
+ final Configuration modifiedConfiguration = new Configuration(configuration);
+ modifiedConfiguration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlotsPerTaskManager);
+
+ return new MiniClusterConfiguration(
+ modifiedConfiguration,
+ numJobManagers,
+ numTaskManagers,
+ numResourceManagers,
+ rpcServiceSharing,
+ commonBindAddress);
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a9743eb6/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
index f90367c..8ca1329 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
@@ -40,13 +40,13 @@ public class MiniClusterITCase extends TestLogger {
// Simple Job Running Tests
// ------------------------------------------------------------------------
+ private static final MiniClusterConfiguration defaultConfiguration = null;
+
@Test
public void runJobWithSingleRpcService() throws Exception {
- MiniClusterConfiguration cfg = new MiniClusterConfiguration();
-
- // should be the default, but set anyways to make sure the test
- // stays valid when the default changes
- cfg.setUseSingleRpcService();
+ MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder()
+ .setRpcServiceSharing(MiniClusterConfiguration.RpcServiceSharing.SHARED)
+ .build();
MiniCluster miniCluster = new MiniCluster(cfg);
try {
@@ -60,8 +60,9 @@ public class MiniClusterITCase extends TestLogger {
@Test
public void runJobWithMultipleRpcServices() throws Exception {
- MiniClusterConfiguration cfg = new MiniClusterConfiguration();
- cfg.setUseRpcServicePerComponent();
+ MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder()
+ .setRpcServiceSharing(MiniClusterConfiguration.RpcServiceSharing.DEDICATED)
+ .build();
MiniCluster miniCluster = new MiniCluster(cfg);
try {
@@ -75,8 +76,9 @@ public class MiniClusterITCase extends TestLogger {
@Test
public void runJobWithMultipleJobManagers() throws Exception {
- MiniClusterConfiguration cfg = new MiniClusterConfiguration();
- cfg.setNumJobManagers(3);
+ MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder()
+ .setNumJobManagers(3)
+ .build();
MiniCluster miniCluster = new MiniCluster(cfg);
try {
http://git-wip-us.apache.org/repos/asf/flink/blob/a9743eb6/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java
index cebd15f..e276ac7 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java
@@ -99,15 +99,17 @@ public class Flip6LocalStreamEnvironment extends StreamExecutionEnvironment {
// add (and override) the settings with what the user defined
configuration.addAll(this.conf);
- MiniClusterConfiguration cfg = new MiniClusterConfiguration(configuration);
-
// Currently we do not reuse slot anymore,
// so we need to sum up the parallelism of all vertices
int slotsCount = 0;
for (JobVertex jobVertex : jobGraph.getVertices()) {
slotsCount += jobVertex.getParallelism();
}
- cfg.setNumTaskManagerSlots(slotsCount);
+
+ MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder()
+ .setConfiguration(configuration)
+ .setNumSlotsPerTaskManager(slotsCount)
+ .build();
if (LOG.isInfoEnabled()) {
LOG.info("Running job on local embedded Flink mini cluster");