Posted to commits@quickstep.apache.org by zu...@apache.org on 2017/01/11 01:00:57 UTC

[05/50] incubator-quickstep git commit: Auto pin workers to CPU cores

Auto pin workers to CPU cores

- Automatically pin worker threads to CPU cores.
- Fall back to auto pinning when no affinities are provided or when the
  provided affinities are invalid.
- Try to balance the workers across all CPU sockets when maximum
  parallelism is not used (see the sketch below).
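
For illustration, here is a minimal standalone sketch of the round-robin
balancing scheme described above. The two-socket CPU layout and the
worker count are hypothetical; the actual patch (diff below) discovers
the layout at runtime via libnuma.

// Standalone sketch of the round-robin balancing scheme. The 2-socket
// layout below is hypothetical; the real patch queries libnuma.
#include <cstddef>
#include <iostream>
#include <vector>

int main() {
  const std::vector<std::vector<int>> cpus_from_sockets = {
      {0, 1, 2, 3},   // CPU cores on socket 0 (hypothetical).
      {4, 5, 6, 7}};  // CPU cores on socket 1 (hypothetical).
  const int num_sockets = static_cast<int>(cpus_from_sockets.size());
  const int num_workers = 6;

  std::vector<int> affinities;
  int curr_socket = 0;
  std::size_t iteration = 0;
  for (int curr_worker = 0; curr_worker < num_workers; ++curr_worker) {
    if (iteration < cpus_from_sockets[curr_socket].size()) {
      affinities.push_back(cpus_from_sockets[curr_socket][iteration]);
    }
    // (curr_socket + 1) / num_sockets is 1 only at the last socket, so
    // iteration advances once per full round over the sockets.
    iteration += (curr_socket + 1) / num_sockets;
    curr_socket = (curr_socket + 1) % num_sockets;
  }

  for (const int affinity : affinities) {
    std::cout << affinity << ' ';  // Prints: 0 4 1 5 2 6
  }
  std::cout << '\n';
}

With this layout, consecutive workers alternate between the two sockets
(0 4 1 5 2 6), which spreads memory bandwidth usage as intended.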


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/1340fcb7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/1340fcb7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/1340fcb7

Branch: refs/heads/quickstep_partition_parser_support
Commit: 1340fcb7f47f8be0f5f8f397fdc9c9f08db120b3
Parents: 5a3037f
Author: Harshad Deshmukh <hb...@apache.org>
Authored: Thu Nov 3 23:34:37 2016 -0500
Committer: Hakan Memisoglu <ha...@gmail.com>
Committed: Sat Nov 5 14:59:53 2016 -0500

----------------------------------------------------------------------
 cli/InputParserUtil.cpp | 68 ++++++++++++++++++++++++++++++++++++++------
 1 file changed, 59 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
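
The patch discovers the per-socket CPU layout with libnuma. Below is a
condensed, self-contained sketch of that enumeration (assuming libnuma
is installed and the program is linked with -lnuma). Note that a mask
obtained from numa_allocate_cpumask() must be released with
numa_free_cpumask(), hence the custom unique_ptr deleter.

// Sketch of per-socket CPU discovery with libnuma, mirroring the
// enumeration added in the diff below. Build with: g++ ... -lnuma
#include <iostream>
#include <memory>
#include <vector>

#include <numa.h>

int main() {
  if (numa_available() < 0) {
    std::cerr << "libnuma is not usable on this system\n";
    return 1;
  }
  const int num_sockets = numa_num_configured_nodes();
  std::vector<std::vector<int>> cpus_from_sockets(num_sockets);
  for (int node = 0; node < num_sockets; ++node) {
    // numa_allocate_cpumask() memory must be freed with
    // numa_free_cpumask(), so attach it as a custom deleter.
    std::unique_ptr<struct bitmask, decltype(&numa_free_cpumask)> cpus(
        numa_allocate_cpumask(), numa_free_cpumask);
    if (numa_node_to_cpus(node, cpus.get()) >= 0) {
      for (unsigned int i = 0; i < cpus->size; ++i) {
        if (numa_bitmask_isbitset(cpus.get(), i)) {
          cpus_from_sockets[node].push_back(static_cast<int>(i));
        }
      }
    }
  }
  for (int node = 0; node < num_sockets; ++node) {
    std::cout << "socket " << node << ":";
    for (const int cpu : cpus_from_sockets[node]) {
      std::cout << ' ' << cpu;
    }
    std::cout << '\n';
  }
}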


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1340fcb7/cli/InputParserUtil.cpp
----------------------------------------------------------------------
diff --git a/cli/InputParserUtil.cpp b/cli/InputParserUtil.cpp
index 0538afc..e45605c 100644
--- a/cli/InputParserUtil.cpp
+++ b/cli/InputParserUtil.cpp
@@ -21,6 +21,7 @@
 
 #include <cstddef>
 #include <iostream>
+#include <memory>
 #include <ostream>
 #include <string>
 #include <utility>
@@ -44,23 +45,72 @@ std::vector<int> InputParserUtil::ParseWorkerAffinities(
     const int num_workers,
     const string &affinity_string) {
   std::vector<int> affinities;
+  bool switch_to_default_affinities = false;
   if (affinity_string.empty()) {
-    affinities.resize(num_workers, -1);
-    return affinities;
-  }
-
-  if (!ParseIntString(affinity_string, ',', &affinities)) {
-    LOG(FATAL) << "--worker_affinities must be a comma-separated list of "
-               << "integer CPU ids.\n";
+    switch_to_default_affinities = true;
+    LOG(INFO) << "Empty worker affinities provided, switching to default "
+                 "worker affinities";
+  } else if (!ParseIntString(affinity_string, ',', &affinities)) {
+    switch_to_default_affinities = true;
+    LOG(INFO) << "Invalid worker affinities provided, switching to default "
+                 "worker affinities";
   }
 
   for (const int affinity : affinities) {
     if (affinity < -1) {
-      LOG(FATAL) << "CPU affinities specified by --worker_affinities must be "
-                 << "non-negative, or -1 to specify no affinity.\n";
+      switch_to_default_affinities = true;
+      LOG(INFO) << "CPU affinities specified by --worker_affinities must be "
+                   "non-negative, or -1 to specify no affinity. Switching to "
+                   "default worker affinities";
+      break;
     }
   }
 
+  if (switch_to_default_affinities) {
+    // Set default affinities.
+    // If the number of worker threads is less than the maximum parallelism on
+    // the box, we try to balance workers on all sockets. The intention is to
+    // balance the memory bandwidth usage across all sockets. This may,
+    // however, hurt performance (due to poor data locality) when the machine
+    // has many sockets and the data is not partitioned.
+#ifdef QUICKSTEP_HAVE_LIBNUMA
+    // This code is inspired by the print_node_cpus() function of numactl.
+    // WARNING - If some NUMA sockets are disabled, we can't detect it.
+    const int num_sockets = numa_num_configured_nodes();
+    CHECK_GT(num_sockets, 0);
+    // A vector V where V[i] is the vector of CPU cores that belong to
+    // socket i.
+    std::vector<std::vector<int>> cpus_from_sockets;
+    cpus_from_sockets.resize(num_sockets);
+    for (int curr_socket = 0; curr_socket < num_sockets; ++curr_socket) {
+      std::unique_ptr<struct bitmask, decltype(&numa_free_cpumask)> cpus(numa_allocate_cpumask(), numa_free_cpumask);
+      const int err = numa_node_to_cpus(curr_socket, cpus.get());
+      if (err >= 0) {
+        for (int i = 0; i < static_cast<int>(cpus->size); i++) {
+          if (numa_bitmask_isbitset(cpus.get(), i)) {
+            // The current CPU belongs to curr_socket.
+            cpus_from_sockets[curr_socket].push_back(i);
+          }
+        }
+      }
+    }
+    // Now assign affinity to each worker, picking one CPU from each socket in a
+    // round-robin manner.
+    int curr_socket = 0;
+    std::size_t iteration = 0;
+    for (int curr_worker = 0; curr_worker < num_workers; ++curr_worker) {
+      if (iteration < cpus_from_sockets[curr_socket].size()) {
+        const int curr_worker_affinity =
+            cpus_from_sockets[curr_socket][iteration];
+        affinities.push_back(curr_worker_affinity);
+      }
+      // Advance iteration only at the last socket, where the division yields 1.
+      iteration = iteration + ((curr_socket + 1) / num_sockets);
+      curr_socket = (curr_socket + 1) % num_sockets;
+    }
+#endif
+  }
+
   if (affinities.size() < static_cast<std::size_t>(num_workers)) {
     std::cout << "--num_workers is " << num_workers << ", but only "
               << "specified " << affinities.size() << " CPU affinities "