You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/08/20 17:28:21 UTC

flink git commit: [FLINK-7269] Refactor passing of dynamic properties

Repository: flink
Updated Branches:
  refs/heads/master 40656c5df -> bd70a0001


[FLINK-7269] Refactor passing of dynamic properties

This closes #4415.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bd70a000
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bd70a000
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bd70a000

Branch: refs/heads/master
Commit: bd70a00019f2c9fc01653d0229308635529aad73
Parents: 40656c5
Author: zjureel <zj...@gmail.com>
Authored: Fri Jul 28 14:53:39 2017 +0800
Committer: Till Rohrmann <tr...@apache.org>
Committed: Sat Aug 19 18:42:14 2017 +0200

----------------------------------------------------------------------
 .../flink/configuration/Configuration.java      |  2 +-
 .../configuration/GlobalConfiguration.java      | 60 ++++++++++++--------
 .../mesos/entrypoint/MesosEntrypointUtils.java  | 22 -------
 .../entrypoint/MesosJobClusterEntrypoint.java   | 14 +++--
 .../MesosSessionClusterEntrypoint.java          | 14 +++--
 .../entrypoint/MesosTaskExecutorRunner.java     |  6 +-
 .../MesosApplicationMasterRunner.java           |  3 +-
 .../MesosTaskManagerRunner.java                 |  3 +-
 8 files changed, 64 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/bd70a000/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java b/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
index d6f1dec..dfcd04f 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
@@ -79,7 +79,7 @@ public class Configuration extends ExecutionConfig.GlobalJobParameters
 	}
 	
 	// --------------------------------------------------------------------------------------------
