You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by uw...@apache.org on 2018/05/13 15:18:06 UTC

[arrow] branch master updated: ARROW-2569: [C++] Improve thread pool size heuristic

This is an automated email from the ASF dual-hosted git repository.

uwe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new da3f843  ARROW-2569: [C++] Improve thread pool size heuristic
da3f843 is described below

commit da3f843c156fbbafbb22187d9c2a7b360af1b628
Author: Antoine Pitrou <an...@python.org>
AuthorDate: Sun May 13 17:18:03 2018 +0200

    ARROW-2569: [C++] Improve thread pool size heuristic
    
    The heuristic works as follows:
    - if the OMP_NUM_THREADS environment variable exists, it defines the baseline
      number of available threads
    - otherwise, the baseline is the value returned by std::thread::hardware_concurrency()
    - the OMP_THREAD_LIMIT environment variable, if it exists, defines the upper bound
      for the final value, i.e. we return min(baseline, limit), otherwise we just
      return the baseline.
    
    This is the heuristic used by other packages such as the GNU "nproc" utility.
    
    Author: Antoine Pitrou <an...@python.org>
    
    Closes #2026 from pitrou/ARROW-2569-thread-pool-heuristic and squashes the following commits:
    
    5572b075 <Antoine Pitrou> Factor out environment variable helpers
    2dd0d12e <Antoine Pitrou> ARROW-2569:  Improve thread pool size heuristic
---
 cpp/src/arrow/util/io-util.cc          | 71 ++++++++++++++++++++++++++++++++++
 cpp/src/arrow/util/io-util.h           |  7 ++++
 cpp/src/arrow/util/thread-pool-test.cc | 44 +++++++++++++++++++++
 cpp/src/arrow/util/thread-pool.cc      | 41 +++++++++++++++++---
 cpp/src/arrow/util/thread-pool.h       |  5 +++
 5 files changed, 163 insertions(+), 5 deletions(-)

diff --git a/cpp/src/arrow/util/io-util.cc b/cpp/src/arrow/util/io-util.cc
index ce989f4..2071446 100644
--- a/cpp/src/arrow/util/io-util.cc
+++ b/cpp/src/arrow/util/io-util.cc
@@ -29,6 +29,7 @@
 #include <sstream>
 
 #include <fcntl.h>
+#include <stdlib.h>
 #include <string.h>
 #include <sys/stat.h>
 #include <sys/types.h>  // IWYU pragma: keep
@@ -427,5 +428,75 @@ Status FileTruncate(int fd, const int64_t size) {
   return Status::OK();
 }
 
+//
+// Environment variables
+//
+
+// Reads environment variable `name` into `*out`.
+// Returns KeyError if the variable is not defined; `*out` is only modified
+// on success.
+Status GetEnvVar(const char* name, std::string* out) {
+#ifdef _WIN32
+  // On Windows, getenv() reads an early copy of the process' environment
+  // which doesn't get updated when SetEnvironmentVariable() is called.
+  // First query the required buffer size (including the terminating null)
+  // so that values of any length are supported, instead of capping them at
+  // a fixed-size stack buffer.  A zero result means the variable is
+  // undefined; a defined-but-empty variable still needs 1 char for the null.
+  DWORD bufsize = GetEnvironmentVariableA(name, nullptr, 0);
+  if (bufsize == 0) {
+    return Status::KeyError("environment variable undefined");
+  }
+  std::string value(bufsize, '\0');
+  SetLastError(ERROR_SUCCESS);
+  DWORD res = GetEnvironmentVariableA(name, &value[0], bufsize);
+  if (res >= bufsize) {
+    // The value grew between the two calls (concurrent modification).
+    return Status::CapacityError("environment variable value too long");
+  }
+  if (res == 0 && GetLastError() == ERROR_ENVVAR_NOT_FOUND) {
+    // The variable was removed between the two calls.
+    return Status::KeyError("environment variable undefined");
+  }
+  // On success `res` is the value length, excluding the terminating null.
+  value.resize(res);
+  out->swap(value);
+  return Status::OK();
+#else
+  const char* c_str = getenv(name);
+  if (c_str == nullptr) {
+    return Status::KeyError("environment variable undefined");
+  }
+  *out = std::string(c_str);
+  return Status::OK();
+#endif
+}
+
+Status GetEnvVar(const std::string& name, std::string* out) {
+  return GetEnvVar(name.c_str(), out);
+}
+
+// Sets environment variable `name` to `value`, overwriting any existing
+// value.  Returns Invalid if the platform call reports failure.
+Status SetEnvVar(const char* name, const char* value) {
+#ifdef _WIN32
+  const bool success = SetEnvironmentVariableA(name, value) != 0;
+#else
+  const bool success = setenv(name, value, /*overwrite=*/1) == 0;
+#endif
+  if (!success) {
+    return Status::Invalid("failed setting environment variable");
+  }
+  return Status::OK();
+}
+
+Status SetEnvVar(const std::string& name, const std::string& value) {
+  const char* c_name = name.c_str();
+  const char* c_value = value.c_str();
+  return SetEnvVar(c_name, c_value);
+}
+
+// Removes environment variable `name` from the process environment.
+// Returns Invalid if the platform call reports failure.
+Status DelEnvVar(const char* name) {
+#ifdef _WIN32
+  // A null value asks SetEnvironmentVariableA to delete the variable.
+  const bool success = SetEnvironmentVariableA(name, nullptr) != 0;
+#else
+  const bool success = unsetenv(name) == 0;
+#endif
+  if (!success) {
+    return Status::Invalid("failed deleting environment variable");
+  }
+  return Status::OK();
+}
+
+Status DelEnvVar(const std::string& name) {
+  return DelEnvVar(name.c_str());
+}
+
 }  // namespace internal
 }  // namespace arrow
