You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/07/14 05:52:26 UTC
[06/12] flink git commit: [FLINK-9821] Forward dynamic properties to
configuration in TaskManagerRunner
[FLINK-9821] Forward dynamic properties to configuration in TaskManagerRunner
With this commit we can use dynamic properties to overwrite configuration values in the
TaskManagerRunner.
This closes #6318.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/740f2fbf
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/740f2fbf
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/740f2fbf
Branch: refs/heads/master
Commit: 740f2fbf2e65fa988c6a577989ccd8923729be45
Parents: 2fbbf8e
Author: Till Rohrmann <tr...@apache.org>
Authored: Tue Jul 10 23:43:34 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Fri Jul 13 18:02:06 2018 +0200
----------------------------------------------------------------------
.../src/main/flink-bin/bin/taskmanager.sh | 10 ++++---
.../runtime/taskexecutor/TaskManagerRunner.java | 29 ++++++++++++++++----
2 files changed, 29 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/740f2fbf/flink-dist/src/main/flink-bin/bin/taskmanager.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/taskmanager.sh b/flink-dist/src/main/flink-bin/bin/taskmanager.sh
index 771d53f..0d70f34 100755
--- a/flink-dist/src/main/flink-bin/bin/taskmanager.sh
+++ b/flink-dist/src/main/flink-bin/bin/taskmanager.sh
@@ -22,6 +22,8 @@ USAGE="Usage: taskmanager.sh (start|start-foreground|stop|stop-all)"
STARTSTOP=$1
+ARGS=("${@:2}")
+
if [[ $STARTSTOP != "start" ]] && [[ $STARTSTOP != "start-foreground" ]] && [[ $STARTSTOP != "stop" ]] && [[ $STARTSTOP != "stop-all" ]]; then
echo $USAGE
exit 1
@@ -72,15 +74,15 @@ if [[ $STARTSTOP == "start" ]] || [[ $STARTSTOP == "start-foreground" ]]; then
export FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_TM}"
# Startup parameters
- args=("--configDir" "${FLINK_CONF_DIR}")
+ ARGS+=("--configDir" "${FLINK_CONF_DIR}")
fi
if [[ $STARTSTOP == "start-foreground" ]]; then
- exec "${FLINK_BIN_DIR}"/flink-console.sh $TYPE "${args[@]}"
+ exec "${FLINK_BIN_DIR}"/flink-console.sh $TYPE "${ARGS[@]}"
else
if [[ $FLINK_TM_COMPUTE_NUMA == "false" ]]; then
# Start a single TaskManager
- "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP $TYPE "${args[@]}"
+ "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP $TYPE "${ARGS[@]}"
else
# Example output from `numactl --show` on an AWS c4.8xlarge:
# policy: default
@@ -92,7 +94,7 @@ else
read -ra NODE_LIST <<< $(numactl --show | grep "^nodebind: ")
for NODE_ID in "${NODE_LIST[@]:1}"; do
# Start a TaskManager for each NUMA node
- numactl --membind=$NODE_ID --cpunodebind=$NODE_ID -- "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP $TYPE "${args[@]}"
+ numactl --membind=$NODE_ID --cpunodebind=$NODE_ID -- "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP $TYPE "${ARGS[@]}"
done
fi
fi
http://git-wip-us.apache.org/repos/asf/flink/blob/740f2fbf/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
index 23fd29e..91c5704 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
@@ -19,9 +19,9 @@
package org.apache.flink.runtime.taskexecutor;
import org.apache.flink.api.common.time.Time;
-import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ConfigurationUtils;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.fs.FileSystem;
@@ -29,6 +29,10 @@ import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.blob.BlobCacheService;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.entrypoint.ClusterConfiguration;
+import org.apache.flink.runtime.entrypoint.ClusterConfigurationParserFactory;
+import org.apache.flink.runtime.entrypoint.FlinkParseException;
+import org.apache.flink.runtime.entrypoint.parser.CommandLineParser;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
@@ -274,11 +278,7 @@ public class TaskManagerRunner implements FatalErrorHandler, AutoCloseableAsync
LOG.info("Cannot determine the maximum number of open file descriptors");
}
- ParameterTool parameterTool = ParameterTool.fromArgs(args);
-
- final String configDir = parameterTool.get("configDir");
-
- final Configuration configuration = GlobalConfiguration.loadConfiguration(configDir);
+ final Configuration configuration = loadConfiguration(args);
try {
FileSystem.initialize(configuration);
@@ -303,6 +303,23 @@ public class TaskManagerRunner implements FatalErrorHandler, AutoCloseableAsync
}
}
+ private static Configuration loadConfiguration(String[] args) throws FlinkParseException {
+ final CommandLineParser<ClusterConfiguration> commandLineParser = new CommandLineParser<>(new ClusterConfigurationParserFactory());
+
+ final ClusterConfiguration clusterConfiguration;
+
+ try {
+ clusterConfiguration = commandLineParser.parse(args);
+ } catch (FlinkParseException e) {
+ LOG.error("Could not parse the command line options.", e);
+ commandLineParser.printHelp();
+ throw e;
+ }
+
+ final Configuration dynamicProperties = ConfigurationUtils.createConfiguration(clusterConfiguration.getDynamicProperties());
+ return GlobalConfiguration.loadConfiguration(clusterConfiguration.getConfigDir(), dynamicProperties);
+ }
+
public static void runTaskManager(Configuration configuration, ResourceID resourceId) throws Exception {
final TaskManagerRunner taskManagerRunner = new TaskManagerRunner(configuration, resourceId);