You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/08/20 17:28:21 UTC
flink git commit: [FLINK-7269] Refactor passing of dynamic properties
Repository: flink
Updated Branches:
refs/heads/master 40656c5df -> bd70a0001
[FLINK-7269] Refactor passing of dynamic properties
This closes #4415.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bd70a000
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bd70a000
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bd70a000
Branch: refs/heads/master
Commit: bd70a00019f2c9fc01653d0229308635529aad73
Parents: 40656c5
Author: zjureel <zj...@gmail.com>
Authored: Fri Jul 28 14:53:39 2017 +0800
Committer: Till Rohrmann <tr...@apache.org>
Committed: Sat Aug 19 18:42:14 2017 +0200
----------------------------------------------------------------------
.../flink/configuration/Configuration.java | 2 +-
.../configuration/GlobalConfiguration.java | 60 ++++++++++++--------
.../mesos/entrypoint/MesosEntrypointUtils.java | 22 -------
.../entrypoint/MesosJobClusterEntrypoint.java | 14 +++--
.../MesosSessionClusterEntrypoint.java | 14 +++--
.../entrypoint/MesosTaskExecutorRunner.java | 6 +-
.../MesosApplicationMasterRunner.java | 3 +-
.../MesosTaskManagerRunner.java | 3 +-
8 files changed, 64 insertions(+), 60 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/bd70a000/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java b/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
index d6f1dec..dfcd04f 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
@@ -79,7 +79,7 @@ public class Configuration extends ExecutionConfig.GlobalJobParameters
}
// --------------------------------------------------------------------------------------------
-
+
/**
* Returns the class associated with the given key as a string.
*
http://git-wip-us.apache.org/repos/asf/flink/blob/bd70a000/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java b/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java
index ea9f8bf..4569ebe 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java
@@ -28,6 +28,8 @@ import org.apache.flink.annotation.Internal;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nullable;
+
/**
* Global configuration object for Flink. Similar to Java properties configuration
* objects it includes key-value pairs which represent the framework's configuration.
@@ -46,24 +48,6 @@ public final class GlobalConfiguration {
// --------------------------------------------------------------------------------------------
- private static Configuration dynamicProperties = null;
-
- /**
- * Set the process-wide dynamic properties to be merged with the loaded configuration.
- */
- public static void setDynamicProperties(Configuration dynamicProperties) {
- GlobalConfiguration.dynamicProperties = new Configuration(dynamicProperties);
- }
-
- /**
- * Get the dynamic properties.
- */
- public static Configuration getDynamicProperties() {
- return GlobalConfiguration.dynamicProperties;
- }
-
- // --------------------------------------------------------------------------------------------
-
/**
* Loads the global configuration from the environment. Fails if an error occurs during loading. Returns an
* empty configuration object if the environment variable is not set. In production this variable is set but
@@ -76,18 +60,30 @@ public final class GlobalConfiguration {
if (configDir == null) {
return new Configuration();
}
- return loadConfiguration(configDir);
+ return loadConfiguration(configDir, null);
}
/**
* Loads the configuration files from the specified directory.
* <p>
* YAML files are supported as configuration files.
- *
+ *
* @param configDir
* the directory which contains the configuration files
*/
public static Configuration loadConfiguration(final String configDir) {
+ return loadConfiguration(configDir, null);
+ }
+
+ /**
+ * Loads the configuration files from the specified directory. If the dynamic properties
+ * configuration is not null, then it is added to the loaded configuration.
+ *
+ * @param configDir directory to load the configuration from
+ * @param dynamicProperties configuration file containing the dynamic properties. Null if none.
+ * @return The configuration loaded from the given configuration directory
+ */
+ public static Configuration loadConfiguration(final String configDir, @Nullable final Configuration dynamicProperties) {
if (configDir == null) {
throw new IllegalArgumentException("Given configuration directory is null, cannot load configuration");
@@ -109,13 +105,29 @@ public final class GlobalConfiguration {
"' (" + confDirFile.getAbsolutePath() + ") does not exist.");
}
- Configuration conf = loadYAMLResource(yamlConfigFile);
+ Configuration configuration = loadYAMLResource(yamlConfigFile);
- if(dynamicProperties != null) {
- conf.addAll(dynamicProperties);
+ if (dynamicProperties != null) {
+ configuration.addAll(dynamicProperties);
+ }
+
+ return configuration;
+ }
+
+ /**
+ * Loads the global configuration and adds the given dynamic properties
+ * configuration.
+ *
+ * @param dynamicProperties The given dynamic properties
+ * @return Returns the loaded global configuration with dynamic properties
+ */
+ public static Configuration loadConfigurationWithDynamicProperties(Configuration dynamicProperties) {
+ final String configDir = System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR);
+ if (configDir == null) {
+ return new Configuration(dynamicProperties);
}
- return conf;
+ return loadConfiguration(configDir, dynamicProperties);
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/bd70a000/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosEntrypointUtils.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosEntrypointUtils.java b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosEntrypointUtils.java
index 0d81ead..368d62d 100755
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosEntrypointUtils.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosEntrypointUtils.java
@@ -19,12 +19,10 @@
package org.apache.flink.mesos.entrypoint;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.mesos.configuration.MesosOptions;
import org.apache.flink.mesos.runtime.clusterframework.MesosTaskManagerParameters;
import org.apache.flink.mesos.util.MesosConfiguration;
-import org.apache.flink.runtime.clusterframework.BootstrapTools;
import org.apache.flink.runtime.clusterframework.ContainerSpecification;
import org.apache.flink.runtime.clusterframework.overlays.CompositeContainerOverlay;
import org.apache.flink.runtime.clusterframework.overlays.FlinkDistributionOverlay;
@@ -34,7 +32,6 @@ import org.apache.flink.runtime.clusterframework.overlays.KeytabOverlay;
import org.apache.flink.runtime.clusterframework.overlays.Krb5ConfOverlay;
import org.apache.flink.runtime.clusterframework.overlays.SSLStoreOverlay;
-import org.apache.commons.cli.CommandLine;
import org.apache.mesos.Protos;
import org.slf4j.Logger;
@@ -50,25 +47,6 @@ import scala.concurrent.duration.FiniteDuration;
public class MesosEntrypointUtils {
/**
- * Loads the global configuration and adds the dynamic properties parsed from
- * the given command line.
- *
- * @param cmd command line to parse for dynamic properties
- * @return Global configuration with dynamic properties set
- * @deprecated replace once FLINK-7269 has been merged
- */
- @Deprecated
- public static Configuration loadConfiguration(CommandLine cmd) {
-
- // merge the dynamic properties from the command-line
- Configuration dynamicProperties = BootstrapTools.parseDynamicProperties(cmd);
- GlobalConfiguration.setDynamicProperties(dynamicProperties);
- Configuration config = GlobalConfiguration.loadConfiguration();
-
- return config;
- }
-
- /**
* Loads and validates the Mesos scheduler configuration.
* @param flinkConfig the global configuration.
* @param hostname the hostname to advertise to the Mesos master.
http://git-wip-us.apache.org/repos/asf/flink/blob/bd70a000/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java
index 890c4a7..ba3b51d 100755
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java
@@ -43,6 +43,7 @@ import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
@@ -74,6 +75,8 @@ public class MesosJobClusterEntrypoint extends JobClusterEntrypoint {
.addOption(BootstrapTools.newDynamicPropertiesOption());
}
+ private final Configuration dynamicProperties;
+
private MesosConfiguration schedulerConfiguration;
private MesosServices mesosServices;
@@ -82,8 +85,10 @@ public class MesosJobClusterEntrypoint extends JobClusterEntrypoint {
private ContainerSpecification taskManagerContainerSpec;
- public MesosJobClusterEntrypoint(Configuration config) {
+ public MesosJobClusterEntrypoint(Configuration config, Configuration dynamicProperties) {
super(config);
+
+ this.dynamicProperties = Preconditions.checkNotNull(dynamicProperties);
}
@Override
@@ -100,7 +105,7 @@ public class MesosJobClusterEntrypoint extends JobClusterEntrypoint {
// TM configuration
taskManagerParameters = MesosEntrypointUtils.createTmParameters(config, LOG);
- taskManagerContainerSpec = MesosEntrypointUtils.createContainerSpec(config, GlobalConfiguration.getDynamicProperties());
+ taskManagerContainerSpec = MesosEntrypointUtils.createContainerSpec(config, dynamicProperties);
}
@Override
@@ -195,9 +200,10 @@ public class MesosJobClusterEntrypoint extends JobClusterEntrypoint {
return;
}
- Configuration configuration = MesosEntrypointUtils.loadConfiguration(cmd);
+ Configuration dynamicProperties = BootstrapTools.parseDynamicProperties(cmd);
+ Configuration configuration = GlobalConfiguration.loadConfigurationWithDynamicProperties(dynamicProperties);
- MesosJobClusterEntrypoint clusterEntrypoint = new MesosJobClusterEntrypoint(configuration);
+ MesosJobClusterEntrypoint clusterEntrypoint = new MesosJobClusterEntrypoint(configuration, dynamicProperties);
clusterEntrypoint.startCluster();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/bd70a000/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosSessionClusterEntrypoint.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosSessionClusterEntrypoint.java b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosSessionClusterEntrypoint.java
index 67f5899..0ee2680 100755
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosSessionClusterEntrypoint.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosSessionClusterEntrypoint.java
@@ -41,6 +41,7 @@ import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServicesCo
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
@@ -64,6 +65,8 @@ public class MesosSessionClusterEntrypoint extends SessionClusterEntrypoint {
.addOption(BootstrapTools.newDynamicPropertiesOption());
}
+ private final Configuration dynamicProperties;
+
private MesosConfiguration mesosConfig;
private MesosServices mesosServices;
@@ -72,8 +75,10 @@ public class MesosSessionClusterEntrypoint extends SessionClusterEntrypoint {
private ContainerSpecification taskManagerContainerSpec;
- public MesosSessionClusterEntrypoint(Configuration config) {
+ public MesosSessionClusterEntrypoint(Configuration config, Configuration dynamicProperties) {
super(config);
+
+ this.dynamicProperties = Preconditions.checkNotNull(dynamicProperties);
}
@Override
@@ -90,7 +95,7 @@ public class MesosSessionClusterEntrypoint extends SessionClusterEntrypoint {
// TM configuration
taskManagerParameters = MesosEntrypointUtils.createTmParameters(config, LOG);
- taskManagerContainerSpec = MesosEntrypointUtils.createContainerSpec(config, GlobalConfiguration.getDynamicProperties());
+ taskManagerContainerSpec = MesosEntrypointUtils.createContainerSpec(config, dynamicProperties);
}
@Override
@@ -169,9 +174,10 @@ public class MesosSessionClusterEntrypoint extends SessionClusterEntrypoint {
return;
}
- Configuration configuration = MesosEntrypointUtils.loadConfiguration(cmd);
+ Configuration dynamicProperties = BootstrapTools.parseDynamicProperties(cmd);
+ Configuration configuration = GlobalConfiguration.loadConfigurationWithDynamicProperties(dynamicProperties);
- MesosSessionClusterEntrypoint clusterEntrypoint = new MesosSessionClusterEntrypoint(configuration);
+ MesosSessionClusterEntrypoint clusterEntrypoint = new MesosSessionClusterEntrypoint(configuration, dynamicProperties);
clusterEntrypoint.startCluster();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/bd70a000/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosTaskExecutorRunner.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosTaskExecutorRunner.java b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosTaskExecutorRunner.java
index c4343d2..897e26d 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosTaskExecutorRunner.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosTaskExecutorRunner.java
@@ -21,6 +21,7 @@ package org.apache.flink.mesos.entrypoint;
import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.mesos.runtime.clusterframework.MesosConfigKeys;
import org.apache.flink.runtime.clusterframework.BootstrapTools;
@@ -71,7 +72,10 @@ public class MesosTaskExecutorRunner {
final Configuration configuration;
try {
- configuration = MesosEntrypointUtils.loadConfiguration(cmd);
+ Configuration dynamicProperties = BootstrapTools.parseDynamicProperties(cmd);
+ LOG.debug("Mesos dynamic properties: {}", dynamicProperties);
+
+ configuration = GlobalConfiguration.loadConfigurationWithDynamicProperties(dynamicProperties);
}
catch (Throwable t) {
LOG.error("Failed to load the TaskManager configuration and dynamic properties.", t);
http://git-wip-us.apache.org/repos/asf/flink/blob/bd70a000/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
index 3d16a66..c0a6855 100755
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
@@ -152,8 +152,7 @@ public class MesosApplicationMasterRunner {
CommandLine cmd = parser.parse(ALL_OPTIONS, args);
final Configuration dynamicProperties = BootstrapTools.parseDynamicProperties(cmd);
- GlobalConfiguration.setDynamicProperties(dynamicProperties);
- final Configuration config = GlobalConfiguration.loadConfiguration();
+ final Configuration config = GlobalConfiguration.loadConfigurationWithDynamicProperties(dynamicProperties);
// configure the default filesystem
try {
http://git-wip-us.apache.org/repos/asf/flink/blob/bd70a000/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java
index e1b0efa..4236a43 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java
@@ -73,10 +73,9 @@ public class MesosTaskManagerRunner {
final Configuration configuration;
try {
final Configuration dynamicProperties = BootstrapTools.parseDynamicProperties(cmd);
- GlobalConfiguration.setDynamicProperties(dynamicProperties);
LOG.debug("Mesos dynamic properties: {}", dynamicProperties);
- configuration = GlobalConfiguration.loadConfiguration();
+ configuration = GlobalConfiguration.loadConfigurationWithDynamicProperties(dynamicProperties);
}
catch (Throwable t) {
LOG.error("Failed to load the TaskManager configuration and dynamic properties.", t);