diff --git a/cpp/src/arrow/util/io-util.h b/cpp/src/arrow/util/io-util.h
index a0c66e9..28197e0 100644
--- a/cpp/src/arrow/util/io-util.h
+++ b/cpp/src/arrow/util/io-util.h
@@ -163,6 +163,13 @@ Status FileClose(int fd);
 
 Status CreatePipe(int fd[2]);
 
+// Read environment variable `name` into `*out`.
+// Returns KeyError if the variable is not defined.
+Status GetEnvVar(const char* name, std::string* out);
+Status GetEnvVar(const std::string& name, std::string* out);
+// Set environment variable `name` to `value`, overwriting any existing value.
+// Returns Invalid if the underlying platform call fails.
+Status SetEnvVar(const char* name, const char* value);
+Status SetEnvVar(const std::string& name, const std::string& value);
+// Remove environment variable `name`.
+// Returns Invalid if the underlying platform call fails.
+Status DelEnvVar(const char* name);
+Status DelEnvVar(const std::string& name);
+
 }  // namespace internal
 }  // namespace arrow
 
diff --git a/cpp/src/arrow/util/thread-pool-test.cc b/cpp/src/arrow/util/thread-pool-test.cc
index 9e21b45..42bbe6e 100644
--- a/cpp/src/arrow/util/thread-pool-test.cc
+++ b/cpp/src/arrow/util/thread-pool-test.cc
@@ -22,6 +22,7 @@
 #include <vector>
 
 #include "arrow/test-util.h"
+#include "arrow/util/io-util.h"
 #include "arrow/util/macros.h"
 #include "arrow/util/thread-pool.h"
 
@@ -271,5 +272,48 @@ TEST_F(TestThreadPool, Submit) {
   }
 }
 
