You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by yi...@apache.org on 2022/02/08 03:11:59 UTC

[arrow] branch master updated: ARROW-15593: [C++] Make after-fork ThreadPool reinitialization thread-safe

This is an automated email from the ASF dual-hosted git repository.

yibocai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 70d1f87  ARROW-15593: [C++] Make after-fork ThreadPool reinitialization thread-safe
70d1f87 is described below

commit 70d1f879eac25cd33280dafe8302445fac25265b
Author: Antoine Pitrou <an...@python.org>
AuthorDate: Tue Feb 8 03:09:27 2022 +0000

    ARROW-15593: [C++] Make after-fork ThreadPool reinitialization thread-safe
    
    Since after-fork reinitialization is triggered when one of the ThreadPool methods is called, it can very well be called from multiple threads at once.  Make it thread-safe.
    
    Closes #12358 from pitrou/ARROW-15593-thread-pool-race-condition
    
    Authored-by: Antoine Pitrou <an...@python.org>
    Signed-off-by: Yibo Cai <yi...@arm.com>
---
 cpp/src/arrow/util/mutex.cc            |  31 +++++++++
 cpp/src/arrow/util/mutex.h             |  21 ++++++
 cpp/src/arrow/util/thread_pool.cc      |  41 +++++++-----
 cpp/src/arrow/util/thread_pool.h       |   6 +-
 cpp/src/arrow/util/thread_pool_test.cc | 117 ++++++++++++++++++++++++++++-----
 5 files changed, 181 insertions(+), 35 deletions(-)

diff --git a/cpp/src/arrow/util/mutex.cc b/cpp/src/arrow/util/mutex.cc
index 7456d78..9f82ad4 100644
--- a/cpp/src/arrow/util/mutex.cc
+++ b/cpp/src/arrow/util/mutex.cc
@@ -19,6 +19,11 @@
 
 #include <mutex>
 
+#ifndef _WIN32
+#include <pthread.h>
+#include <atomic>
+#endif
+
 #include "arrow/util/logging.h"
 
 namespace arrow {
@@ -50,5 +55,31 @@ Mutex::Guard Mutex::Lock() {
 
 Mutex::Mutex() : impl_(new Impl, [](Impl* impl) { delete impl; }) {}
 
+#ifndef _WIN32
+namespace {
+
+struct AfterForkState {
+  // A global instance that will also register the atfork handler when
+  // constructed.
+  static AfterForkState instance;
+
+  // The mutex may be used at shutdown, so make it eternal.
+  // The leak (only in child processes) is a small price to pay for robustness.
+  Mutex* mutex = nullptr;
+
+ private:
+  AfterForkState() {
+    pthread_atfork(/*prepare=*/nullptr, /*parent=*/nullptr, /*child=*/&AfterFork);
+  }
+
+  static void AfterFork() { instance.mutex = new Mutex; }
+};
+
+AfterForkState AfterForkState::instance;
+}  // namespace
+
+Mutex* GlobalForkSafeMutex() { return AfterForkState::instance.mutex; }
+#endif  // _WIN32
+
 }  // namespace util
 }  // namespace arrow
diff --git a/cpp/src/arrow/util/mutex.h b/cpp/src/arrow/util/mutex.h
index f4fc641..ac63cf7 100644
--- a/cpp/src/arrow/util/mutex.h
+++ b/cpp/src/arrow/util/mutex.h
@@ -60,5 +60,26 @@ class ARROW_EXPORT Mutex {
   std::unique_ptr<Impl, void (*)(Impl*)> impl_;
 };
 
+#ifndef _WIN32
+/// Return a pointer to a process-wide, process-specific Mutex that can be used
+/// at any point in a child process.  NULL is returned when called in the parent.
+///
+/// The rule is to first check that getpid() corresponds to the parent process pid
+/// and, if not, call this function to lock any after-fork reinitialization code.
+/// Like this:
+///
+///   std::atomic<pid_t> pid{getpid()};
+///   ...
+///   if (pid.load() != getpid()) {  // In child process
+///     auto lock = GlobalForkSafeMutex()->Lock();
+///     if (pid.load() != getpid()) {
+///       // ... reinitialize internal structures after fork ...
+///       pid.store(getpid());
+///     }
+///   }
+ARROW_EXPORT
+Mutex* GlobalForkSafeMutex();
+#endif
+
 }  // namespace util
 }  // namespace arrow
diff --git a/cpp/src/arrow/util/thread_pool.cc b/cpp/src/arrow/util/thread_pool.cc
index 37132fe..a138794 100644
--- a/cpp/src/arrow/util/thread_pool.cc
+++ b/cpp/src/arrow/util/thread_pool.cc
@@ -28,6 +28,7 @@
 
 #include "arrow/util/io_util.h"
 #include "arrow/util/logging.h"
