You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2018/08/02 23:23:05 UTC
[arrow] branch master updated: ARROW-2963: [C++] Make thread pool
fork-safe
This is an automated email from the ASF dual-hosted git repository.
wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 7afa4af ARROW-2963: [C++] Make thread pool fork-safe
7afa4af is described below
commit 7afa4afc88327ec4d1879643faf25e3773ea9636
Author: Antoine Pitrou <an...@python.org>
AuthorDate: Thu Aug 2 19:22:54 2018 -0400
ARROW-2963: [C++] Make thread pool fork-safe
In a forked child process, ThreadPool should reinitialize its internal state as though no threads had been spawned.
Author: Antoine Pitrou <an...@python.org>
Closes #2363 from pitrou/ARROW-2963-thread-pool-fork-safety and squashes the following commits:
851c1957 <Antoine Pitrou> Fix test on Python 2.7
366939e8 <Antoine Pitrou> - Rename FixIfBroken() - Add multiprocessing-based Python test
956556b5 <Antoine Pitrou> Disable test under Valgrind
67bc877b <Antoine Pitrou> ARROW-2963: Make thread pool fork-safe
---
cpp/src/arrow/util/thread-pool-test.cc | 65 +++++++++++++++++++++++++++++
cpp/src/arrow/util/thread-pool.cc | 37 +++++++++++++++-
cpp/src/arrow/util/thread-pool.h | 13 +++++-
python/pyarrow/tests/test_convert_pandas.py | 21 ++++++++--
4 files changed, 130 insertions(+), 6 deletions(-)
diff --git a/cpp/src/arrow/util/thread-pool-test.cc b/cpp/src/arrow/util/thread-pool-test.cc
index a70f016..99d4de2 100644
--- a/cpp/src/arrow/util/thread-pool-test.cc
+++ b/cpp/src/arrow/util/thread-pool-test.cc
@@ -15,8 +15,14 @@
// specific language governing permissions and limitations
// under the License.
+#ifndef _WIN32
+#include <sys/wait.h>
+#include <unistd.h>
+#endif
+
#include <algorithm>
#include <chrono>
+#include <cstdlib>
#include <functional>
#include <thread>
#include <vector>
@@ -282,6 +288,65 @@ TEST_F(TestThreadPool, Submit) {
}
}
+// Test fork safety on Unix
+
+#if !(defined(_WIN32) || defined(ARROW_VALGRIND))
+TEST_F(TestThreadPool, ForkSafety) {
+ pid_t child_pid;
+ int child_status;
+
+ {
+ // Fork after task submission
+ auto pool = this->MakeThreadPool(3);
+ auto fut = pool->Submit(add<int>, 4, 5);
+ ASSERT_EQ(fut.get(), 9);
+
+ child_pid = fork();
+ if (child_pid == 0) {
+ // Child: thread pool should be usable
+ fut = pool->Submit(add<int>, 3, 4);
+ if (fut.get() != 7) {
+ std::exit(1);
+ }
+ // Shutting down shouldn't hang or fail
+ Status st = pool->Shutdown();
+ std::exit(st.ok() ? 0 : 2);
+ } else {
+ // Parent
+ ASSERT_GT(child_pid, 0);
+ ASSERT_GT(waitpid(child_pid, &child_status, 0), 0);
+ ASSERT_TRUE(WIFEXITED(child_status));
+ ASSERT_EQ(WEXITSTATUS(child_status), 0);
+ ASSERT_OK(pool->Shutdown());
+ }
+ }
+ {
+ // Fork after shutdown
+ auto pool = this->MakeThreadPool(3);
+ ASSERT_OK(pool->Shutdown());
+
+ child_pid = fork();
+ if (child_pid == 0) {
+ // Child
+ // Spawning a task should return with error (pool was shutdown)
+ Status st = pool->Spawn([] {});
+ if (!st.IsInvalid()) {
+ std::exit(1);
+ }
+ // Trigger destructor
+ pool.reset();
+ std::exit(0);
+ } else {
+ // Parent
+ ASSERT_GT(child_pid, 0);
+ ASSERT_GT(waitpid(child_pid, &child_status, 0), 0);
+ ASSERT_TRUE(WIFEXITED(child_status));
+ ASSERT_EQ(WEXITSTATUS(child_status), 0);
+ }
+ }
+}
+#endif
+
TEST(TestGlobalThreadPool, Capacity) {
// Sanity check
auto pool = GetCpuThreadPool();
diff --git a/cpp/src/arrow/util/thread-pool.cc b/cpp/src/arrow/util/thread-pool.cc
index 90fa4b7..f3b05fd 100644
--- a/cpp/src/arrow/util/thread-pool.cc
+++ b/cpp/src/arrow/util/thread-pool.cc
@@ -51,7 +51,11 @@ struct ThreadPool::State {
ThreadPool::ThreadPool()
: sp_state_(std::make_shared<ThreadPool::State>()),
state_(sp_state_.get()),
- shutdown_on_destroy_(true) {}
+ shutdown_on_destroy_(true) {
+#ifndef _WIN32
+ pid_ = getpid();
+#endif
+}
ThreadPool::~ThreadPool() {
if (shutdown_on_destroy_) {
@@ -59,7 +63,34 @@ ThreadPool::~ThreadPool() {
}
}
+void ThreadPool::ProtectAgainstFork() {
+#ifndef _WIN32
+ pid_t current_pid = getpid();
+ if (pid_ != current_pid) {
+ // Reinitialize internal state in child process after fork()
+ // Ideally we would use pthread_atfork(), but that doesn't allow
+ // storing an argument, hence we'd need to maintain a list of all
+ // existing ThreadPools.
+ int capacity = state_->desired_capacity_;
+
+ auto new_state = std::make_shared<ThreadPool::State>();
+ new_state->please_shutdown_ = state_->please_shutdown_;
+ new_state->quick_shutdown_ = state_->quick_shutdown_;
+
+ pid_ = current_pid;
+ sp_state_ = new_state;
+ state_ = sp_state_.get();
+
+ // Launch worker threads anew
+ if (!state_->please_shutdown_) {
+ ARROW_UNUSED(SetCapacity(capacity));
+ }
+ }
+#endif
+}
+
Status ThreadPool::SetCapacity(int threads) {
+ ProtectAgainstFork();
std::unique_lock<std::mutex> lock(state_->mutex_);
if (state_->please_shutdown_) {
return Status::Invalid("operation forbidden during or after shutdown");
@@ -81,16 +112,19 @@ Status ThreadPool::SetCapacity(int threads) {
}
int ThreadPool::GetCapacity() {
+ ProtectAgainstFork();
std::unique_lock<std::mutex> lock(state_->mutex_);
return state_->desired_capacity_;
}
int ThreadPool::GetActualCapacity() {
+ ProtectAgainstFork();
std::unique_lock<std::mutex> lock(state_->mutex_);
return static_cast<int>(state_->workers_.size());
}
Status ThreadPool::Shutdown(bool wait) {
+ ProtectAgainstFork();
std::unique_lock<std::mutex> lock(state_->mutex_);
if (state_->please_shutdown_) {
@@ -186,6 +220,7 @@ void ThreadPool::WorkerLoop(std::shared_ptr<State> state,
Status ThreadPool::SpawnReal(std::function<void()> task) {
{
+ ProtectAgainstFork();
std::lock_guard<std::mutex> lock(state_->mutex_);
if (state_->please_shutdown_) {
return Status::Invalid("operation forbidden during or after shutdown");
diff --git a/cpp/src/arrow/util/thread-pool.h b/cpp/src/arrow/util/thread-pool.h
index 20b6b2e..07bc5f9 100644
--- a/cpp/src/arrow/util/thread-pool.h
+++ b/cpp/src/arrow/util/thread-pool.h
@@ -18,6 +18,10 @@
#ifndef ARROW_UTIL_THREAD_POOL_H
#define ARROW_UTIL_THREAD_POOL_H
+#ifndef _WIN32
+#include <unistd.h>
+#endif
+
#include <exception>
#include <functional>
#include <future>
@@ -136,6 +140,8 @@ class ARROW_EXPORT ThreadPool {
void LaunchWorkersUnlocked(int threads);
// Get the current actual capacity
int GetActualCapacity();
+ // Reinitialize the thread pool if the pid changed
+ void ProtectAgainstFork();
// The worker loop is a static method so that it can keep running
// after the ThreadPool is destroyed
@@ -144,9 +150,12 @@ class ARROW_EXPORT ThreadPool {
static std::shared_ptr<ThreadPool> MakeCpuThreadPool();
- const std::shared_ptr<State> sp_state_;
- State* const state_;
+ std::shared_ptr<State> sp_state_;
+ State* state_;
bool shutdown_on_destroy_;
+#ifndef _WIN32
+ pid_t pid_;
+#endif
};
// Return the process-global thread pool for CPU-bound tasks.
diff --git a/python/pyarrow/tests/test_convert_pandas.py b/python/pyarrow/tests/test_convert_pandas.py
index e9f2e4b..613d810 100644
--- a/python/pyarrow/tests/test_convert_pandas.py
+++ b/python/pyarrow/tests/test_convert_pandas.py
@@ -17,6 +17,7 @@
# under the License.
import decimal
import json
+import multiprocessing as mp
from collections import OrderedDict
from datetime import date, datetime, time, timedelta
@@ -1826,6 +1827,13 @@ class TestZeroCopyConversion(object):
self.check_zero_copy_failure(pa.array(arr))
+# This function must be at the top-level for Python 2.7's multiprocessing
+def _threaded_conversion():
+ df = _alltypes_example()
+ _check_pandas_roundtrip(df, use_threads=True)
+ _check_pandas_roundtrip(df, use_threads=True, as_batch=True)
+
+
class TestConvertMisc(object):
"""
Miscellaneous conversion tests.
@@ -1866,9 +1874,16 @@ class TestConvertMisc(object):
_check_array_roundtrip(arr, type=pa_type)
def test_threaded_conversion(self):
- df = _alltypes_example()
- _check_pandas_roundtrip(df, use_threads=True)
- _check_pandas_roundtrip(df, use_threads=True, as_batch=True)
+ _threaded_conversion()
+
+ def test_threaded_conversion_multiprocess(self):
+ # Parallel conversion should work from child processes too (ARROW-2963)
+ pool = mp.Pool(2)
+ try:
+ pool.apply(_threaded_conversion)
+ finally:
+ pool.close()
+ pool.join()
def test_category(self):
repeats = 5