+TEST(TestGlobalThreadPool, Capacity) {
+  // Sanity check
+  auto pool = CPUThreadPool();
+  size_t capacity = pool->GetCapacity();
+  ASSERT_GT(capacity, 0U);
+
+  // Save the caller's OMP_* settings and restore them on scope exit, so that
+  // neither a pre-existing configuration nor an early return from a failing
+  // ASSERT_* leaks modified variables into other tests.
+  struct EnvVarGuard {
+    explicit EnvVarGuard(const char* name) : name_(name) {
+      was_set_ = GetEnvVar(name_, &old_value_).ok();
+    }
+    ~EnvVarGuard() {
+      // Best effort: failures cannot be reported from a destructor.
+      if (was_set_) {
+        static_cast<void>(SetEnvVar(name_, old_value_));
+      } else {
+        static_cast<void>(DelEnvVar(name_));
+      }
+    }
+    std::string name_;
+    std::string old_value_;
+    bool was_set_ = false;
+  };
+  EnvVarGuard num_threads_guard("OMP_NUM_THREADS");
+  EnvVarGuard thread_limit_guard("OMP_THREAD_LIMIT");
+
+  // Exercise default capacity heuristic
+  ASSERT_OK(DelEnvVar("OMP_NUM_THREADS"));
+  ASSERT_OK(DelEnvVar("OMP_THREAD_LIMIT"));
+  const size_t hw_capacity = std::thread::hardware_concurrency();
+  // With no OMP_* variables set, DefaultCapacity() uses the hardware
+  // concurrency, or a hardcoded fallback when that is reported as 0
+  // (which std::thread::hardware_concurrency() is allowed to do).
+  const size_t default_capacity = ThreadPool::DefaultCapacity();
+  if (hw_capacity > 0) {
+    ASSERT_EQ(default_capacity, hw_capacity);
+  }
+  ASSERT_OK(SetEnvVar("OMP_NUM_THREADS", "13"));
+  ASSERT_EQ(ThreadPool::DefaultCapacity(), 13U);
+  ASSERT_OK(SetEnvVar("OMP_NUM_THREADS", "7,5,13"));
+  ASSERT_EQ(ThreadPool::DefaultCapacity(), 7U);
+  ASSERT_OK(DelEnvVar("OMP_NUM_THREADS"));
+
+  ASSERT_OK(SetEnvVar("OMP_THREAD_LIMIT", "1"));
+  ASSERT_EQ(ThreadPool::DefaultCapacity(), 1U);
+  ASSERT_OK(SetEnvVar("OMP_THREAD_LIMIT", "999"));
+  if (hw_capacity > 0) {
+    // The limit only caps the baseline: expect min(hw_capacity, 999).
+    const size_t capped = hw_capacity < 999 ? hw_capacity : 999U;
+    ASSERT_EQ(ThreadPool::DefaultCapacity(), capped);
+  }
+  ASSERT_OK(SetEnvVar("OMP_NUM_THREADS", "6,5,13"));
+  ASSERT_EQ(ThreadPool::DefaultCapacity(), 6U);
+  ASSERT_OK(SetEnvVar("OMP_THREAD_LIMIT", "2"));
+  ASSERT_EQ(ThreadPool::DefaultCapacity(), 2U);
+
+  // Invalid env values are ignored, as if the variables were unset
+  ASSERT_OK(SetEnvVar("OMP_NUM_THREADS", "0"));
+  ASSERT_OK(SetEnvVar("OMP_THREAD_LIMIT", "0"));
+  ASSERT_EQ(ThreadPool::DefaultCapacity(), default_capacity);
+  ASSERT_OK(SetEnvVar("OMP_NUM_THREADS", "zzz"));
+  ASSERT_OK(SetEnvVar("OMP_THREAD_LIMIT", "x"));
+  ASSERT_EQ(ThreadPool::DefaultCapacity(), default_capacity);
+  ASSERT_OK(SetEnvVar("OMP_THREAD_LIMIT", "-1"));
+  ASSERT_OK(SetEnvVar("OMP_NUM_THREADS", "99999999999999999999999999"));
+  ASSERT_EQ(ThreadPool::DefaultCapacity(), default_capacity);
+
+  // num_threads_guard / thread_limit_guard restore the original environment.
+}
+
 }  // namespace internal
 }  // namespace arrow
diff --git a/cpp/src/arrow/util/thread-pool.cc b/cpp/src/arrow/util/thread-pool.cc
index 26ff51c..0e264f3 100644
--- a/cpp/src/arrow/util/thread-pool.cc
+++ b/cpp/src/arrow/util/thread-pool.cc
@@ -16,8 +16,12 @@
 // under the License.
 
 #include "arrow/util/thread-pool.h"
+#include "arrow/util/io-util.h"
 #include "arrow/util/logging.h"
 
