You are viewing a plain-text version of this content; the canonical link to it was not preserved in this copy.
Posted to commits@arrow.apache.org by we...@apache.org on 2018/08/02 23:23:05 UTC

[arrow] branch master updated: ARROW-2963: [C++] Make thread pool fork-safe

This is an automated email from the ASF dual-hosted git repository.

wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 7afa4af  ARROW-2963: [C++] Make thread pool fork-safe
7afa4af is described below

commit 7afa4afc88327ec4d1879643faf25e3773ea9636
Author: Antoine Pitrou <an...@python.org>
AuthorDate: Thu Aug 2 19:22:54 2018 -0400

    ARROW-2963: [C++] Make thread pool fork-safe
    
    In a forked child process, ThreadPool should reinitialize its internal state as though no threads had been spawned.
    
    Author: Antoine Pitrou <an...@python.org>
    
    Closes #2363 from pitrou/ARROW-2963-thread-pool-fork-safety and squashes the following commits:
    
    851c1957 <Antoine Pitrou> Fix test on Python 2.7
    366939e8 <Antoine Pitrou> - Rename FixIfBroken() - Add multiprocessing-based Python test
    956556b5 <Antoine Pitrou> Disable test under Valgrind
    67bc877b <Antoine Pitrou> ARROW-2963:  Make thread pool fork-safe
---
 cpp/src/arrow/util/thread-pool-test.cc      | 65 +++++++++++++++++++++++++++++
 cpp/src/arrow/util/thread-pool.cc           | 37 +++++++++++++++-
 cpp/src/arrow/util/thread-pool.h            | 13 +++++-
 python/pyarrow/tests/test_convert_pandas.py | 21 ++++++++--
 4 files changed, 130 insertions(+), 6 deletions(-)

diff --git a/cpp/src/arrow/util/thread-pool-test.cc b/cpp/src/arrow/util/thread-pool-test.cc
index a70f016..99d4de2 100644
--- a/cpp/src/arrow/util/thread-pool-test.cc
+++ b/cpp/src/arrow/util/thread-pool-test.cc
@@ -15,8 +15,14 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#ifndef _WIN32
+#include <sys/wait.h>
+#include <unistd.h>
+#endif
+
 #include <algorithm>
 #include <chrono>
+#include <cstdlib>
 #include <functional>
 #include <thread>
 #include <vector>
@@ -282,6 +288,65 @@ TEST_F(TestThreadPool, Submit) {
   }
 }
 