-	
+
 	/**
 	 * Returns the class associated with the given key as a string.
 	 * 

http://git-wip-us.apache.org/repos/asf/flink/blob/bd70a000/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java b/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java
index ea9f8bf..4569ebe 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java
@@ -28,6 +28,8 @@ import org.apache.flink.annotation.Internal;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
 /**
  * Global configuration object for Flink. Similar to Java properties configuration
  * objects it includes key-value pairs which represent the framework's configuration.
@@ -46,24 +48,6 @@ public final class GlobalConfiguration {
 
 	// --------------------------------------------------------------------------------------------
 
-	private static Configuration dynamicProperties = null;
-
-	/**
-	 * Set the process-wide dynamic properties to be merged with the loaded configuration.
-     */
-	public static void setDynamicProperties(Configuration dynamicProperties) {
-		GlobalConfiguration.dynamicProperties = new Configuration(dynamicProperties);
-	}
-
-	/**
-	 * Get the dynamic properties.
-     */
-	public static Configuration getDynamicProperties() {
-		return GlobalConfiguration.dynamicProperties;
-	}
-
-	// --------------------------------------------------------------------------------------------
-
 	/**
 	 * Loads the global configuration from the environment. Fails if an error occurs during loading. Returns an
 	 * empty configuration object if the environment variable is not set. In production this variable is set but
@@ -76,18 +60,30 @@ public final class GlobalConfiguration {
 		if (configDir == null) {
 			return new Configuration();
 		}
-		return loadConfiguration(configDir);
+		return loadConfiguration(configDir, null);
 	}
 
 	/**
 	 * Loads the configuration files from the specified directory.
 	 * <p>
 	 * YAML files are supported as configuration files.
-	 * 
+	 *
 	 * @param configDir
 	 *        the directory which contains the configuration files
 	 */
 	public static Configuration loadConfiguration(final String configDir) {
+		return loadConfiguration(configDir, null);
+	}
+
+	/**
+	 * Loads the configuration files from the specified directory. If the dynamic properties
+	 * configuration is not null, then it is added to the loaded configuration.
+	 *
+	 * @param configDir directory to load the configuration from
+	 * @param dynamicProperties configuration file containing the dynamic properties. Null if none.
+	 * @return The configuration loaded from the given configuration directory
+	 */
+	public static Configuration loadConfiguration(final String configDir, @Nullable final Configuration dynamicProperties) {
 
 		if (configDir == null) {
 			throw new IllegalArgumentException("Given configuration directory is null, cannot load configuration");
@@ -109,13 +105,29 @@ public final class GlobalConfiguration {
 					"' (" + confDirFile.getAbsolutePath() + ") does not exist.");
 		}
 
-		Configuration conf = loadYAMLResource(yamlConfigFile);
+		Configuration configuration = loadYAMLResource(yamlConfigFile);
 
-		if(dynamicProperties != null) {
-			conf.addAll(dynamicProperties);
+		if (dynamicProperties != null) {
+			configuration.addAll(dynamicProperties);
+		}
+
+		return configuration;
+	}
+
+	/**
+	 * Loads the global configuration and adds the given dynamic properties
+	 * configuration.
+	 *
+	 * @param dynamicProperties The given dynamic properties
+	 * @return Returns the loaded global configuration with dynamic properties
+	 */
+	public static Configuration loadConfigurationWithDynamicProperties(Configuration dynamicProperties) {
+		final String configDir = System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR);
+		if (configDir == null) {
+			return new Configuration(dynamicProperties);
 		}
 
-		return conf;
+		return loadConfiguration(configDir, dynamicProperties);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/bd70a000/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosEntrypointUtils.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosEntrypointUtils.java b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosEntrypointUtils.java
index 0d81ead..368d62d 100755
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosEntrypointUtils.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosEntrypointUtils.java
@@ -19,12 +19,10 @@
 package org.apache.flink.mesos.entrypoint;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.mesos.configuration.MesosOptions;
 import org.apache.flink.mesos.runtime.clusterframework.MesosTaskManagerParameters;
 import org.apache.flink.mesos.util.MesosConfiguration;
-import org.apache.flink.runtime.clusterframework.BootstrapTools;
 import org.apache.flink.runtime.clusterframework.ContainerSpecification;
 import org.apache.flink.runtime.clusterframework.overlays.CompositeContainerOverlay;
 import org.apache.flink.runtime.clusterframework.overlays.FlinkDistributionOverlay;
@@ -34,7 +32,6 @@ import org.apache.flink.runtime.clusterframework.overlays.KeytabOverlay;
 import org.apache.flink.runtime.clusterframework.overlays.Krb5ConfOverlay;
 import org.apache.flink.runtime.clusterframework.overlays.SSLStoreOverlay;
 
-import org.apache.commons.cli.CommandLine;
 import org.apache.mesos.Protos;
 import org.slf4j.Logger;
 
@@ -50,25 +47,6 @@ import scala.concurrent.duration.FiniteDuration;
 public class MesosEntrypointUtils {
 
 	/**
-	 * Loads the global configuration and adds the dynamic properties parsed from
-	 * the given command line.
-	 *
-	 * @param cmd command line to parse for dynamic properties
-	 * @return Global configuration with dynamic properties set
-	 * @deprecated replace once FLINK-7269 has been merged
-	 */
-	@Deprecated
-	public static Configuration loadConfiguration(CommandLine cmd) {
-
-		// merge the dynamic properties from the command-line
-		Configuration dynamicProperties = BootstrapTools.parseDynamicProperties(cmd);
-		GlobalConfiguration.setDynamicProperties(dynamicProperties);
-		Configuration config = GlobalConfiguration.loadConfiguration();
-
-		return config;
-	}
-
-	/**
 	 * Loads and validates the Mesos scheduler configuration.
 	 * @param flinkConfig the global configuration.
 	 * @param hostname the hostname to advertise to the Mesos master.

http://git-wip-us.apache.org/repos/asf/flink/blob/bd70a000/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java
index 890c4a7..ba3b51d 100755
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java
@@ -43,6 +43,7 @@ import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
 
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
@@ -74,6 +75,8 @@ public class MesosJobClusterEntrypoint extends JobClusterEntrypoint {
 				.addOption(BootstrapTools.newDynamicPropertiesOption());
 	}
 
+	private final Configuration dynamicProperties;
+
 	private MesosConfiguration schedulerConfiguration;
 
 	private MesosServices mesosServices;
@@ -82,8 +85,10 @@ public class MesosJobClusterEntrypoint extends JobClusterEntrypoint {
 
 	private ContainerSpecification taskManagerContainerSpec;
 
-	public MesosJobClusterEntrypoint(Configuration config) {
+	public MesosJobClusterEntrypoint(Configuration config, Configuration dynamicProperties) {
 		super(config);
+
+		this.dynamicProperties = Preconditions.checkNotNull(dynamicProperties);
 	}
 
 	@Override
@@ -100,7 +105,7 @@ public class MesosJobClusterEntrypoint extends JobClusterEntrypoint {
 
 		// TM configuration
 		taskManagerParameters = MesosEntrypointUtils.createTmParameters(config, LOG);
-		taskManagerContainerSpec = MesosEntrypointUtils.createContainerSpec(config, GlobalConfiguration.getDynamicProperties());
+		taskManagerContainerSpec = MesosEntrypointUtils.createContainerSpec(config, dynamicProperties);
 	}
 
 	@Override
@@ -195,9 +200,10 @@ public class MesosJobClusterEntrypoint extends JobClusterEntrypoint {
 			return;
 		}
 
-		Configuration configuration = MesosEntrypointUtils.loadConfiguration(cmd);
+		Configuration dynamicProperties = BootstrapTools.parseDynamicProperties(cmd);
+		Configuration configuration = GlobalConfiguration.loadConfigurationWithDynamicProperties(dynamicProperties);
 
-		MesosJobClusterEntrypoint clusterEntrypoint = new MesosJobClusterEntrypoint(configuration);
+		MesosJobClusterEntrypoint clusterEntrypoint = new MesosJobClusterEntrypoint(configuration, dynamicProperties);
 
 		clusterEntrypoint.startCluster();
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/bd70a000/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosSessionClusterEntrypoint.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosSessionClusterEntrypoint.java b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosSessionClusterEntrypoint.java
index 67f5899..0ee2680 100755
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosSessionClusterEntrypoint.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosSessionClusterEntrypoint.java
@@ -41,6 +41,7 @@ import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServicesCo
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
 
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
@@ -64,6 +65,8 @@ public class MesosSessionClusterEntrypoint extends SessionClusterEntrypoint {
 				.addOption(BootstrapTools.newDynamicPropertiesOption());
 	}
 
+	private final Configuration dynamicProperties;
+
 	private MesosConfiguration mesosConfig;
 
 	private MesosServices mesosServices;
@@ -72,8 +75,10 @@ public class MesosSessionClusterEntrypoint extends SessionClusterEntrypoint {
 
 	private ContainerSpecification taskManagerContainerSpec;
 
-	public MesosSessionClusterEntrypoint(Configuration config) {
+	public MesosSessionClusterEntrypoint(Configuration config, Configuration dynamicProperties) {
 		super(config);
+
+		this.dynamicProperties = Preconditions.checkNotNull(dynamicProperties);
 	}
 
 	@Override
@@ -90,7 +95,7 @@ public class MesosSessionClusterEntrypoint extends SessionClusterEntrypoint {
 
 		// TM configuration
 		taskManagerParameters = MesosEntrypointUtils.createTmParameters(config, LOG);
-		taskManagerContainerSpec = MesosEntrypointUtils.createContainerSpec(config, GlobalConfiguration.getDynamicProperties());
+		taskManagerContainerSpec = MesosEntrypointUtils.createContainerSpec(config, dynamicProperties);
 	}
 
 	@Override
@@ -169,9 +174,10 @@ public class MesosSessionClusterEntrypoint extends SessionClusterEntrypoint {
 			return;
 		}
 
-		Configuration configuration = MesosEntrypointUtils.loadConfiguration(cmd);
+		Configuration dynamicProperties = BootstrapTools.parseDynamicProperties(cmd);
+		Configuration configuration = GlobalConfiguration.loadConfigurationWithDynamicProperties(dynamicProperties);
 
-		MesosSessionClusterEntrypoint clusterEntrypoint = new MesosSessionClusterEntrypoint(configuration);
+		MesosSessionClusterEntrypoint clusterEntrypoint = new MesosSessionClusterEntrypoint(configuration, dynamicProperties);
 
 		clusterEntrypoint.startCluster();
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/bd70a000/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosTaskExecutorRunner.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosTaskExecutorRunner.java b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosTaskExecutorRunner.java
index c4343d2..897e26d 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosTaskExecutorRunner.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosTaskExecutorRunner.java
@@ -21,6 +21,7 @@ package org.apache.flink.mesos.entrypoint;
 import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.mesos.runtime.clusterframework.MesosConfigKeys;
 import org.apache.flink.runtime.clusterframework.BootstrapTools;
@@ -71,7 +72,10 @@ public class MesosTaskExecutorRunner {
 
 		final Configuration configuration;
 		try {
-			configuration = MesosEntrypointUtils.loadConfiguration(cmd);
+			Configuration dynamicProperties = BootstrapTools.parseDynamicProperties(cmd);
+			LOG.debug("Mesos dynamic properties: {}", dynamicProperties);
+
+			configuration = GlobalConfiguration.loadConfigurationWithDynamicProperties(dynamicProperties);
 		}
 		catch (Throwable t) {
 			LOG.error("Failed to load the TaskManager configuration and dynamic properties.", t);

http://git-wip-us.apache.org/repos/asf/flink/blob/bd70a000/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
index 3d16a66..c0a6855 100755
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
@@ -152,8 +152,7 @@ public class MesosApplicationMasterRunner {
 			CommandLine cmd = parser.parse(ALL_OPTIONS, args);
 
 			final Configuration dynamicProperties = BootstrapTools.parseDynamicProperties(cmd);
-			GlobalConfiguration.setDynamicProperties(dynamicProperties);
-			final Configuration config = GlobalConfiguration.loadConfiguration();
+			final Configuration config = GlobalConfiguration.loadConfigurationWithDynamicProperties(dynamicProperties);
 
 			// configure the default filesystem
 			try {

http://git-wip-us.apache.org/repos/asf/flink/blob/bd70a000/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java
index e1b0efa..4236a43 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java
@@ -73,10 +73,9 @@ public class MesosTaskManagerRunner {
 		final Configuration configuration;
 		try {
 			final Configuration dynamicProperties = BootstrapTools.parseDynamicProperties(cmd);
-			GlobalConfiguration.setDynamicProperties(dynamicProperties);
 			LOG.debug("Mesos dynamic properties: {}", dynamicProperties);
 
-			configuration = GlobalConfiguration.loadConfiguration();
+			configuration = GlobalConfiguration.loadConfigurationWithDynamicProperties(dynamicProperties);
 		}
 		catch (Throwable t) {
 			LOG.error("Failed to load the TaskManager configuration and dynamic properties.", t);