You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gr...@apache.org on 2017/02/21 16:54:51 UTC

flink git commit: [FLINK-3163] [scripts] Configure Flink for NUMA systems

Repository: flink
Updated Branches:
  refs/heads/master 0c9c04d19 -> 11c868f91


[FLINK-3163] [scripts] Configure Flink for NUMA systems

Start a TaskManager on each NUMA node on each worker when the new
configuration option 'taskmanager.compute.numa' is enabled.

This closes #3249


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/11c868f9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/11c868f9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/11c868f9

Branch: refs/heads/master
Commit: 11c868f91db773af626ac6ac4dcba9820c13fa8a
Parents: 0c9c04d
Author: Greg Hogan <co...@greghogan.com>
Authored: Wed Feb 1 12:13:49 2017 -0500
Committer: Greg Hogan <co...@greghogan.com>
Committed: Tue Feb 21 11:00:22 2017 -0500

----------------------------------------------------------------------
 docs/setup/config.md                            |  4 ++++
 flink-dist/src/main/flink-bin/bin/config.sh     | 13 +++++++++++++
 .../src/main/flink-bin/bin/taskmanager.sh       | 20 +++++++++++++++++++-
 3 files changed, 36 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/11c868f9/docs/setup/config.md
----------------------------------------------------------------------
diff --git a/docs/setup/config.md b/docs/setup/config.md
index c4618da..b21c647 100644
--- a/docs/setup/config.md
+++ b/docs/setup/config.md
@@ -72,6 +72,10 @@ without explicit scheme definition, such as `/user/USERNAME/in.txt`, is going to
 
 ## Advanced Options
 
+### Compute
+
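+- `taskmanager.compute.numa`: When enabled, a TaskManager is started on each NUMA node of each worker listed in *conf/slaves* (DEFAULT: false). This requires the `numactl` utility; the option is silently disabled on hosts where `numactl` is not installed. Note: only supported when deploying Flink as a standalone cluster.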
+
 ### Managed Memory
 
 By default, Flink allocates a fraction of `0.7` of the total memory configured via `taskmanager.heap.mb` for its managed memory. Managed memory helps Flink to run the batch operators efficiently. It prevents `OutOfMemoryException`s because Flink knows how much memory it can use to execute operations. If Flink runs out of managed memory, it utilizes disk space. Using managed memory, some operations can be performed directly on the raw data without having to deserialize the data to convert it into Java objects. All in all, managed memory improves the robustness and speed of the system.

http://git-wip-us.apache.org/repos/asf/flink/blob/11c868f9/flink-dist/src/main/flink-bin/bin/config.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/config.sh b/flink-dist/src/main/flink-bin/bin/config.sh
index 0f24034..568aba3 100755
--- a/flink-dist/src/main/flink-bin/bin/config.sh
+++ b/flink-dist/src/main/flink-bin/bin/config.sh
@@ -96,6 +96,8 @@ KEY_TASKM_MEM_MANAGED_FRACTION="taskmanager.memory.fraction"
 KEY_TASKM_OFFHEAP="taskmanager.memory.off-heap"
 KEY_TASKM_MEM_PRE_ALLOCATE="taskmanager.memory.preallocate"
 
+KEY_TASKM_COMPUTE_NUMA="taskmanager.compute.numa"  # start one TaskManager per NUMA node (default: false)
+
 KEY_ENV_PID_DIR="env.pid.dir"
 KEY_ENV_LOG_DIR="env.log.dir"
 KEY_ENV_LOG_MAX="env.log.max"
@@ -217,6 +219,17 @@ if [ -z "${FLINK_TM_MEM_PRE_ALLOCATE}" ]; then
     FLINK_TM_MEM_PRE_ALLOCATE=$(readFromConfig ${KEY_TASKM_MEM_PRE_ALLOCATE} "false" "${YAML_CONF}")
 fi
 
+# NUMA placement relies on numactl. When it is not installed, force the option
+# off, overriding any value from the environment or the YAML configuration.
+if ! command -v numactl >/dev/null 2>&1; then
+    FLINK_TM_COMPUTE_NUMA="false"
+else
+    # Define FLINK_TM_COMPUTE_NUMA if it is not already set (config default: "false")
+    if [ -z "${FLINK_TM_COMPUTE_NUMA}" ]; then
+        FLINK_TM_COMPUTE_NUMA=$(readFromConfig ${KEY_TASKM_COMPUTE_NUMA} "false" "${YAML_CONF}")
+    fi
+fi
+
 if [ -z "${MAX_LOG_FILE_NUMBER}" ]; then
     MAX_LOG_FILE_NUMBER=$(readFromConfig ${KEY_ENV_LOG_MAX} ${DEFAULT_ENV_LOG_MAX} "${YAML_CONF}")
 fi

http://git-wip-us.apache.org/repos/asf/flink/blob/11c868f9/flink-dist/src/main/flink-bin/bin/taskmanager.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/taskmanager.sh b/flink-dist/src/main/flink-bin/bin/taskmanager.sh
index e579c0c..6a745cb 100755
--- a/flink-dist/src/main/flink-bin/bin/taskmanager.sh
+++ b/flink-dist/src/main/flink-bin/bin/taskmanager.sh
@@ -96,4 +96,22 @@ if [[ $STARTSTOP == "start" ]]; then
     args=("--configDir" "${FLINK_CONF_DIR}")
 fi
 
-"${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP taskmanager "${args[@]}"
+# Build the command as an array so paths containing spaces survive expansion.
+TM_COMMAND=("${FLINK_BIN_DIR}/flink-daemon.sh" "$STARTSTOP" taskmanager "${args[@]}")
+if [[ "$FLINK_TM_COMPUTE_NUMA" != "true" ]]; then
+    # NUMA mode is opt-in; otherwise start a single TaskManager
+    "${TM_COMMAND[@]}"
+else
+    # Example output from `numactl --show` on an AWS c4.8xlarge:
+    # policy: default
+    # preferred node: current
+    # physcpubind: 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
+    # cpubind: 0 1
+    # nodebind: 0 1
+    # membind: 0 1
+    read -ra NODE_LIST <<< "$(numactl --show | grep '^nodebind: ')"
+    for NODE_ID in "${NODE_LIST[@]:1}"; do
+        # Start a TaskManager for each NUMA node (element [0] is the "nodebind:" label)
+        numactl --membind="$NODE_ID" --cpunodebind="$NODE_ID" -- "${TM_COMMAND[@]}"
+    done
+fi