+// Test fork safety on Unix
+
+#if !(defined(_WIN32) || defined(ARROW_VALGRIND))
+TEST_F(TestThreadPool, ForkSafety) {
+  pid_t child_pid;
+  int child_status;  // filled in by waitpid()
+
+  {
+    // Fork after task submission, i.e. once the parent has live worker threads
+    auto pool = this->MakeThreadPool(3);
+    auto fut = pool->Submit(add<int>, 4, 5);
+    ASSERT_EQ(fut.get(), 9);
+
+    child_pid = fork();
+    if (child_pid == 0) {
+      // Child: fork() copied no worker threads, yet the pool should be usable
+      fut = pool->Submit(add<int>, 3, 4);
+      if (fut.get() != 7) {
+        std::exit(1);  // wrong task result
+      }
+      // Shutting down shouldn't hang or fail
+      Status st = pool->Shutdown();
+      std::exit(st.ok() ? 0 : 2);  // 2: Shutdown() returned an error
+    } else {
+      // Parent: fork() must have succeeded and the child must have exited with 0
+      ASSERT_GT(child_pid, 0);
+      ASSERT_GT(waitpid(child_pid, &child_status, 0), 0);
+      ASSERT_TRUE(WIFEXITED(child_status));
+      ASSERT_EQ(WEXITSTATUS(child_status), 0);
+      ASSERT_OK(pool->Shutdown());
+    }
+  }
+  {
+    // Fork after shutdown: the child must still see the pool as shut down
+    auto pool = this->MakeThreadPool(3);
+    ASSERT_OK(pool->Shutdown());
+
+    child_pid = fork();
+    if (child_pid == 0) {
+      // Child
+      // Spawning a task should fail with Invalid (the pool was shut down)
+      Status st = pool->Spawn([] {});
+      if (!st.IsInvalid()) {
+        std::exit(1);
+      }
+      // Trigger destructor; it must not hang or crash in the child
+      pool.reset();
+      std::exit(0);
+    } else {
+      // Parent: fork() must have succeeded and the child must have exited with 0
+      ASSERT_GT(child_pid, 0);
+      ASSERT_GT(waitpid(child_pid, &child_status, 0), 0);
+      ASSERT_TRUE(WIFEXITED(child_status));
+      ASSERT_EQ(WEXITSTATUS(child_status), 0);
+    }
+  }
+}
+#endif
+
 TEST(TestGlobalThreadPool, Capacity) {
   // Sanity check
   auto pool = GetCpuThreadPool();
diff --git a/cpp/src/arrow/util/thread-pool.cc b/cpp/src/arrow/util/thread-pool.cc
index 90fa4b7..f3b05fd 100644
--- a/cpp/src/arrow/util/thread-pool.cc
+++ b/cpp/src/arrow/util/thread-pool.cc
@@ -51,7 +51,11 @@ struct ThreadPool::State {
 ThreadPool::ThreadPool()
     : sp_state_(std::make_shared<ThreadPool::State>()),
       state_(sp_state_.get()),
-      shutdown_on_destroy_(true) {}
+      shutdown_on_destroy_(true) {
+#ifndef _WIN32
+  pid_ = getpid();  // owning process; ProtectAgainstFork() detects a change
+#endif
+}
 
 ThreadPool::~ThreadPool() {
   if (shutdown_on_destroy_) {
@@ -59,7 +63,34 @@ ThreadPool::~ThreadPool() {
   }
 }
 
+void ThreadPool::ProtectAgainstFork() {
+#ifndef _WIN32
+  pid_t current_pid = getpid();
+  if (pid_ != current_pid) {
+    // Reinitialize internal state in child process after fork(), without locking
+    // the old mutex (another parent thread may have held it at fork time).
+    // Ideally we would use pthread_atfork(), but it doesn't take an argument, so
+    // we'd need to maintain a list of all existing ThreadPools.
+    int capacity = state_->desired_capacity_;
+
+    auto new_state = std::make_shared<ThreadPool::State>();
+    new_state->please_shutdown_ = state_->please_shutdown_;
+    new_state->quick_shutdown_ = state_->quick_shutdown_;
+
+    pid_ = current_pid;
+    sp_state_ = new_state;  // drops our ref to the old State; nothing is joined
+    state_ = sp_state_.get();
+
+    // Launch worker threads anew (SetCapacity() re-enters here as a no-op)
+    if (!state_->please_shutdown_) {
+      ARROW_UNUSED(SetCapacity(capacity));  // best effort: this returns void
+    }
+  }
+#endif
+}
+
 Status ThreadPool::SetCapacity(int threads) {
+  ProtectAgainstFork();
   std::unique_lock<std::mutex> lock(state_->mutex_);
   if (state_->please_shutdown_) {
     return Status::Invalid("operation forbidden during or after shutdown");
@@ -81,16 +112,19 @@ Status ThreadPool::SetCapacity(int threads) {
 }
 
 int ThreadPool::GetCapacity() {
+  ProtectAgainstFork();  // first: it may replace state_ (and its mutex) after fork()
   std::unique_lock<std::mutex> lock(state_->mutex_);
   return state_->desired_capacity_;
 }
 
 int ThreadPool::GetActualCapacity() {
+  ProtectAgainstFork();  // in a forked child, may respawn workers before counting
   std::unique_lock<std::mutex> lock(state_->mutex_);
   return static_cast<int>(state_->workers_.size());
 }
 
 Status ThreadPool::Shutdown(bool wait) {
+  ProtectAgainstFork();
   std::unique_lock<std::mutex> lock(state_->mutex_);
 
   if (state_->please_shutdown_) {
@@ -186,6 +220,7 @@ void ThreadPool::WorkerLoop(std::shared_ptr<State> state,
 
 Status ThreadPool::SpawnReal(std::function<void()> task) {
   {
+    ProtectAgainstFork();
     std::lock_guard<std::mutex> lock(state_->mutex_);
     if (state_->please_shutdown_) {
       return Status::Invalid("operation forbidden during or after shutdown");
diff --git a/cpp/src/arrow/util/thread-pool.h b/cpp/src/arrow/util/thread-pool.h
index 20b6b2e..07bc5f9 100644
--- a/cpp/src/arrow/util/thread-pool.h
+++ b/cpp/src/arrow/util/thread-pool.h
@@ -18,6 +18,10 @@
 #ifndef ARROW_UTIL_THREAD_POOL_H
 #define ARROW_UTIL_THREAD_POOL_H
 
+#ifndef _WIN32
+#include <unistd.h>
+#endif
+
 #include <exception>
 #include <functional>
 #include <future>
@@ -136,6 +140,8 @@ class ARROW_EXPORT ThreadPool {
   void LaunchWorkersUnlocked(int threads);
   // Get the current actual capacity
   int GetActualCapacity();
+  // Reinitialize the thread pool if the pid changed
+  void ProtectAgainstFork();
 
   // The worker loop is a static method so that it can keep running
   // after the ThreadPool is destroyed
@@ -144,9 +150,12 @@ class ARROW_EXPORT ThreadPool {
 
   static std::shared_ptr<ThreadPool> MakeCpuThreadPool();
 
-  const std::shared_ptr<State> sp_state_;
-  State* const state_;
+  std::shared_ptr<State> sp_state_;
+  State* state_;
   bool shutdown_on_destroy_;
+#ifndef _WIN32
+  pid_t pid_;
+#endif
 };
 
 // Return the process-global thread pool for CPU-bound tasks.
diff --git a/python/pyarrow/tests/test_convert_pandas.py b/python/pyarrow/tests/test_convert_pandas.py
index e9f2e4b..613d810 100644
--- a/python/pyarrow/tests/test_convert_pandas.py
+++ b/python/pyarrow/tests/test_convert_pandas.py
@@ -17,6 +17,7 @@
 # under the License.
 import decimal
 import json
+import multiprocessing as mp
 from collections import OrderedDict
 from datetime import date, datetime, time, timedelta
 
@@ -1826,6 +1827,13 @@ class TestZeroCopyConversion(object):
         self.check_zero_copy_failure(pa.array(arr))
 
 
+# Must be module-level: Python 2.7's multiprocessing can't pickle methods
+def _threaded_conversion():
+    df = _alltypes_example()
+    _check_pandas_roundtrip(df, use_threads=True)
+    _check_pandas_roundtrip(df, use_threads=True, as_batch=True)
+
+
 class TestConvertMisc(object):
     """
     Miscellaneous conversion tests.
@@ -1866,9 +1874,16 @@ class TestConvertMisc(object):
             _check_array_roundtrip(arr, type=pa_type)
 
     def test_threaded_conversion(self):
-        df = _alltypes_example()
-        _check_pandas_roundtrip(df, use_threads=True)
-        _check_pandas_roundtrip(df, use_threads=True, as_batch=True)
+        _threaded_conversion()  # run the shared checks in this process
+
+    def test_threaded_conversion_multiprocess(self):
+        # Parallel conversion should work from child processes too (ARROW-2963)
+        pool = mp.Pool(2)  # on Linux, workers are fork()ed from this process
+        try:
+            pool.apply(_threaded_conversion)  # blocks; re-raises child errors
+        finally:
+            pool.close()
+            pool.join()
 
     def test_category(self):
         repeats = 5