+#include "arrow/util/mutex.h"
 
 namespace arrow {
 namespace internal {
@@ -235,24 +236,28 @@ ThreadPool::~ThreadPool() {
 void ThreadPool::ProtectAgainstFork() {
 #ifndef _WIN32
   pid_t current_pid = getpid();
-  if (pid_ != current_pid) {
-    // Reinitialize internal state in child process after fork()
-    // Ideally we would use pthread_at_fork(), but that doesn't allow
-    // storing an argument, hence we'd need to maintain a list of all
-    // existing ThreadPools.
-    int capacity = state_->desired_capacity_;
-
-    auto new_state = std::make_shared<ThreadPool::State>();
-    new_state->please_shutdown_ = state_->please_shutdown_;
-    new_state->quick_shutdown_ = state_->quick_shutdown_;
-
-    pid_ = current_pid;
-    sp_state_ = new_state;
-    state_ = sp_state_.get();
-
-    // Launch worker threads anew
-    if (!state_->please_shutdown_) {
-      ARROW_UNUSED(SetCapacity(capacity));
+  if (pid_.load() != current_pid) {
+    // Reinitialize internal state in child process after fork().
+    {
+      // Since after-fork reinitialization is triggered when one of the ThreadPool
+      // methods is called, it can very well be called from multiple threads
+      // at once.  Therefore, it needs to be guarded with a lock.
+      auto lock = util::GlobalForkSafeMutex()->Lock();
+
+      if (pid_.load() != current_pid) {
+        int capacity = state_->desired_capacity_;
+
+        auto new_state = std::make_shared<ThreadPool::State>();
+        new_state->please_shutdown_ = state_->please_shutdown_;
+        new_state->quick_shutdown_ = state_->quick_shutdown_;
+
+        sp_state_ = new_state;
+        state_ = sp_state_.get();
+        pid_ = current_pid;
+
+        // Launch worker threads anew
+        ARROW_UNUSED(SetCapacity(capacity));
+      }
     }
   }
 #endif
diff --git a/cpp/src/arrow/util/thread_pool.h b/cpp/src/arrow/util/thread_pool.h
index 4ed908d..a104e0e 100644
--- a/cpp/src/arrow/util/thread_pool.h
+++ b/cpp/src/arrow/util/thread_pool.h
@@ -21,6 +21,10 @@
 #include <unistd.h>
 #endif
 
+#ifndef _WIN32
+#include <atomic>
+#endif
+
 #include <cstdint>
 #include <memory>
 #include <queue>
@@ -373,7 +377,7 @@ class ARROW_EXPORT ThreadPool : public Executor {
   State* state_;
   bool shutdown_on_destroy_;
 #ifndef _WIN32
-  pid_t pid_;
+  std::atomic<pid_t> pid_;
 #endif
 };
 
diff --git a/cpp/src/arrow/util/thread_pool_test.cc b/cpp/src/arrow/util/thread_pool_test.cc
index 399c755..56efedc 100644
--- a/cpp/src/arrow/util/thread_pool_test.cc
+++ b/cpp/src/arrow/util/thread_pool_test.cc
@@ -36,6 +36,7 @@
 #include "arrow/testing/future_util.h"
 #include "arrow/testing/gtest_util.h"
 #include "arrow/util/io_util.h"
+#include "arrow/util/logging.h"
 #include "arrow/util/macros.h"
 #include "arrow/util/test_common.h"
 #include "arrow/util/thread_pool.h"
@@ -610,32 +611,42 @@ TEST_F(TestThreadPool, SubmitWithStopTokenCancelled) {
 
 #if !(defined(_WIN32) || defined(ARROW_VALGRIND) || defined(ADDRESS_SANITIZER) || \
       defined(THREAD_SANITIZER))
-TEST_F(TestThreadPool, ForkSafety) {
-  pid_t child_pid;
-  int child_status;
 
+class TestThreadPoolForkSafety : public TestThreadPool {
+ public:
+  void CheckChildExit(int child_pid) {
+    ASSERT_GT(child_pid, 0);
+    int child_status;
+    int got_pid = waitpid(child_pid, &child_status, 0);
+    ASSERT_EQ(got_pid, child_pid);
+    if (WIFSIGNALED(child_status)) {
+      FAIL() << "Child terminated by signal " << WTERMSIG(child_status);
+    }
+    if (!WIFEXITED(child_status)) {
+      FAIL() << "Child didn't terminate normally?? Child status = " << child_status;
+    }
+    ASSERT_EQ(WEXITSTATUS(child_status), 0);
+  }
+};
+
+TEST_F(TestThreadPoolForkSafety, Basics) {
   {
     // Fork after task submission
     auto pool = this->MakeThreadPool(3);
     ASSERT_OK_AND_ASSIGN(auto fut, pool->Submit(add<int>, 4, 5));
     ASSERT_OK_AND_EQ(9, fut.result());
 
-    child_pid = fork();
+    auto child_pid = fork();
     if (child_pid == 0) {
       // Child: thread pool should be usable
       ASSERT_OK_AND_ASSIGN(fut, pool->Submit(add<int>, 3, 4));
-      if (*fut.result() != 7) {
-        std::exit(1);
-      }
+      ASSERT_FINISHES_OK_AND_EQ(7, fut);
       // Shutting down shouldn't hang or fail
       Status st = pool->Shutdown();
       std::exit(st.ok() ? 0 : 2);
     } else {
       // Parent
-      ASSERT_GT(child_pid, 0);
-      ASSERT_GT(waitpid(child_pid, &child_status, 0), 0);
-      ASSERT_TRUE(WIFEXITED(child_status));
-      ASSERT_EQ(WEXITSTATUS(child_status), 0);
+      CheckChildExit(child_pid);
       ASSERT_OK(pool->Shutdown());
     }
   }
@@ -644,7 +655,7 @@ TEST_F(TestThreadPool, ForkSafety) {
     auto pool = this->MakeThreadPool(3);
     ASSERT_OK(pool->Shutdown());
 
-    child_pid = fork();
+    auto child_pid = fork();
     if (child_pid == 0) {
       // Child
       // Spawning a task should return with error (pool was shutdown)
@@ -657,13 +668,87 @@ TEST_F(TestThreadPool, ForkSafety) {
       std::exit(0);
     } else {
       // Parent
-      ASSERT_GT(child_pid, 0);
-      ASSERT_GT(waitpid(child_pid, &child_status, 0), 0);
-      ASSERT_TRUE(WIFEXITED(child_status));
-      ASSERT_EQ(WEXITSTATUS(child_status), 0);
+      CheckChildExit(child_pid);
     }
   }
 }
+
+TEST_F(TestThreadPoolForkSafety, MultipleChildThreads) {
+  // ARROW-15593: race condition in after-fork ThreadPool reinitialization
+  // when SpawnReal() was called from multiple threads in a forked child.
+  auto run_in_child = [](ThreadPool* pool) {
+    const int n_threads = 5;
+    // One slot per thread: concurrent push_back() on a shared vector is a race.
+    std::vector<Future<int>> futures(n_threads);
+    std::vector<std::thread> threads;
+    threads.reserve(n_threads);
+
+    auto run_in_thread = [&](int i) {
+      auto maybe_fut = pool->Submit(add<int>, 3, 4);
+      futures[i] = DeferNotOk(std::move(maybe_fut));
+    };
+
+    for (int i = 0; i < n_threads; ++i) {
+      threads.emplace_back(run_in_thread, i);
+    }
+    for (auto& thread : threads) {
+      thread.join();
+    }
+    for (const auto& fut : futures) {
+      ASSERT_FINISHES_OK_AND_EQ(7, fut);
+    }
+  };
+
+  {
+    auto pool = this->MakeThreadPool(3);
+    ASSERT_OK_AND_ASSIGN(auto fut, pool->Submit(add<int>, 4, 5));
+    ASSERT_OK_AND_EQ(9, fut.result());
+
+    auto child_pid = fork();
+    if (child_pid == 0) {
+      // Child: spawn tasks from multiple threads at once
+      run_in_child(pool.get());
+      std::exit(0);
+    } else {
+      // Parent
+      CheckChildExit(child_pid);
+      ASSERT_OK(pool->Shutdown());
+    }
+  }
+}
+
+TEST_F(TestThreadPoolForkSafety, NestedChild) {
+  {
+    auto pool = this->MakeThreadPool(3);
+    ASSERT_OK_AND_ASSIGN(auto fut, pool->Submit(add<int>, 4, 5));
+    ASSERT_OK_AND_EQ(9, fut.result());
+
+    auto child_pid = fork();
+    if (child_pid == 0) {
+      // Child
+      ASSERT_OK_AND_ASSIGN(fut, pool->Submit(add<int>, 3, 4));
+      // Fork while the task is running
+      auto grandchild_pid = fork();
+      if (grandchild_pid == 0) {
+        // Grandchild
+        ASSERT_OK_AND_ASSIGN(fut, pool->Submit(add<int>, 1, 2));
+        ASSERT_FINISHES_OK_AND_EQ(3, fut);
+        ASSERT_OK(pool->Shutdown());
+      } else {
+        // Child
+        CheckChildExit(grandchild_pid);
+        ASSERT_FINISHES_OK_AND_EQ(7, fut);
+        ASSERT_OK(pool->Shutdown());
+      }
+      std::exit(0);
+    } else {
+      // Parent
+      CheckChildExit(child_pid);
+      ASSERT_OK(pool->Shutdown());
+    }
+  }
+}
+
 #endif
 
 TEST(TestGlobalThreadPool, Capacity) {