You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/07/14 05:52:26 UTC

[06/12] flink git commit: [FLINK-9821] Forward dynamic properties to configuration in TaskManagerRunner

[FLINK-9821] Forward dynamic properties to configuration in TaskManagerRunner

With this commit, dynamic properties passed on the command line can be used to override
configuration values loaded by the TaskManagerRunner.

This closes #6318.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/740f2fbf
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/740f2fbf
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/740f2fbf

Branch: refs/heads/master
Commit: 740f2fbf2e65fa988c6a577989ccd8923729be45
Parents: 2fbbf8e
Author: Till Rohrmann <tr...@apache.org>
Authored: Tue Jul 10 23:43:34 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Fri Jul 13 18:02:06 2018 +0200

----------------------------------------------------------------------
 .../src/main/flink-bin/bin/taskmanager.sh       | 10 ++++---
 .../runtime/taskexecutor/TaskManagerRunner.java | 29 ++++++++++++++++----
 2 files changed, 29 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/740f2fbf/flink-dist/src/main/flink-bin/bin/taskmanager.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/taskmanager.sh b/flink-dist/src/main/flink-bin/bin/taskmanager.sh
index 771d53f..0d70f34 100755
--- a/flink-dist/src/main/flink-bin/bin/taskmanager.sh
+++ b/flink-dist/src/main/flink-bin/bin/taskmanager.sh
@@ -22,6 +22,8 @@ USAGE="Usage: taskmanager.sh (start|start-foreground|stop|stop-all)"
 
 STARTSTOP=$1
 
+ARGS=("${@:2}")
+
 if [[ $STARTSTOP != "start" ]] && [[ $STARTSTOP != "start-foreground" ]] && [[ $STARTSTOP != "stop" ]] && [[ $STARTSTOP != "stop-all" ]]; then
   echo $USAGE
   exit 1
@@ -72,15 +74,15 @@ if [[ $STARTSTOP == "start" ]] || [[ $STARTSTOP == "start-foreground" ]]; then
     export FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_TM}"
 
     # Startup parameters
-    args=("--configDir" "${FLINK_CONF_DIR}")
+    ARGS+=("--configDir" "${FLINK_CONF_DIR}")
 fi
 
 if [[ $STARTSTOP == "start-foreground" ]]; then
-    exec "${FLINK_BIN_DIR}"/flink-console.sh $TYPE "${args[@]}"
+    exec "${FLINK_BIN_DIR}"/flink-console.sh $TYPE "${ARGS[@]}"
 else
     if [[ $FLINK_TM_COMPUTE_NUMA == "false" ]]; then
         # Start a single TaskManager
-        "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP $TYPE "${args[@]}"
+        "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP $TYPE "${ARGS[@]}"
     else
         # Example output from `numactl --show` on an AWS c4.8xlarge:
         # policy: default
@@ -92,7 +94,7 @@ else
         read -ra NODE_LIST <<< $(numactl --show | grep "^nodebind: ")
         for NODE_ID in "${NODE_LIST[@]:1}"; do
             # Start a TaskManager for each NUMA node
-            numactl --membind=$NODE_ID --cpunodebind=$NODE_ID -- "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP $TYPE "${args[@]}"
+            numactl --membind=$NODE_ID --cpunodebind=$NODE_ID -- "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP $TYPE "${ARGS[@]}"
         done
     fi
 fi

http://git-wip-us.apache.org/repos/asf/flink/blob/740f2fbf/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
index 23fd29e..91c5704 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
@@ -19,9 +19,9 @@
 package org.apache.flink.runtime.taskexecutor;
 
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ConfigurationUtils;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.fs.FileSystem;
@@ -29,6 +29,10 @@ import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.blob.BlobCacheService;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.entrypoint.ClusterConfiguration;
+import org.apache.flink.runtime.entrypoint.ClusterConfigurationParserFactory;
+import org.apache.flink.runtime.entrypoint.FlinkParseException;
+import org.apache.flink.runtime.entrypoint.parser.CommandLineParser;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
@@ -274,11 +278,7 @@ public class TaskManagerRunner implements FatalErrorHandler, AutoCloseableAsync
 			LOG.info("Cannot determine the maximum number of open file descriptors");
 		}
 
-		ParameterTool parameterTool = ParameterTool.fromArgs(args);
-
-		final String configDir = parameterTool.get("configDir");
-
-		final Configuration configuration = GlobalConfiguration.loadConfiguration(configDir);
+		final Configuration configuration = loadConfiguration(args);
 
 		try {
 			FileSystem.initialize(configuration);
@@ -303,6 +303,23 @@ public class TaskManagerRunner implements FatalErrorHandler, AutoCloseableAsync
 		}
 	}
 
+	private static Configuration loadConfiguration(String[] args) throws FlinkParseException {
+		final CommandLineParser<ClusterConfiguration> commandLineParser = new CommandLineParser<>(new ClusterConfigurationParserFactory());
+
+		final ClusterConfiguration clusterConfiguration;
+
+		try {
+			clusterConfiguration = commandLineParser.parse(args);
+		} catch (FlinkParseException e) {
+			LOG.error("Could not parse the command line options.", e);
+			commandLineParser.printHelp();
+			throw e;
+		}
+
+		final Configuration dynamicProperties = ConfigurationUtils.createConfiguration(clusterConfiguration.getDynamicProperties());
+		return GlobalConfiguration.loadConfiguration(clusterConfiguration.getConfigDir(), dynamicProperties);
+	}
+
 	public static void runTaskManager(Configuration configuration, ResourceID resourceId) throws Exception {
 		final TaskManagerRunner taskManagerRunner = new TaskManagerRunner(configuration, resourceId);