You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by zu...@apache.org on 2017/01/11 01:00:57 UTC
[05/50] incubator-quickstep git commit: Auto pin workers to CPU cores
Auto pin workers to CPU cores
- Automatically pin worker threads to CPU cores.
- Use auto pinning when either no affinities are provided or the
provided affinities are invalid.
- Try to balance CPU cores across multiple sockets when maximum
parallelism is not used.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/1340fcb7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/1340fcb7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/1340fcb7
Branch: refs/heads/quickstep_partition_parser_support
Commit: 1340fcb7f47f8be0f5f8f397fdc9c9f08db120b3
Parents: 5a3037f
Author: Harshad Deshmukh <hb...@apache.org>
Authored: Thu Nov 3 23:34:37 2016 -0500
Committer: Hakan Memisoglu <ha...@gmail.com>
Committed: Sat Nov 5 14:59:53 2016 -0500
----------------------------------------------------------------------
cli/InputParserUtil.cpp | 68 ++++++++++++++++++++++++++++++++++++++------
1 file changed, 59 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1340fcb7/cli/InputParserUtil.cpp
----------------------------------------------------------------------
diff --git a/cli/InputParserUtil.cpp b/cli/InputParserUtil.cpp
index 0538afc..e45605c 100644
--- a/cli/InputParserUtil.cpp
+++ b/cli/InputParserUtil.cpp
@@ -21,6 +21,7 @@
#include <cstddef>
#include <iostream>
+#include <memory>
#include <ostream>
#include <string>
#include <utility>
@@ -44,23 +45,72 @@ std::vector<int> InputParserUtil::ParseWorkerAffinities(
const int num_workers,
const string &affinity_string) {
std::vector<int> affinities;
+ bool switch_to_default_affinities = false;
if (affinity_string.empty()) {
- affinities.resize(num_workers, -1);
- return affinities;
- }
-
- if (!ParseIntString(affinity_string, ',', &affinities)) {
- LOG(FATAL) << "--worker_affinities must be a comma-separated list of "
- << "integer CPU ids.\n";
+ switch_to_default_affinities = true;
+ LOG(INFO) << "Empty worker affinities provided, switching to default "
+ "worker affinities";
+ } else if (!ParseIntString(affinity_string, ',', &affinities)) {
+ switch_to_default_affinities = true;
+ LOG(INFO) << "Invalid worker affinities provided, switching to default "
+ "affinities";
}
for (const int affinity : affinities) {
if (affinity < -1) {
- LOG(FATAL) << "CPU affinities specified by --worker_affinities must be "
- << "non-negative, or -1 to specify no affinity.\n";
+ switch_to_default_affinities = true;
+ LOG(INFO) << "CPU affinities specified by --worker_affinities must be "
+ "non-negative, or -1 to specify no affinity. Switching to "
+ "default worker affinities";
+ break;
}
}
+ if (switch_to_default_affinities) {
+ // Set default affinities.
+ // If the number of worker threads is less than the maximum parallelism on
+ // the box, we try to balance workers on all sockets. The intention is to
+ // balance the memory bandwidth usage across all sockets. This may however
+ // hurt the performance (due to poor data locality) when the machine has
+ // many sockets and data is not partitioned.
+#ifdef QUICKSTEP_HAVE_LIBNUMA
+ // This code is inspired from the print_node_cpus() function of numactl.
+ // WARNING - If some NUMA sockets are disabled, we can't detect it.
+ const int num_sockets = numa_num_configured_nodes();
+ CHECK_GT(num_sockets, 0);
+ // A vector V where V[i] denotes a vector of CPU cores that belong to the
+ // socket i.
+ std::vector<std::vector<int>> cpus_from_sockets;
+ cpus_from_sockets.resize(num_sockets);
+ for (int curr_socket = 0; curr_socket < num_sockets; ++curr_socket) {
+ std::unique_ptr<struct bitmask> cpus(numa_allocate_cpumask());
+ const int err = numa_node_to_cpus(curr_socket, cpus.get());
+ if (err >= 0) {
+ for (int i = 0; i < static_cast<int>(cpus->size); i++) {
+ if (numa_bitmask_isbitset(cpus.get(), i)) {
+ // The current CPU belongs to curr_socket.
+ cpus_from_sockets[curr_socket].push_back(i);
+ }
+ }
+ }
+ }
+ // Now assign affinity to each worker, picking one CPU from each socket in a
+ // round robin manner.
+ int curr_socket = 0;
+ std::size_t iteration = 0;
+ for (int curr_worker = 0; curr_worker < num_workers; ++curr_worker) {
+ if (iteration < cpus_from_sockets[curr_socket].size()) {
+ const int curr_worker_affinity =
+ cpus_from_sockets[curr_socket][iteration];
+ affinities.push_back(curr_worker_affinity);
+ }
+ // Increase iteration number only when we are at the last socket.
+ iteration = iteration + ((curr_socket + 1) / num_sockets);
+ curr_socket = (curr_socket + 1) % num_sockets;
+ }
+#endif
+ }
+
if (affinities.size() < static_cast<std::size_t>(num_workers)) {
std::cout << "--num_workers is " << num_workers << ", but only "
<< "specified " << affinities.size() << " CPU affinities "