+#include <algorithm>
+#include <string>
+
 namespace arrow {
 namespace internal {
 
@@ -162,8 +166,34 @@ Status ThreadPool::Make(size_t threads, std::shared_ptr<ThreadPool>* out) {
 // ----------------------------------------------------------------------
 // Global thread pool
 
-static size_t DefaultCapacity() {
-  size_t capacity = std::thread::hardware_concurrency();
+// Parses the first (top-level) entry of an OpenMP-style environment variable.
+// OMP_NUM_THREADS is a comma-separated list of positive integers;
+// OMP_THREAD_LIMIT is a single positive integer.  Returns 0 -- meaning
+// "not specified" to the caller -- if the variable is unset or if its first
+// entry is not a plain decimal integer that fits in a size_t.  Surrounding
+// whitespace is tolerated; signs, trailing garbage ("13abc") and fractional
+// values ("1.5") are rejected rather than silently truncated.
+static size_t ParseOMPEnvVar(const char* name) {
+  std::string str;
+  if (!GetEnvVar(name, &str).ok()) {
+    return 0;
+  }
+  // We are only interested in the first (top-level) number.
+  auto first_comma = str.find(',');
+  if (first_comma != std::string::npos) {
+    str.resize(first_comma);
+  }
+  const char kWhitespace[] = " \t\n\v\f\r";
+  const auto begin = str.find_first_not_of(kWhitespace);
+  if (begin == std::string::npos) {
+    return 0;  // empty or whitespace-only
+  }
+  const auto end = str.find_last_not_of(kWhitespace) + 1;
+  // Parse digit by digit so that overflow is checked against size_t itself
+  // (std::stoll followed by static_cast<size_t> would silently truncate
+  // large values on platforms where size_t is 32 bits).
+  const size_t kMaxValue = static_cast<size_t>(-1);
+  size_t value = 0;
+  for (auto i = begin; i < end; ++i) {
+    const char c = str[i];
+    if (c < '0' || c > '9') {
+      return 0;
+    }
+    const size_t digit = static_cast<size_t>(c - '0');
+    if (value > (kMaxValue - digit) / 10) {
+      return 0;  // would overflow size_t
+    }
+    value = value * 10 + digit;
+  }
+  return value;
+}
+
+size_t ThreadPool::DefaultCapacity() {
+  size_t capacity, limit;
+  capacity = ParseOMPEnvVar("OMP_NUM_THREADS");
+  if (capacity == 0) {
+    capacity = std::thread::hardware_concurrency();
+  }
+  limit = ParseOMPEnvVar("OMP_THREAD_LIMIT");
+  if (limit > 0) {
+    capacity = std::min(limit, capacity);
+  }
   if (capacity == 0) {
     ARROW_LOG(WARNING) << "Failed to determine the number of available threads, "
                           "using a hardcoded arbitrary value";
@@ -175,7 +205,7 @@ static size_t DefaultCapacity() {
 // Helper for the singleton pattern
 static std::shared_ptr<ThreadPool> MakePoolWithDefaultCapacity() {
+  // The pool is sized by the ThreadPool::DefaultCapacity() heuristic
+  // (OMP_NUM_THREADS, OMP_THREAD_LIMIT, hardware concurrency).
+  // NOTE(review): if DCHECK_OK only checks in debug builds, a failed Make()
+  // in a release build would return a null pool -- confirm this is acceptable.
   std::shared_ptr<ThreadPool> pool;
-  DCHECK_OK(ThreadPool::Make(DefaultCapacity(), &pool));
+  DCHECK_OK(ThreadPool::Make(ThreadPool::DefaultCapacity(), &pool));
   return pool;
 }
 
@@ -184,9 +214,10 @@ ThreadPool* CPUThreadPool() {
   return singleton.get();
 }
 
+}  // namespace internal
+
+// Public entry point (declared outside namespace internal): resizes the
+// global CPU thread pool by forwarding to
+// internal::CPUThreadPool()->SetCapacity().
 Status SetCPUThreadPoolCapacity(size_t threads) {
-  return CPUThreadPool()->SetCapacity(threads);
+  return internal::CPUThreadPool()->SetCapacity(threads);
 }
 
-}  // namespace internal
 }  // namespace arrow
diff --git a/cpp/src/arrow/util/thread-pool.h b/cpp/src/arrow/util/thread-pool.h
index d5913cf..d426cbb 100644
--- a/cpp/src/arrow/util/thread-pool.h
+++ b/cpp/src/arrow/util/thread-pool.h
@@ -72,6 +72,10 @@ class ThreadPool {
   // thread count is fully adjusted.
   Status SetCapacity(size_t threads);
 
+  // Heuristic for the default capacity of a thread pool for CPU-bound tasks.
+  // This is exposed as a static method to help with testing.
+  static size_t DefaultCapacity();
+
   // Shutdown the pool.  Once the pool starts shutting down, new tasks
   // cannot be submitted anymore.
   // If "wait" is true, shutdown waits for all pending tasks to be finished.
@@ -109,6 +113,7 @@ class ThreadPool {
 
  protected:
   FRIEND_TEST(TestThreadPool, SetCapacity);
+  FRIEND_TEST(TestGlobalThreadPool, Capacity);
 
   ThreadPool();
 

-- 
To stop receiving notification emails like this one, please contact
uwe@apache.org.