You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by yi...@apache.org on 2022/02/08 03:11:59 UTC
[arrow] branch master updated: ARROW-15593: [C++] Make after-fork ThreadPool reinitialization thread-safe
This is an automated email from the ASF dual-hosted git repository.
yibocai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 70d1f87 ARROW-15593: [C++] Make after-fork ThreadPool reinitialization thread-safe
70d1f87 is described below
commit 70d1f879eac25cd33280dafe8302445fac25265b
Author: Antoine Pitrou <an...@python.org>
AuthorDate: Tue Feb 8 03:09:27 2022 +0000
ARROW-15593: [C++] Make after-fork ThreadPool reinitialization thread-safe
Since after-fork reinitialization is triggered when one of the ThreadPool methods is called, it can very well be called from multiple threads at once. Make it thread-safe.
Closes #12358 from pitrou/ARROW-15593-thread-pool-race-condition
Authored-by: Antoine Pitrou <an...@python.org>
Signed-off-by: Yibo Cai <yi...@arm.com>
---
cpp/src/arrow/util/mutex.cc | 31 +++++++++
cpp/src/arrow/util/mutex.h | 21 ++++++
cpp/src/arrow/util/thread_pool.cc | 41 +++++++-----
cpp/src/arrow/util/thread_pool.h | 6 +-
cpp/src/arrow/util/thread_pool_test.cc | 117 ++++++++++++++++++++++++++++-----
5 files changed, 181 insertions(+), 35 deletions(-)
diff --git a/cpp/src/arrow/util/mutex.cc b/cpp/src/arrow/util/mutex.cc
index 7456d78..9f82ad4 100644
--- a/cpp/src/arrow/util/mutex.cc
+++ b/cpp/src/arrow/util/mutex.cc
@@ -19,6 +19,11 @@
#include <mutex>
+#ifndef _WIN32
+#include <pthread.h>
+#include <atomic>
+#endif
+
#include "arrow/util/logging.h"
namespace arrow {
@@ -50,5 +55,31 @@ Mutex::Guard Mutex::Lock() {
Mutex::Mutex() : impl_(new Impl, [](Impl* impl) { delete impl; }) {}
+#ifndef _WIN32
+namespace {
+
+// Owns the mutex handed out by GlobalForkSafeMutex(). Its single static
+// instance registers a pthread_atfork() child handler during static
+// initialization, and that handler installs a brand-new Mutex in every child
+// forked from a process where it is registered.
+struct AfterForkState {
+ // A global instance that will also register the atfork handler when
+ // constructed.
+ static AfterForkState instance;
+
+ // The mutex may be used at shutdown, so make it eternal.
+ // The leak (only in child processes) is a small price to pay for robustness.
+ // Stays nullptr in the original process. It is replaced after every fork(),
+ // so a child never reuses a Mutex object inherited from its parent (which
+ // another parent thread might have been holding at fork() time).
+ Mutex* mutex = nullptr;
+
+ private:
+ // NOTE(review): the return value of pthread_atfork() is ignored; if the
+ // registration failed, GlobalForkSafeMutex() would stay nullptr in children.
+ AfterForkState() {
+ pthread_atfork(/*prepare=*/nullptr, /*parent=*/nullptr, /*child=*/&AfterFork);
+ }
+
+ // Child-side atfork handler. Any previous Mutex (e.g. in a grandchild) is
+ // deliberately not deleted, per the "eternal" policy above.
+ static void AfterFork() { instance.mutex = new Mutex; }
+};
+
+AfterForkState AfterForkState::instance;
+} // namespace
+
+// Returns nullptr until the atfork child handler has run in this process
+// (see AfterForkState::mutex); callers must only use it after detecting a fork.
+Mutex* GlobalForkSafeMutex() { return AfterForkState::instance.mutex; }
+#endif // _WIN32
+
} // namespace util
} // namespace arrow
diff --git a/cpp/src/arrow/util/mutex.h b/cpp/src/arrow/util/mutex.h
index f4fc641..ac63cf7 100644
--- a/cpp/src/arrow/util/mutex.h
+++ b/cpp/src/arrow/util/mutex.h
@@ -60,5 +60,26 @@ class ARROW_EXPORT Mutex {
std::unique_ptr<Impl, void (*)(Impl*)> impl_;
};
+#ifndef _WIN32
+/// Return a pointer to a process-wide, process-specific Mutex that can be used
+/// at any point in a forked child process. nullptr is returned in the original
+/// (never-forked) process, so only call this once a fork has been detected.
+///
+/// The rule is to first check that getpid() still matches the pid recorded when
+/// the protected structure was initialized and, if not, lock this mutex and
+/// re-check before running any after-fork reinitialization code. Like this:
+///
+///   std::atomic<pid_t> pid{getpid()};
+///   ...
+///   if (pid.load() != getpid()) {
+///     // In child process
+///     auto lock = GlobalForkSafeMutex()->Lock();
+///     if (pid.load() != getpid()) {
+///       // Reinitialize internal structures after fork
+///       ...
+///       pid.store(getpid());
+///     }
+///   }
+ARROW_EXPORT
+Mutex* GlobalForkSafeMutex();
+#endif
+
} // namespace util
} // namespace arrow
diff --git a/cpp/src/arrow/util/thread_pool.cc b/cpp/src/arrow/util/thread_pool.cc
index 37132fe..a138794 100644
--- a/cpp/src/arrow/util/thread_pool.cc
+++ b/cpp/src/arrow/util/thread_pool.cc
@@ -28,6 +28,7 @@
#include "arrow/util/io_util.h"
#include "arrow/util/logging.h"
+#include "arrow/util/mutex.h"
namespace arrow {
namespace internal {
@@ -235,24 +236,28 @@ ThreadPool::~ThreadPool() {
void ThreadPool::ProtectAgainstFork() {
#ifndef _WIN32
pid_t current_pid = getpid();
- if (pid_ != current_pid) {
- // Reinitialize internal state in child process after fork()
- // Ideally we would use pthread_at_fork(), but that doesn't allow
- // storing an argument, hence we'd need to maintain a list of all
- // existing ThreadPools.
- int capacity = state_->desired_capacity_;
-
- auto new_state = std::make_shared<ThreadPool::State>();
- new_state->please_shutdown_ = state_->please_shutdown_;
- new_state->quick_shutdown_ = state_->quick_shutdown_;
-
- pid_ = current_pid;
- sp_state_ = new_state;
- state_ = sp_state_.get();
-
- // Launch worker threads anew
- if (!state_->please_shutdown_) {
- ARROW_UNUSED(SetCapacity(capacity));
+ if (pid_.load() != current_pid) {
+ // Reinitialize internal state in child process after fork().
+ {
+ // Since after-fork reinitialization is triggered when one of the ThreadPool
+ // methods is called, it can be very well be called from multiple threads
+ // at once. Therefore, it needs to be guarded with a lock.
+ auto lock = util::GlobalForkSafeMutex()->Lock();
+
+ if (pid_.load() != current_pid) {
+ int capacity = state_->desired_capacity_;
+
+ auto new_state = std::make_shared<ThreadPool::State>();
+ new_state->please_shutdown_ = state_->please_shutdown_;
+ new_state->quick_shutdown_ = state_->quick_shutdown_;
+
+ sp_state_ = new_state;
+ state_ = sp_state_.get();
+ pid_ = current_pid;
+
+ // Launch worker threads anew
+ ARROW_UNUSED(SetCapacity(capacity));
+ }
}
}
#endif
diff --git a/cpp/src/arrow/util/thread_pool.h b/cpp/src/arrow/util/thread_pool.h
index 4ed908d..a104e0e 100644
--- a/cpp/src/arrow/util/thread_pool.h
+++ b/cpp/src/arrow/util/thread_pool.h
@@ -21,6 +21,10 @@
#include <unistd.h>
#endif
+#ifndef _WIN32
+#include <atomic>
+#endif
+
#include <cstdint>
#include <memory>
#include <queue>
@@ -373,7 +377,7 @@ class ARROW_EXPORT ThreadPool : public Executor {
State* state_;
bool shutdown_on_destroy_;
#ifndef _WIN32
- pid_t pid_;
+ std::atomic<pid_t> pid_;
#endif
};
diff --git a/cpp/src/arrow/util/thread_pool_test.cc b/cpp/src/arrow/util/thread_pool_test.cc
index 399c755..56efedc 100644
--- a/cpp/src/arrow/util/thread_pool_test.cc
+++ b/cpp/src/arrow/util/thread_pool_test.cc
@@ -36,6 +36,7 @@
#include "arrow/testing/future_util.h"
#include "arrow/testing/gtest_util.h"
#include "arrow/util/io_util.h"
+#include "arrow/util/logging.h"
#include "arrow/util/macros.h"
#include "arrow/util/test_common.h"
#include "arrow/util/thread_pool.h"
@@ -610,32 +611,42 @@ TEST_F(TestThreadPool, SubmitWithStopTokenCancelled) {
#if !(defined(_WIN32) || defined(ARROW_VALGRIND) || defined(ADDRESS_SANITIZER) || \
defined(THREAD_SANITIZER))
-TEST_F(TestThreadPool, ForkSafety) {
- pid_t child_pid;
- int child_status;
+// Fixture for the fork-safety tests: forked children report their outcome to
+// the parent through their process exit status.
+class TestThreadPoolForkSafety : public TestThreadPool {
+ public:
+ // Wait for `child_pid` and fail unless the child exited normally with status 0.
+ // Note: being a helper, a fatal failure here only returns from this function;
+ // wrap the call in ASSERT_NO_FATAL_FAILURE if the caller must stop as well.
+ void CheckChildExit(int child_pid) {
+ ASSERT_GT(child_pid, 0);
+ int child_status;
+ int got_pid = waitpid(child_pid, &child_status, 0);
+ ASSERT_EQ(got_pid, child_pid);
+ if (WIFSIGNALED(child_status)) {
+ FAIL() << "Child terminated by signal " << WTERMSIG(child_status);
+ }
+ if (!WIFEXITED(child_status)) {
+ FAIL() << "Child didn't terminate normally?? Child status = " << child_status;
+ }
+ ASSERT_EQ(WEXITSTATUS(child_status), 0);
+ }
+};
+
+TEST_F(TestThreadPoolForkSafety, Basics) {
{
// Fork after task submission
auto pool = this->MakeThreadPool(3);
ASSERT_OK_AND_ASSIGN(auto fut, pool->Submit(add<int>, 4, 5));
ASSERT_OK_AND_EQ(9, fut.result());
- child_pid = fork();
+ auto child_pid = fork();
if (child_pid == 0) {
// Child: thread pool should be usable
ASSERT_OK_AND_ASSIGN(fut, pool->Submit(add<int>, 3, 4));
- if (*fut.result() != 7) {
- std::exit(1);
- }
+ ASSERT_FINISHES_OK_AND_EQ(7, fut);
// Shutting down shouldn't hang or fail
Status st = pool->Shutdown();
std::exit(st.ok() ? 0 : 2);
} else {
// Parent
- ASSERT_GT(child_pid, 0);
- ASSERT_GT(waitpid(child_pid, &child_status, 0), 0);
- ASSERT_TRUE(WIFEXITED(child_status));
- ASSERT_EQ(WEXITSTATUS(child_status), 0);
+ CheckChildExit(child_pid);
ASSERT_OK(pool->Shutdown());
}
}
@@ -644,7 +655,7 @@ TEST_F(TestThreadPool, ForkSafety) {
auto pool = this->MakeThreadPool(3);
ASSERT_OK(pool->Shutdown());
- child_pid = fork();
+ auto child_pid = fork();
if (child_pid == 0) {
// Child
// Spawning a task should return with error (pool was shutdown)
@@ -657,13 +668,87 @@ TEST_F(TestThreadPool, ForkSafety) {
std::exit(0);
} else {
// Parent
- ASSERT_GT(child_pid, 0);
- ASSERT_GT(waitpid(child_pid, &child_status, 0), 0);
- ASSERT_TRUE(WIFEXITED(child_status));
- ASSERT_EQ(WEXITSTATUS(child_status), 0);
+ CheckChildExit(child_pid);
}
}
}
+
+TEST_F(TestThreadPoolForkSafety, MultipleChildThreads) {
+  // ARROW-15593: race condition in after-fork ThreadPool reinitialization
+  // when SpawnReal() was called from multiple threads in a forked child.
+  auto run_in_child = [](ThreadPool* pool) {
+    const int n_threads = 5;
+    // Pre-sized so that each thread only writes its own slot: calling
+    // push_back() concurrently on a shared vector is a data race, even after
+    // reserve().
+    std::vector<Future<int>> futures(n_threads);
+    std::vector<std::thread> threads;
+    threads.reserve(n_threads);
+
+    auto run_in_thread = [&](int index) {
+      auto maybe_fut = pool->Submit(add<int>, 3, 4);
+      futures[index] = DeferNotOk(std::move(maybe_fut));
+    };
+
+    for (int i = 0; i < n_threads; ++i) {
+      threads.emplace_back(run_in_thread, i);
+    }
+    for (auto& thread : threads) {
+      thread.join();
+    }
+    for (const auto& fut : futures) {
+      ASSERT_FINISHES_OK_AND_EQ(7, fut);
+    }
+  };
+
+  {
+    auto pool = this->MakeThreadPool(3);
+    ASSERT_OK_AND_ASSIGN(auto fut, pool->Submit(add<int>, 4, 5));
+    ASSERT_OK_AND_EQ(9, fut.result());
+
+    auto child_pid = fork();
+    if (child_pid == 0) {
+      // Child: spawn tasks from multiple threads at once.
+      run_in_child(pool.get());
+      // A failing ASSERT_* inside run_in_child only returns from the lambda,
+      // so report any recorded failure through the exit status; exiting 0
+      // unconditionally would make the parent believe the child succeeded.
+      std::exit(HasFailure() ? 1 : 0);
+    } else {
+      // Parent
+      CheckChildExit(child_pid);
+      ASSERT_OK(pool->Shutdown());
+    }
+  }
+}
+
+TEST_F(TestThreadPoolForkSafety, NestedChild) {
+  {
+    auto pool = this->MakeThreadPool(3);
+    ASSERT_OK_AND_ASSIGN(auto fut, pool->Submit(add<int>, 4, 5));
+    ASSERT_OK_AND_EQ(9, fut.result());
+
+    auto child_pid = fork();
+    if (child_pid == 0) {
+      // Child. The forked-side checks run inside lambdas so that a failing
+      // ASSERT_* only returns from the lambda: a forked process must never
+      // return from the test body (it would go on running the rest of the
+      // test suite), and any recorded failure must reach the parent through
+      // the exit status.
+      [&] {
+        ASSERT_OK_AND_ASSIGN(fut, pool->Submit(add<int>, 3, 4));
+        // Fork while the task is running
+        auto grandchild_pid = fork();
+        if (grandchild_pid == 0) {
+          // Grandchild
+          [&] {
+            ASSERT_OK_AND_ASSIGN(fut, pool->Submit(add<int>, 1, 2));
+            ASSERT_FINISHES_OK_AND_EQ(3, fut);
+            ASSERT_OK(pool->Shutdown());
+          }();
+          std::exit(HasFailure() ? 1 : 0);
+        } else {
+          // Child: a grandchild failure is recorded here by CheckChildExit
+          // and therefore propagated by the exit status below.
+          CheckChildExit(grandchild_pid);
+          ASSERT_FINISHES_OK_AND_EQ(7, fut);
+          ASSERT_OK(pool->Shutdown());
+        }
+      }();
+      std::exit(HasFailure() ? 1 : 0);
+    } else {
+      // Parent
+      CheckChildExit(child_pid);
+      ASSERT_OK(pool->Shutdown());
+    }
+  }
+}
+
#endif
TEST(TestGlobalThreadPool, Capacity) {