You are viewing a plain-text version of this content. The canonical link for it is here (the link target is not preserved in this plain-text rendering).
Posted to commits@arrow.apache.org by we...@apache.org on 2018/05/25 01:37:52 UTC
[arrow] branch master updated: ARROW-2568: [Python] Expose thread pool size setting to Python, and deprecate "nthreads" where possible
This is an automated email from the ASF dual-hosted git repository.
wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 01202cc ARROW-2568: [Python] Expose thread pool size setting to Python, and deprecate "nthreads" where possible
01202cc is described below
commit 01202ccc84144d69723bba76e61e8afb8222437b
Author: Antoine Pitrou <an...@python.org>
AuthorDate: Thu May 24 21:28:15 2018 -0400
ARROW-2568: [Python] Expose thread pool size setting to Python, and deprecate "nthreads" where possible
There are two areas where `nthreads` cannot be replaced immediately by the global thread pool:
1. when converting pandas data to an Arrow table or record batch, since that conversion uses a Python `ThreadPoolExecutor` from pure Python code (see `dataframe_to_arrays` in `pandas_compat.py`)
2. when reading or writing Parquet data, since `parquet-cpp` relies on the parallelization facilities available in the stable release of Arrow (see https://github.com/apache/parquet-cpp/pull/467)
Elsewhere, we add a `use_threads` boolean argument and deprecate `nthreads`.
Author: Antoine Pitrou <an...@python.org>
Closes #2078 from pitrou/ARROW-2568 and squashes the following commits:
91187bf6 <Antoine Pitrou> Move use_threads flag into PandasOptions
a7aeed0e <Antoine Pitrou> Factor out secession predicate
f601d4e9 <Antoine Pitrou> ThreadPool::State pointer is const
4567a2c3 <Antoine Pitrou> Add a two-argument variant of ParallelFor() that uses the global CPU thread pool
d0a527ab <Antoine Pitrou> Restore single-thread path in WriteTableToBlocks()
2171e6e2 <Antoine Pitrou> On Windows, avoid shutting down the global thread pool at process exit
934e5a11 <Antoine Pitrou> Add & operator between Statuses
96397076 <Antoine Pitrou> Factor out deprecation logic
6b685406 <Antoine Pitrou> Fix MSVC warning
6b6f64a1 <Antoine Pitrou> Make ThreadPool capacity an int, not a size_t
61669755 <Antoine Pitrou> Rename CPUThreadPool() to GetCpuThreadPool()
d4eb8d47 <Antoine Pitrou> Export ThreadPool and CPUThreadPool()
6985afe6 <Antoine Pitrou> Lint
fcca62f5 <Antoine Pitrou> Emit FutureWarning (which is visible by default) rather than DeprecationWarning
172fba37 <Antoine Pitrou> Use C++ API, rather than multiprocessing, in pyarrow.{cpu_count,set_cpu_count}
50310b88 <Antoine Pitrou> Add API function to get desired ThreadPool capacity
3417325c <Antoine Pitrou> ARROW-2568: WIP
---
cpp/src/arrow/python/arrow_to_pandas.cc | 30 +++---
cpp/src/arrow/python/arrow_to_pandas.h | 10 +-
cpp/src/arrow/python/python-test.cc | 3 +-
cpp/src/arrow/status-test.cc | 37 +++++++
cpp/src/arrow/status.cc | 6 +-
cpp/src/arrow/status.h | 64 +++++++++++-
cpp/src/arrow/util/memory.h | 2 +-
cpp/src/arrow/util/parallel.h | 23 ++++
cpp/src/arrow/util/thread-pool-test.cc | 28 +++--
cpp/src/arrow/util/thread-pool.cc | 156 ++++++++++++++++++----------
cpp/src/arrow/util/thread-pool.h | 58 ++++++-----
cpp/src/plasma/client.cc | 2 +-
python/pyarrow/_config.pyx | 57 ----------
python/pyarrow/array.pxi | 3 +-
python/pyarrow/feather.py | 25 +++--
python/pyarrow/includes/libarrow.pxd | 8 +-
python/pyarrow/ipc.py | 12 ++-
python/pyarrow/lib.pyx | 30 +++---
python/pyarrow/pandas_compat.py | 12 +--
python/pyarrow/table.pxi | 33 +++---
python/pyarrow/tests/test_convert_pandas.py | 10 +-
python/pyarrow/tests/test_feather.py | 6 +-
python/pyarrow/tests/test_ipc.py | 8 +-
python/pyarrow/tests/test_misc.py | 10 ++
python/pyarrow/util.py | 10 ++
25 files changed, 402 insertions(+), 241 deletions(-)
diff --git a/cpp/src/arrow/python/arrow_to_pandas.cc b/cpp/src/arrow/python/arrow_to_pandas.cc
index f8887d4..71b62cf 100644
--- a/cpp/src/arrow/python/arrow_to_pandas.cc
+++ b/cpp/src/arrow/python/arrow_to_pandas.cc
@@ -1370,14 +1370,14 @@ class DataFrameBlockCreator {
const std::shared_ptr<Table>& table, MemoryPool* pool)
: table_(table), options_(options), pool_(pool) {}
- Status Convert(int nthreads, PyObject** output) {
+ Status Convert(PyObject** output) {
column_types_.resize(table_->num_columns());
column_block_placement_.resize(table_->num_columns());
type_counts_.clear();
blocks_.clear();
RETURN_NOT_OK(CreateBlocks());
- RETURN_NOT_OK(WriteTableToBlocks(nthreads));
+ RETURN_NOT_OK(WriteTableToBlocks());
return GetResultList(output);
}
@@ -1450,23 +1450,21 @@ class DataFrameBlockCreator {
return Status::OK();
}
- Status WriteTableToBlocks(int nthreads) {
+ Status WriteTableToBlocks() {
auto WriteColumn = [this](int i) {
std::shared_ptr<PandasBlock> block;
RETURN_NOT_OK(this->GetBlock(i, &block));
return block->Write(this->table_->column(i), i, this->column_block_placement_[i]);
};
- int num_tasks = table_->num_columns();
- nthreads = std::min<int>(nthreads, num_tasks);
- if (nthreads == 1) {
- for (int i = 0; i < num_tasks; ++i) {
+ if (options_.use_threads) {
+ return ParallelFor(table_->num_columns(), WriteColumn);
+ } else {
+ for (int i = 0; i < table_->num_columns(); ++i) {
RETURN_NOT_OK(WriteColumn(i));
}
- } else {
- RETURN_NOT_OK(ParallelFor(nthreads, num_tasks, WriteColumn));
+ return Status::OK();
}
- return Status::OK();
}
Status AppendBlocks(const BlockMap& blocks, PyObject* list) {
@@ -1866,15 +1864,15 @@ Status ConvertColumnToPandas(PandasOptions options, const std::shared_ptr<Column
}
Status ConvertTableToPandas(PandasOptions options, const std::shared_ptr<Table>& table,
- int nthreads, MemoryPool* pool, PyObject** out) {
- return ConvertTableToPandas(options, std::unordered_set<std::string>(), table, nthreads,
- pool, out);
+ MemoryPool* pool, PyObject** out) {
+ return ConvertTableToPandas(options, std::unordered_set<std::string>(), table, pool,
+ out);
}
Status ConvertTableToPandas(PandasOptions options,
const std::unordered_set<std::string>& categorical_columns,
- const std::shared_ptr<Table>& table, int nthreads,
- MemoryPool* pool, PyObject** out) {
+ const std::shared_ptr<Table>& table, MemoryPool* pool,
+ PyObject** out) {
std::shared_ptr<Table> current_table = table;
if (!categorical_columns.empty()) {
FunctionContext ctx;
@@ -1894,7 +1892,7 @@ Status ConvertTableToPandas(PandasOptions options,
}
DataFrameBlockCreator helper(options, current_table, pool);
- return helper.Convert(nthreads, out);
+ return helper.Convert(out);
}
} // namespace py
diff --git a/cpp/src/arrow/python/arrow_to_pandas.h b/cpp/src/arrow/python/arrow_to_pandas.h
index 4819eb4..2d32281 100644
--- a/cpp/src/arrow/python/arrow_to_pandas.h
+++ b/cpp/src/arrow/python/arrow_to_pandas.h
@@ -45,11 +45,13 @@ struct PandasOptions {
bool strings_to_categorical;
bool zero_copy_only;
bool integer_object_nulls;
+ bool use_threads;
PandasOptions()
: strings_to_categorical(false),
zero_copy_only(false),
- integer_object_nulls(false) {}
+ integer_object_nulls(false),
+ use_threads(false) {}
};
ARROW_EXPORT
@@ -68,7 +70,7 @@ Status ConvertColumnToPandas(PandasOptions options, const std::shared_ptr<Column
// tuple item: (indices: ndarray[int32], block: ndarray[TYPE, ndim=2])
ARROW_EXPORT
Status ConvertTableToPandas(PandasOptions options, const std::shared_ptr<Table>& table,
- int nthreads, MemoryPool* pool, PyObject** out);
+ MemoryPool* pool, PyObject** out);
/// Convert a whole table as efficiently as possible to a pandas.DataFrame.
///
@@ -77,8 +79,8 @@ Status ConvertTableToPandas(PandasOptions options, const std::shared_ptr<Table>&
ARROW_EXPORT
Status ConvertTableToPandas(PandasOptions options,
const std::unordered_set<std::string>& categorical_columns,
- const std::shared_ptr<Table>& table, int nthreads,
- MemoryPool* pool, PyObject** out);
+ const std::shared_ptr<Table>& table, MemoryPool* pool,
+ PyObject** out);
} // namespace py
} // namespace arrow
diff --git a/cpp/src/arrow/python/python-test.cc b/cpp/src/arrow/python/python-test.cc
index 60da08b..abe93b0 100644
--- a/cpp/src/arrow/python/python-test.cc
+++ b/cpp/src/arrow/python/python-test.cc
@@ -238,8 +238,9 @@ TEST(PandasConversionTest, TestObjectBlockWriteFails) {
PyObject* out;
Py_BEGIN_ALLOW_THREADS;
PandasOptions options;
+ options.use_threads = true;
MemoryPool* pool = default_memory_pool();
- ASSERT_RAISES(UnknownError, ConvertTableToPandas(options, table, 2, pool, &out));
+ ASSERT_RAISES(UnknownError, ConvertTableToPandas(options, table, pool, &out));
Py_END_ALLOW_THREADS;
}
diff --git a/cpp/src/arrow/status-test.cc b/cpp/src/arrow/status-test.cc
index d4f84e4..b7fc61f 100644
--- a/cpp/src/arrow/status-test.cc
+++ b/cpp/src/arrow/status-test.cc
@@ -40,4 +40,41 @@ TEST(StatusTest, TestToString) {
ASSERT_EQ(file_error.ToString(), ss.str());
}
+TEST(StatusTest, AndStatus) {
+ Status a = Status::OK();
+ Status b = Status::OK();
+ Status c = Status::Invalid("invalid value");
+ Status d = Status::IOError("file error");
+
+ Status res;
+ res = a & b;
+ ASSERT_TRUE(res.ok());
+ res = a & c;
+ ASSERT_TRUE(res.IsInvalid());
+ res = d & c;
+ ASSERT_TRUE(res.IsIOError());
+
+ res = Status::OK();
+ res &= c;
+ ASSERT_TRUE(res.IsInvalid());
+ res &= d;
+ ASSERT_TRUE(res.IsInvalid());
+
+ // With rvalues
+ res = Status::OK() & Status::Invalid("foo");
+ ASSERT_TRUE(res.IsInvalid());
+ res = Status::Invalid("foo") & Status::OK();
+ ASSERT_TRUE(res.IsInvalid());
+ res = Status::Invalid("foo") & Status::IOError("bar");
+ ASSERT_TRUE(res.IsInvalid());
+
+ res = Status::OK();
+ res &= Status::OK();
+ ASSERT_TRUE(res.ok());
+ res &= Status::Invalid("foo");
+ ASSERT_TRUE(res.IsInvalid());
+ res &= Status::IOError("bar");
+ ASSERT_TRUE(res.IsInvalid());
+}
+
} // namespace arrow
diff --git a/cpp/src/arrow/status.cc b/cpp/src/arrow/status.cc
index 8d446ef..918dbc7 100644
--- a/cpp/src/arrow/status.cc
+++ b/cpp/src/arrow/status.cc
@@ -23,12 +23,12 @@ Status::Status(StatusCode code, const std::string& msg) {
state_->msg = msg;
}
-void Status::CopyFrom(const State* state) {
+void Status::CopyFrom(const Status& s) {
delete state_;
- if (state == nullptr) {
+ if (s.state_ == nullptr) {
state_ = nullptr;
} else {
- state_ = new State(*state);
+ state_ = new State(*s.state_);
}
}
diff --git a/cpp/src/arrow/status.h b/cpp/src/arrow/status.h
index 0e7f014..d1e4d66 100644
--- a/cpp/src/arrow/status.h
+++ b/cpp/src/arrow/status.h
@@ -18,6 +18,7 @@
#include <cstring>
#include <iosfwd>
#include <string>
+#include <utility>
#ifdef ARROW_EXTRA_ERROR_CONTEXT
#include <sstream>
@@ -100,7 +101,17 @@ class ARROW_EXPORT Status {
// Copy the specified status.
Status(const Status& s);
- void operator=(const Status& s);
+ Status& operator=(const Status& s);
+
+ // Move the specified status.
+ Status(Status&& s);
+ Status& operator=(Status&& s);
+
+ // AND the statuses.
+ Status operator&(const Status& s) const;
+ Status operator&(Status&& s) const;
+ Status& operator&=(const Status& s);
+ Status& operator&=(Status&& s);
// Return a success status.
static Status OK() { return Status(); }
@@ -207,7 +218,8 @@ class ARROW_EXPORT Status {
// a `State` structure containing the error code and message(s)
State* state_;
- void CopyFrom(const State* s);
+ void CopyFrom(const Status& s);
+ void MoveFrom(Status& s);
};
static inline std::ostream& operator<<(std::ostream& os, const Status& x) {
@@ -215,15 +227,59 @@ static inline std::ostream& operator<<(std::ostream& os, const Status& x) {
return os;
}
+inline void Status::MoveFrom(Status& s) {
+ delete state_;
+ state_ = s.state_;
+ s.state_ = NULL;
+}
+
inline Status::Status(const Status& s)
: state_((s.state_ == NULL) ? NULL : new State(*s.state_)) {}
-inline void Status::operator=(const Status& s) {
+inline Status& Status::operator=(const Status& s) {
// The following condition catches both aliasing (when this == &s),
// and the common case where both s and *this are ok.
if (state_ != s.state_) {
- CopyFrom(s.state_);
+ CopyFrom(s);
+ }
+ return *this;
+}
+
+inline Status::Status(Status&& s) : state_(s.state_) { s.state_ = NULL; }
+
+inline Status& Status::operator=(Status&& s) {
+ MoveFrom(s);
+ return *this;
+}
+
+inline Status Status::operator&(const Status& s) const {
+ if (ok()) {
+ return s;
+ } else {
+ return *this;
+ }
+}
+
+inline Status Status::operator&(Status&& s) const {
+ if (ok()) {
+ return std::move(s);
+ } else {
+ return *this;
+ }
+}
+
+inline Status& Status::operator&=(const Status& s) {
+ if (ok() && !s.ok()) {
+ CopyFrom(s);
+ }
+ return *this;
+}
+
+inline Status& Status::operator&=(Status&& s) {
+ if (ok() && !s.ok()) {
+ MoveFrom(s);
}
+ return *this;
}
} // namespace arrow
diff --git a/cpp/src/arrow/util/memory.h b/cpp/src/arrow/util/memory.h
index f391d27..56fadca 100644
--- a/cpp/src/arrow/util/memory.h
+++ b/cpp/src/arrow/util/memory.h
@@ -36,7 +36,7 @@ uint8_t* pointer_logical_and(const uint8_t* address, uintptr_t bits) {
void parallel_memcopy(uint8_t* dst, const uint8_t* src, int64_t nbytes,
uintptr_t block_size, int num_threads) {
// XXX This function is really using `num_threads + 1` threads.
- auto pool = CPUThreadPool();
+ auto pool = GetCpuThreadPool();
uint8_t* left = pointer_logical_and(src + block_size - 1, ~(block_size - 1));
uint8_t* right = pointer_logical_and(src + nbytes, ~(block_size - 1));
diff --git a/cpp/src/arrow/util/parallel.h b/cpp/src/arrow/util/parallel.h
index 9fec000..156679e 100644
--- a/cpp/src/arrow/util/parallel.h
+++ b/cpp/src/arrow/util/parallel.h
@@ -24,9 +24,32 @@
#include <vector>
#include "arrow/status.h"
+#include "arrow/util/thread-pool.h"
namespace arrow {
+// A parallelizer that takes a `Status(int)` function and calls it with
+// arguments between 0 and `num_tasks - 1`, on an arbitrary number of threads.
+
+template <class FUNCTION>
+Status ParallelFor(int num_tasks, FUNCTION&& func) {
+ auto pool = internal::GetCpuThreadPool();
+ std::vector<std::future<Status>> futures(num_tasks);
+
+ for (int i = 0; i < num_tasks; ++i) {
+ futures[i] = pool->Submit(func, i);
+ }
+ auto st = Status::OK();
+ for (auto& fut : futures) {
+ st &= fut.get();
+ }
+ return st;
+}
+
+// A variant of ParallelFor() with an explicit number of dedicated threads.
+// In most cases it's more appropriate to use the 2-argument ParallelFor (above),
+// or directly the global CPU thread pool (arrow/util/thread-pool.h).
+
template <class FUNCTION>
Status ParallelFor(int nthreads, int num_tasks, FUNCTION&& func) {
std::vector<std::thread> thread_pool;
diff --git a/cpp/src/arrow/util/thread-pool-test.cc b/cpp/src/arrow/util/thread-pool-test.cc
index 42bbe6e..a70f016 100644
--- a/cpp/src/arrow/util/thread-pool-test.cc
+++ b/cpp/src/arrow/util/thread-pool-test.cc
@@ -127,7 +127,7 @@ class TestThreadPool : public ::testing::Test {
std::shared_ptr<ThreadPool> MakeThreadPool() { return MakeThreadPool(4); }
- std::shared_ptr<ThreadPool> MakeThreadPool(size_t threads) {
+ std::shared_ptr<ThreadPool> MakeThreadPool(int threads) {
std::shared_ptr<ThreadPool> pool;
Status st = ThreadPool::Make(threads, &pool);
return pool;
@@ -163,7 +163,7 @@ class TestThreadPool : public ::testing::Test {
TEST_F(TestThreadPool, ConstructDestruct) {
// Stress shutdown-at-destruction logic
- for (size_t threads : {1, 2, 3, 8, 32, 70}) {
+ for (int threads : {1, 2, 3, 8, 32, 70}) {
auto pool = this->MakeThreadPool(threads);
}
}
@@ -223,21 +223,31 @@ TEST_F(TestThreadPool, QuickShutdown) {
TEST_F(TestThreadPool, SetCapacity) {
auto pool = this->MakeThreadPool(3);
ASSERT_EQ(pool->GetCapacity(), 3);
+ ASSERT_EQ(pool->GetActualCapacity(), 3);
+
ASSERT_OK(pool->SetCapacity(5));
ASSERT_EQ(pool->GetCapacity(), 5);
+ ASSERT_EQ(pool->GetActualCapacity(), 5);
+
ASSERT_OK(pool->SetCapacity(2));
- // Wait for workers to wake up and secede
- busy_wait(0.5, [&] { return pool->GetCapacity() == 2; });
ASSERT_EQ(pool->GetCapacity(), 2);
+ // Wait for workers to wake up and secede
+ busy_wait(0.5, [&] { return pool->GetActualCapacity() == 2; });
+ ASSERT_EQ(pool->GetActualCapacity(), 2);
+
ASSERT_OK(pool->SetCapacity(5));
ASSERT_EQ(pool->GetCapacity(), 5);
+ ASSERT_EQ(pool->GetActualCapacity(), 5);
+
// Downsize while tasks are pending
for (int i = 0; i < 10; ++i) {
ASSERT_OK(pool->Spawn(std::bind(sleep_for, 0.01 /* seconds */)));
}
ASSERT_OK(pool->SetCapacity(2));
- busy_wait(0.5, [&] { return pool->GetCapacity() == 2; });
ASSERT_EQ(pool->GetCapacity(), 2);
+ busy_wait(0.5, [&] { return pool->GetActualCapacity() == 2; });
+ ASSERT_EQ(pool->GetActualCapacity(), 2);
+
// Ensure nothing got stuck
ASSERT_OK(pool->Shutdown());
}
@@ -274,14 +284,16 @@ TEST_F(TestThreadPool, Submit) {
TEST(TestGlobalThreadPool, Capacity) {
// Sanity check
- auto pool = CPUThreadPool();
- size_t capacity = pool->GetCapacity();
+ auto pool = GetCpuThreadPool();
+ int capacity = pool->GetCapacity();
ASSERT_GT(capacity, 0);
+ ASSERT_EQ(pool->GetActualCapacity(), capacity);
+ ASSERT_EQ(GetCpuThreadPoolCapacity(), capacity);
// Exercise default capacity heuristic
ASSERT_OK(DelEnvVar("OMP_NUM_THREADS"));
ASSERT_OK(DelEnvVar("OMP_THREAD_LIMIT"));
- size_t hw_capacity = std::thread::hardware_concurrency();
+ int hw_capacity = std::thread::hardware_concurrency();
ASSERT_EQ(ThreadPool::DefaultCapacity(), hw_capacity);
ASSERT_OK(SetEnvVar("OMP_NUM_THREADS", "13"));
ASSERT_EQ(ThreadPool::DefaultCapacity(), 13);
diff --git a/cpp/src/arrow/util/thread-pool.cc b/cpp/src/arrow/util/thread-pool.cc
index 0e264f3..997ff5d 100644
--- a/cpp/src/arrow/util/thread-pool.cc
+++ b/cpp/src/arrow/util/thread-pool.cc
@@ -25,14 +25,39 @@
namespace arrow {
namespace internal {
+struct ThreadPool::State {
+ State() : desired_capacity_(0), please_shutdown_(false), quick_shutdown_(false) {}
+
+ std::mutex mutex_;
+ std::condition_variable cv_;
+ std::condition_variable cv_shutdown_;
+
+ std::list<std::thread> workers_;
+ // Trashcan for finished threads
+ std::vector<std::thread> finished_workers_;
+ std::deque<std::function<void()>> pending_tasks_;
+
+ // Desired number of threads
+ int desired_capacity_;
+ // Are we shutting down?
+ bool please_shutdown_;
+ bool quick_shutdown_;
+};
+
ThreadPool::ThreadPool()
- : desired_capacity_(0), please_shutdown_(false), quick_shutdown_(false) {}
+ : sp_state_(std::make_shared<ThreadPool::State>()),
+ state_(sp_state_.get()),
+ shutdown_on_destroy_(true) {}
-ThreadPool::~ThreadPool() { ARROW_UNUSED(Shutdown(false /* wait */)); }
+ThreadPool::~ThreadPool() {
+ if (shutdown_on_destroy_) {
+ ARROW_UNUSED(Shutdown(false /* wait */));
+ }
+}
-Status ThreadPool::SetCapacity(size_t threads) {
- std::unique_lock<std::mutex> lock(mutex_);
- if (please_shutdown_) {
+Status ThreadPool::SetCapacity(int threads) {
+ std::unique_lock<std::mutex> lock(state_->mutex_);
+ if (state_->please_shutdown_) {
return Status::Invalid("operation forbidden during or after shutdown");
}
if (threads <= 0) {
@@ -40,91 +65,103 @@ Status ThreadPool::SetCapacity(size_t threads) {
}
CollectFinishedWorkersUnlocked();
- desired_capacity_ = threads;
- int64_t diff = desired_capacity_ - workers_.size();
+ state_->desired_capacity_ = threads;
+ int diff = static_cast<int>(threads - state_->workers_.size());
if (diff > 0) {
- LaunchWorkersUnlocked(static_cast<size_t>(diff));
+ LaunchWorkersUnlocked(diff);
} else if (diff < 0) {
// Wake threads to ask them to stop
- cv_.notify_all();
+ state_->cv_.notify_all();
}
return Status::OK();
}
-size_t ThreadPool::GetCapacity() {
- std::unique_lock<std::mutex> lock(mutex_);
- return workers_.size();
+int ThreadPool::GetCapacity() {
+ std::unique_lock<std::mutex> lock(state_->mutex_);
+ return state_->desired_capacity_;
+}
+
+int ThreadPool::GetActualCapacity() {
+ std::unique_lock<std::mutex> lock(state_->mutex_);
+ return static_cast<int>(state_->workers_.size());
}
Status ThreadPool::Shutdown(bool wait) {
- std::unique_lock<std::mutex> lock(mutex_);
+ std::unique_lock<std::mutex> lock(state_->mutex_);
- if (please_shutdown_) {
+ if (state_->please_shutdown_) {
return Status::Invalid("Shutdown() already called");
}
- please_shutdown_ = true;
- quick_shutdown_ = !wait;
- cv_.notify_all();
- cv_shutdown_.wait(lock, [this] { return workers_.empty(); });
- if (!quick_shutdown_) {
- DCHECK_EQ(pending_tasks_.size(), 0);
+ state_->please_shutdown_ = true;
+ state_->quick_shutdown_ = !wait;
+ state_->cv_.notify_all();
+ state_->cv_shutdown_.wait(lock, [this] { return state_->workers_.empty(); });
+ if (!state_->quick_shutdown_) {
+ DCHECK_EQ(state_->pending_tasks_.size(), 0);
} else {
- pending_tasks_.clear();
+ state_->pending_tasks_.clear();
}
CollectFinishedWorkersUnlocked();
return Status::OK();
}
void ThreadPool::CollectFinishedWorkersUnlocked() {
- for (auto& thread : finished_workers_) {
+ for (auto& thread : state_->finished_workers_) {
// Make sure OS thread has exited
thread.join();
}
- finished_workers_.clear();
+ state_->finished_workers_.clear();
}
-void ThreadPool::LaunchWorkersUnlocked(size_t threads) {
- for (size_t i = 0; i < threads; i++) {
- workers_.emplace_back();
- auto it = --workers_.end();
- *it = std::thread([this, it] { WorkerLoop(it); });
+void ThreadPool::LaunchWorkersUnlocked(int threads) {
+ std::shared_ptr<State> state = sp_state_;
+
+ for (int i = 0; i < threads; i++) {
+ state_->workers_.emplace_back();
+ auto it = --(state_->workers_.end());
+ *it = std::thread([state, it] { WorkerLoop(state, it); });
}
}
-void ThreadPool::WorkerLoop(std::list<std::thread>::iterator it) {
- std::unique_lock<std::mutex> lock(mutex_);
+void ThreadPool::WorkerLoop(std::shared_ptr<State> state,
+ std::list<std::thread>::iterator it) {
+ std::unique_lock<std::mutex> lock(state->mutex_);
// Since we hold the lock, `it` now points to the correct thread object
// (LaunchWorkersUnlocked has exited)
DCHECK_EQ(std::this_thread::get_id(), it->get_id());
+ // If too many threads, we should secede from the pool
+ const auto should_secede = [&]() -> bool {
+ return state->workers_.size() > static_cast<size_t>(state->desired_capacity_);
+ };
+
while (true) {
// By the time this thread is started, some tasks may have been pushed
// or shutdown could even have been requested. So we only wait on the
// condition variable at the end of the loop.
// Execute pending tasks if any
- while (!pending_tasks_.empty() && !quick_shutdown_) {
- // If too many threads, secede from the pool.
+ while (!state->pending_tasks_.empty() && !state->quick_shutdown_) {
// We check this opportunistically at each loop iteration since
// it releases the lock below.
- if (workers_.size() > desired_capacity_) {
+ if (should_secede()) {
break;
}
{
- std::function<void()> task = std::move(pending_tasks_.front());
- pending_tasks_.pop_front();
+ std::function<void()> task = std::move(state->pending_tasks_.front());
+ state->pending_tasks_.pop_front();
lock.unlock();
task();
}
lock.lock();
}
// Now either the queue is empty *or* a quick shutdown was requested
- if (please_shutdown_ || workers_.size() > desired_capacity_) {
+ if (state->please_shutdown_ || should_secede()) {
break;
}
// Wait for next wakeup
- cv_.wait(lock);
+ state->cv_.wait(lock);
}
// We're done. Move our thread object to the trashcan of finished
@@ -135,28 +172,28 @@ void ThreadPool::WorkerLoop(std::list<std::thread>::iterator it) {
// are exited before the ThreadPool is destroyed. Otherwise subtle
// timing conditions can lead to false positives with Valgrind.
DCHECK_EQ(std::this_thread::get_id(), it->get_id());
- finished_workers_.push_back(std::move(*it));
- workers_.erase(it);
- if (please_shutdown_) {
+ state->finished_workers_.push_back(std::move(*it));
+ state->workers_.erase(it);
+ if (state->please_shutdown_) {
// Notify the function waiting in Shutdown().
- cv_shutdown_.notify_one();
+ state->cv_shutdown_.notify_one();
}
}
Status ThreadPool::SpawnReal(std::function<void()> task) {
{
- std::lock_guard<std::mutex> lock(mutex_);
- if (please_shutdown_) {
+ std::lock_guard<std::mutex> lock(state_->mutex_);
+ if (state_->please_shutdown_) {
return Status::Invalid("operation forbidden during or after shutdown");
}
CollectFinishedWorkersUnlocked();
- pending_tasks_.push_back(std::move(task));
+ state_->pending_tasks_.push_back(std::move(task));
}
- cv_.notify_one();
+ state_->cv_.notify_one();
return Status::OK();
}
-Status ThreadPool::Make(size_t threads, std::shared_ptr<ThreadPool>* out) {
+Status ThreadPool::Make(int threads, std::shared_ptr<ThreadPool>* out) {
auto pool = std::shared_ptr<ThreadPool>(new ThreadPool());
RETURN_NOT_OK(pool->SetCapacity(threads));
*out = std::move(pool);
@@ -166,7 +203,7 @@ Status ThreadPool::Make(size_t threads, std::shared_ptr<ThreadPool>* out) {
// ----------------------------------------------------------------------
// Global thread pool
-static size_t ParseOMPEnvVar(const char* name) {
+static int ParseOMPEnvVar(const char* name) {
// OMP_NUM_THREADS is a comma-separated list of positive integers.
// We are only interested in the first (top-level) number.
std::string str;
@@ -178,14 +215,14 @@ static size_t ParseOMPEnvVar(const char* name) {
str = str.substr(0, first_comma);
}
try {
- return static_cast<size_t>(std::max(0LL, std::stoll(str)));
+ return std::max(0, std::stoi(str));
} catch (...) {
return 0;
}
}
-size_t ThreadPool::DefaultCapacity() {
- size_t capacity, limit;
+int ThreadPool::DefaultCapacity() {
+ int capacity, limit;
capacity = ParseOMPEnvVar("OMP_NUM_THREADS");
if (capacity == 0) {
capacity = std::thread::hardware_concurrency();
@@ -203,21 +240,30 @@ size_t ThreadPool::DefaultCapacity() {
}
// Helper for the singleton pattern
-static std::shared_ptr<ThreadPool> MakePoolWithDefaultCapacity() {
+std::shared_ptr<ThreadPool> ThreadPool::MakeCpuThreadPool() {
std::shared_ptr<ThreadPool> pool;
DCHECK_OK(ThreadPool::Make(ThreadPool::DefaultCapacity(), &pool));
+ // On Windows, the global ThreadPool destructor may be called after
+ // non-main threads have been killed by the OS, and hang in a condition
+ // variable.
+ // On Unix, we want to avoid leak reports by Valgrind.
+#ifdef _WIN32
+ pool->shutdown_on_destroy_ = false;
+#endif
return pool;
}
-ThreadPool* CPUThreadPool() {
- static std::shared_ptr<ThreadPool> singleton = MakePoolWithDefaultCapacity();
+ThreadPool* GetCpuThreadPool() {
+ static std::shared_ptr<ThreadPool> singleton = ThreadPool::MakeCpuThreadPool();
return singleton.get();
}
} // namespace internal
-Status SetCPUThreadPoolCapacity(size_t threads) {
- return internal::CPUThreadPool()->SetCapacity(threads);
+int GetCpuThreadPoolCapacity() { return internal::GetCpuThreadPool()->GetCapacity(); }
+
+Status SetCpuThreadPoolCapacity(int threads) {
+ return internal::GetCpuThreadPool()->SetCapacity(threads);
}
} // namespace arrow
diff --git a/cpp/src/arrow/util/thread-pool.h b/cpp/src/arrow/util/thread-pool.h
index d426cbb..c1b10f7 100644
--- a/cpp/src/arrow/util/thread-pool.h
+++ b/cpp/src/arrow/util/thread-pool.h
@@ -36,9 +36,14 @@
namespace arrow {
+// Get the number of worker threads used by the process-global thread pool
+// for CPU-bound tasks. This is an idealized number, the actual number
+// may lag a bit.
+ARROW_EXPORT int GetCpuThreadPoolCapacity();
+
// Set the number of worker threads used by the process-global thread pool
// for CPU-bound tasks.
-ARROW_EXPORT Status SetCPUThreadPoolCapacity(size_t threads);
+ARROW_EXPORT Status SetCpuThreadPoolCapacity(int threads);
namespace internal {
@@ -59,22 +64,27 @@ struct packaged_task_wrapper {
} // namespace detail
-class ThreadPool {
+class ARROW_EXPORT ThreadPool {
public:
// Construct a thread pool with the given number of worker threads
- static Status Make(size_t threads, std::shared_ptr<ThreadPool>* out);
+ static Status Make(int threads, std::shared_ptr<ThreadPool>* out);
// Destroy thread pool; the pool will first be shut down
~ThreadPool();
+ // Return the desired number of worker threads.
+ // The actual number of workers may lag a bit before being adjusted to
+ // match this value.
+ int GetCapacity();
+
// Dynamically change the number of worker threads.
// This function returns quickly, but it may take more time before the
// thread count is fully adjusted.
- Status SetCapacity(size_t threads);
+ Status SetCapacity(int threads);
// Heuristic for the default capacity of a thread pool for CPU-bound tasks.
// This is exposed as a static method to help with testing.
- static size_t DefaultCapacity();
+ static int DefaultCapacity();
// Shutdown the pool. Once the pool starts shutting down, new tasks
// cannot be submitted anymore.
@@ -106,6 +116,7 @@ class ThreadPool {
Status st = SpawnReal(detail::packaged_task_wrapper<Result>(std::move(task)));
if (!st.ok()) {
+ // This happens when Submit() is called after Shutdown()
throw std::runtime_error(st.ToString());
}
return fut;
@@ -114,6 +125,9 @@ class ThreadPool {
protected:
FRIEND_TEST(TestThreadPool, SetCapacity);
FRIEND_TEST(TestGlobalThreadPool, Capacity);
+ friend ARROW_EXPORT ThreadPool* GetCpuThreadPool();
+
+ struct State;
ThreadPool();
@@ -123,28 +137,24 @@ class ThreadPool {
// Collect finished worker threads, making sure the OS threads have exited
void CollectFinishedWorkersUnlocked();
// Launch a given number of additional workers
- void LaunchWorkersUnlocked(size_t threads);
- void WorkerLoop(std::list<std::thread>::iterator it);
- size_t GetCapacity();
-
- std::mutex mutex_;
- std::condition_variable cv_;
- std::condition_variable cv_shutdown_;
-
- std::list<std::thread> workers_;
- // Trashcan for finished threads
- std::vector<std::thread> finished_workers_;
- std::deque<std::function<void()>> pending_tasks_;
-
- // Desired number of threads
- size_t desired_capacity_;
- // Are we shutting down?
- bool please_shutdown_;
- bool quick_shutdown_;
+ void LaunchWorkersUnlocked(int threads);
+ // Get the current actual capacity
+ int GetActualCapacity();
+
+ // The worker loop is a static method so that it can keep running
+ // after the ThreadPool is destroyed
+ static void WorkerLoop(std::shared_ptr<State> state,
+ std::list<std::thread>::iterator it);
+
+ static std::shared_ptr<ThreadPool> MakeCpuThreadPool();
+
+ const std::shared_ptr<State> sp_state_;
+ State* const state_;
+ bool shutdown_on_destroy_;
};
// Return the process-global thread pool for CPU-bound tasks.
-ThreadPool* CPUThreadPool();
+ARROW_EXPORT ThreadPool* GetCpuThreadPool();
} // namespace internal
} // namespace arrow
diff --git a/cpp/src/plasma/client.cc b/cpp/src/plasma/client.cc
index 43e27e0..30d2e43 100644
--- a/cpp/src/plasma/client.cc
+++ b/cpp/src/plasma/client.cc
@@ -698,7 +698,7 @@ bool PlasmaClient::Impl::compute_object_hash_parallel(XXH64_state_t* hash_state,
int64_t nbytes) {
// Note that this function will likely be faster if the address of data is
// aligned on a 64-byte boundary.
- auto pool = arrow::internal::CPUThreadPool();
+ auto pool = arrow::internal::GetCpuThreadPool();
const int num_threads = kHashingConcurrency;
uint64_t threadhash[num_threads + 1];
diff --git a/python/pyarrow/_config.pyx b/python/pyarrow/_config.pyx
deleted file mode 100644
index bc9f36d..0000000
--- a/python/pyarrow/_config.pyx
+++ /dev/null
@@ -1,57 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-# cython: profile=False
-# distutils: language = c++
-# cython: embedsignature = True
-
-import numpy as np
-import multiprocessing
-import os
-
-cdef extern from 'arrow/python/init.h':
- int arrow_init_numpy() except -1
-
-cdef extern from 'arrow/python/config.h' namespace 'arrow::py':
- void set_numpy_nan(object o)
-
-arrow_init_numpy()
-
-set_numpy_nan(np.nan)
-
-cdef int CPU_COUNT = int(
- os.environ.get('OMP_NUM_THREADS',
- max(multiprocessing.cpu_count() // 2, 1)))
-
-
-def cpu_count():
- """
- Returns
- -------
- count : Number of CPUs to use by default in parallel operations. Default is
- max(1, multiprocessing.cpu_count() / 2), but can be overridden by the
- OMP_NUM_THREADS environment variable. For the default, we divide the CPU
- count by 2 because most modern computers have hyperthreading turned on,
- so doubling the CPU count beyond the number of physical cores does not
- help.
- """
- return CPU_COUNT
-
-
-def set_cpu_count(count):
- global CPU_COUNT
- CPU_COUNT = max(int(count), 1)
diff --git a/python/pyarrow/array.pxi b/python/pyarrow/array.pxi
index 94c37b3..3f84f17 100644
--- a/python/pyarrow/array.pxi
+++ b/python/pyarrow/array.pxi
@@ -602,7 +602,8 @@ cdef class Array:
options = PandasOptions(
strings_to_categorical=strings_to_categorical,
zero_copy_only=zero_copy_only,
- integer_object_nulls=integer_object_nulls)
+ integer_object_nulls=integer_object_nulls,
+ use_threads=False)
with nogil:
check_status(ConvertArrayToPandas(options, self.sp_array,
self, &out))
diff --git a/python/pyarrow/feather.py b/python/pyarrow/feather.py
index c2157ab..26f22da 100644
--- a/python/pyarrow/feather.py
+++ b/python/pyarrow/feather.py
@@ -26,6 +26,8 @@ from pyarrow.compat import pdapi
from pyarrow.lib import FeatherError # noqa
from pyarrow.lib import RecordBatch, Table, concat_tables
import pyarrow.lib as ext
+from .util import _deprecate_nthreads
+
try:
infer_dtype = pdapi.infer_dtype
@@ -45,7 +47,7 @@ class FeatherReader(ext.FeatherReader):
def read(self, *args, **kwargs):
warnings.warn("read has been deprecated. Use read_pandas instead.",
- DeprecationWarning)
+ FutureWarning, stacklevel=2)
return self.read_pandas(*args, **kwargs)
def read_table(self, columns=None):
@@ -66,8 +68,10 @@ class FeatherReader(ext.FeatherReader):
table = Table.from_arrays(columns, names=names)
return table
- def read_pandas(self, columns=None, nthreads=1):
- return self.read_table(columns=columns).to_pandas(nthreads=nthreads)
+ def read_pandas(self, columns=None, nthreads=None, use_threads=False):
+ use_threads = _deprecate_nthreads(use_threads, nthreads)
+ return self.read_table(columns=columns).to_pandas(
+ use_threads=use_threads)
class FeatherWriter(object):
@@ -141,7 +145,7 @@ class FeatherDataset(object):
.format(piece, self.schema,
table.schema))
- def read_pandas(self, columns=None, nthreads=1):
+ def read_pandas(self, columns=None, nthreads=None, use_threads=False):
"""
Read multiple Parquet files as a single pandas DataFrame
@@ -157,7 +161,9 @@ class FeatherDataset(object):
pandas.DataFrame
Content of the file as a pandas DataFrame (of columns)
"""
- return self.read_table(columns=columns).to_pandas(nthreads=nthreads)
+ use_threads = _deprecate_nthreads(use_threads, nthreads)
+ return self.read_table(columns=columns).to_pandas(
+ use_threads=use_threads)
def write_feather(df, dest):
@@ -186,7 +192,7 @@ def write_feather(df, dest):
raise
-def read_feather(source, columns=None, nthreads=1):
+def read_feather(source, columns=None, nthreads=None, use_threads=False):
"""
Read a pandas.DataFrame from Feather format
@@ -196,15 +202,16 @@ def read_feather(source, columns=None, nthreads=1):
columns : sequence, optional
Only read a specific set of columns. If not provided, all columns are
read
- nthreads : int, default 1
- Number of CPU threads to use when reading to pandas.DataFrame
+ use_threads : bool, default False
+ Whether to parallelize reading using multiple threads
Returns
-------
df : pandas.DataFrame
"""
+ use_threads = _deprecate_nthreads(use_threads, nthreads)
reader = FeatherReader(source)
- return reader.read_pandas(columns=columns, nthreads=nthreads)
+ return reader.read_pandas(columns=columns, use_threads=use_threads)
def read_table(source, columns=None):
diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd
index 9a0b468..5b0c211 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -959,7 +959,7 @@ cdef extern from "arrow/python/api.h" namespace "arrow::py" nogil:
PandasOptions options,
const unordered_set[c_string]& categorical_columns,
const shared_ptr[CTable]& table,
- int nthreads, CMemoryPool* pool,
+ CMemoryPool* pool,
PyObject** out)
void c_set_default_memory_pool \
@@ -987,6 +987,7 @@ cdef extern from "arrow/python/api.h" namespace "arrow::py" nogil:
c_bool strings_to_categorical
c_bool zero_copy_only
c_bool integer_object_nulls
+ c_bool use_threads
cdef extern from "arrow/python/api.h" namespace 'arrow::py' nogil:
@@ -1045,3 +1046,8 @@ cdef extern from 'arrow/util/compression.h' namespace 'arrow' nogil:
int64_t* output_length)
int64_t MaxCompressedLen(int64_t input_len, const uint8_t* input)
+
+
+cdef extern from 'arrow/util/thread-pool.h' namespace 'arrow' nogil:
+ int GetCpuThreadPoolCapacity()
+ CStatus SetCpuThreadPoolCapacity(int threads)
diff --git a/python/pyarrow/ipc.py b/python/pyarrow/ipc.py
index 4081fc5..bed2dd6 100644
--- a/python/pyarrow/ipc.py
+++ b/python/pyarrow/ipc.py
@@ -24,6 +24,7 @@ from pyarrow.lib import (Message, MessageReader, # noqa
read_tensor, write_tensor,
get_record_batch_size, get_tensor_size)
import pyarrow.lib as lib
+from .util import _deprecate_nthreads
class _ReadPandasOption(object):
@@ -168,22 +169,23 @@ def serialize_pandas(df, nthreads=None, preserve_index=True):
return sink.get_result()
-def deserialize_pandas(buf, nthreads=None):
+def deserialize_pandas(buf, nthreads=None, use_threads=False):
"""Deserialize a buffer protocol compatible object into a pandas DataFrame.
Parameters
----------
buf : buffer
An object compatible with the buffer protocol
- nthreads : int, defualt None
- The number of threads to use to convert the buffer to a DataFrame,
- default all CPUs
+ use_threads : boolean, default False
+ Whether to parallelize the conversion using multiple threads
Returns
-------
df : pandas.DataFrame
"""
+ use_threads = _deprecate_nthreads(use_threads, nthreads)
+
buffer_reader = pa.BufferReader(buf)
reader = pa.RecordBatchStreamReader(buffer_reader)
table = reader.read_all()
- return table.to_pandas(nthreads=nthreads)
+ return table.to_pandas(use_threads=use_threads)
diff --git a/python/pyarrow/lib.pyx b/python/pyarrow/lib.pyx
index 4c22fc4..9c661db 100644
--- a/python/pyarrow/lib.pyx
+++ b/python/pyarrow/lib.pyx
@@ -36,28 +36,26 @@ cimport cpython as cp
arrow_init_numpy()
set_numpy_nan(np.nan)
-cdef int CPU_COUNT = int(
- os.environ.get('OMP_NUM_THREADS',
- max(multiprocessing.cpu_count() // 2, 1)))
-
def cpu_count():
"""
- Returns
- -------
- count : Number of CPUs to use by default in parallel operations. Default is
- max(1, multiprocessing.cpu_count() / 2), but can be overridden by the
- OMP_NUM_THREADS environment variable. For the default, we divide the CPU
- count by 2 because most modern computers have hyperthreading turned on,
- so doubling the CPU count beyond the number of physical cores does not
- help.
+ Return the number of threads to use in parallel operations.
+
+ The number of threads is determined at startup by inspecting the
+ OMP_NUM_THREADS and OMP_THREAD_LIMIT environment variables. If neither
+ is present, it will default to the number of hardware threads on the
+ system. It can be modified at runtime by calling set_cpu_count().
"""
- return CPU_COUNT
+ return GetCpuThreadPoolCapacity()
-def set_cpu_count(count):
- global CPU_COUNT
- CPU_COUNT = max(int(count), 1)
+def set_cpu_count(int count):
+ """
+ Set the number of threads to use in parallel operations.
+ """
+ if count < 1:
+ raise ValueError("CPU count must be strictly positive")
+ check_status(SetCpuThreadPoolCapacity(count))
Type_NA = _Type_NA
diff --git a/python/pyarrow/pandas_compat.py b/python/pyarrow/pandas_compat.py
index c288c7f..18d8ed3 100644
--- a/python/pyarrow/pandas_compat.py
+++ b/python/pyarrow/pandas_compat.py
@@ -493,8 +493,7 @@ def _make_datetimetz(tz):
# Converting pyarrow.Table efficiently to pandas.DataFrame
-def table_to_blockmanager(options, table, memory_pool, nthreads=1,
- categories=None):
+def table_to_blockmanager(options, table, memory_pool, categories=None):
from pyarrow.compat import DatetimeTZDtype
index_columns = []
@@ -568,8 +567,7 @@ def table_to_blockmanager(options, table, memory_pool, nthreads=1,
block_table.schema.get_field_index(raw_name)
)
- blocks = _table_to_blocks(options, block_table, nthreads, memory_pool,
- categories)
+ blocks = _table_to_blocks(options, block_table, memory_pool, categories)
# Construct the row index
if len(index_arrays) > 1:
@@ -728,12 +726,12 @@ def _reconstruct_columns_from_metadata(columns, column_indexes):
return pd.MultiIndex(levels=new_levels, labels=labels, names=columns.names)
-def _table_to_blocks(options, block_table, nthreads, memory_pool, categories):
+def _table_to_blocks(options, block_table, memory_pool, categories):
# Part of table_to_blockmanager
# Convert an arrow table to Block from the internal pandas API
- result = pa.lib.table_to_blocks(options, block_table, nthreads,
- memory_pool, categories)
+ result = pa.lib.table_to_blocks(options, block_table, memory_pool,
+ categories)
# Defined above
return [_reconstruct_block(item) for item in result]
diff --git a/python/pyarrow/table.pxi b/python/pyarrow/table.pxi
index c867657..4edf5af 100644
--- a/python/pyarrow/table.pxi
+++ b/python/pyarrow/table.pxi
@@ -27,6 +27,8 @@ except ImportError:
else:
import pyarrow.pandas_compat as pdcompat
+from .util import _deprecate_nthreads
+
cdef class ChunkedArray:
"""
@@ -365,7 +367,8 @@ cdef class Column:
options = PandasOptions(
strings_to_categorical=strings_to_categorical,
zero_copy_only=zero_copy_only,
- integer_object_nulls=integer_object_nulls)
+ integer_object_nulls=integer_object_nulls,
+ use_threads=False)
with nogil:
check_status(libarrow.ConvertColumnToPandas(options,
@@ -746,7 +749,7 @@ cdef class RecordBatch:
entries.append((name, column))
return OrderedDict(entries)
- def to_pandas(self, nthreads=None):
+ def to_pandas(self, nthreads=None, use_threads=False):
"""
Convert the arrow::RecordBatch to a pandas DataFrame
@@ -754,7 +757,8 @@ cdef class RecordBatch:
-------
pandas.DataFrame
"""
- return Table.from_batches([self]).to_pandas(nthreads=nthreads)
+ use_threads = _deprecate_nthreads(use_threads, nthreads)
+ return Table.from_batches([self]).to_pandas(use_threads=use_threads)
@classmethod
def from_pandas(cls, df, Schema schema=None, bint preserve_index=True,
@@ -825,7 +829,7 @@ cdef class RecordBatch:
CRecordBatch.Make(schema, num_rows, c_arrays))
-def table_to_blocks(PandasOptions options, Table table, int nthreads,
+def table_to_blocks(PandasOptions options, Table table,
MemoryPool memory_pool, categories):
cdef:
PyObject* result_obj
@@ -840,9 +844,7 @@ def table_to_blocks(PandasOptions options, Table table, int nthreads,
with nogil:
check_status(
libarrow.ConvertTableToPandas(
- options, categorical_columns, c_table, nthreads, pool,
- &result_obj
- )
+ options, categorical_columns, c_table, pool, &result_obj)
)
return PyObject_to_object(result_obj)
@@ -1148,16 +1150,12 @@ cdef class Table:
def to_pandas(self, nthreads=None, strings_to_categorical=False,
memory_pool=None, zero_copy_only=False, categories=None,
- integer_object_nulls=False):
+ integer_object_nulls=False, use_threads=False):
"""
Convert the arrow::Table to a pandas DataFrame
Parameters
----------
- nthreads : int, default max(1, multiprocessing.cpu_count() / 2)
- For the default, we divide the CPU count by 2 because most modern
- computers have hyperthreading turned on, so doubling the CPU count
- beyond the number of physical cores does not help
strings_to_categorical : boolean, default False
Encode string (UTF8) and binary types to pandas.Categorical
memory_pool: MemoryPool, optional
@@ -1169,6 +1167,8 @@ cdef class Table:
List of columns that should be returned as pandas.Categorical
integer_object_nulls : boolean, default False
Cast integers with nulls to objects
+ use_threads : boolean, default False
+ Whether to parallelize the conversion using multiple threads
Returns
-------
@@ -1177,15 +1177,16 @@ cdef class Table:
cdef:
PandasOptions options
+ use_threads = _deprecate_nthreads(use_threads, nthreads)
+
options = PandasOptions(
strings_to_categorical=strings_to_categorical,
zero_copy_only=zero_copy_only,
- integer_object_nulls=integer_object_nulls)
+ integer_object_nulls=integer_object_nulls,
+ use_threads=use_threads)
self._check_nullptr()
- if nthreads is None:
- nthreads = cpu_count()
mgr = pdcompat.table_to_blockmanager(options, self, memory_pool,
- nthreads, categories)
+ categories)
return pd.DataFrame(mgr)
def to_pydict(self):
diff --git a/python/pyarrow/tests/test_convert_pandas.py b/python/pyarrow/tests/test_convert_pandas.py
index bdb84c7..6bdbcca 100644
--- a/python/pyarrow/tests/test_convert_pandas.py
+++ b/python/pyarrow/tests/test_convert_pandas.py
@@ -60,7 +60,7 @@ def _alltypes_example(size=100):
})
-def _check_pandas_roundtrip(df, expected=None, nthreads=1,
+def _check_pandas_roundtrip(df, expected=None, use_threads=False,
expected_schema=None,
check_dtype=True, schema=None,
preserve_index=False,
@@ -68,9 +68,9 @@ def _check_pandas_roundtrip(df, expected=None, nthreads=1,
klass = pa.RecordBatch if as_batch else pa.Table
table = klass.from_pandas(df, schema=schema,
preserve_index=preserve_index,
- nthreads=nthreads)
+ nthreads=2 if use_threads else 1)
- result = table.to_pandas(nthreads=nthreads)
+ result = table.to_pandas(use_threads=use_threads)
if expected_schema:
assert table.schema.equals(expected_schema)
if expected is None:
@@ -1836,8 +1836,8 @@ class TestConvertMisc(object):
def test_threaded_conversion(self):
df = _alltypes_example()
- _check_pandas_roundtrip(df, nthreads=2)
- _check_pandas_roundtrip(df, nthreads=2, as_batch=True)
+ _check_pandas_roundtrip(df, use_threads=True)
+ _check_pandas_roundtrip(df, use_threads=True, as_batch=True)
def test_category(self):
repeats = 5
diff --git a/python/pyarrow/tests/test_feather.py b/python/pyarrow/tests/test_feather.py
index 46e4e5f..1fcc608 100644
--- a/python/pyarrow/tests/test_feather.py
+++ b/python/pyarrow/tests/test_feather.py
@@ -65,7 +65,7 @@ class TestFeatherReader(unittest.TestCase):
def _check_pandas_roundtrip(self, df, expected=None, path=None,
columns=None, null_counts=None,
- nthreads=1):
+ use_threads=False):
if path is None:
path = random_path()
@@ -74,7 +74,7 @@ class TestFeatherReader(unittest.TestCase):
if not os.path.exists(path):
raise Exception('file not written')
- result = read_feather(path, columns, nthreads=nthreads)
+ result = read_feather(path, columns, use_threads=use_threads)
if expected is None:
expected = df
@@ -384,7 +384,7 @@ class TestFeatherReader(unittest.TestCase):
data = {'c{0}'.format(i): [''] * 10
for i in range(100)}
df = pd.DataFrame(data)
- self._check_pandas_roundtrip(df, nthreads=4)
+ self._check_pandas_roundtrip(df, use_threads=True)
def test_nan_as_null(self):
# Create a nan that is not numpy.nan
diff --git a/python/pyarrow/tests/test_ipc.py b/python/pyarrow/tests/test_ipc.py
index c31c322..07de19f 100644
--- a/python/pyarrow/tests/test_ipc.py
+++ b/python/pyarrow/tests/test_ipc.py
@@ -406,9 +406,9 @@ def test_get_record_batch_size():
assert pa.get_record_batch_size(batch) > (N * itemsize)
-def _check_serialize_pandas_round_trip(df, nthreads=1):
- buf = pa.serialize_pandas(df, nthreads=nthreads)
- result = pa.deserialize_pandas(buf, nthreads=nthreads)
+def _check_serialize_pandas_round_trip(df, use_threads=False):
+ buf = pa.serialize_pandas(df, nthreads=2 if use_threads else 1)
+ result = pa.deserialize_pandas(buf, use_threads=use_threads)
assert_frame_equal(result, df)
@@ -429,7 +429,7 @@ def test_pandas_serialize_round_trip_nthreads():
{'foo': [1.5, 1.6, 1.7], 'bar': list('abc')},
index=index, columns=columns
)
- _check_serialize_pandas_round_trip(df, nthreads=2)
+ _check_serialize_pandas_round_trip(df, use_threads=True)
def test_pandas_serialize_round_trip_multi_index():
diff --git a/python/pyarrow/tests/test_misc.py b/python/pyarrow/tests/test_misc.py
index 55787f1..28553a0 100644
--- a/python/pyarrow/tests/test_misc.py
+++ b/python/pyarrow/tests/test_misc.py
@@ -23,3 +23,13 @@ import pyarrow as pa
def test_get_include():
include_dir = pa.get_include()
assert os.path.exists(os.path.join(include_dir, 'arrow', 'api.h'))
+
+
+def test_cpu_count():
+ n = pa.cpu_count()
+ assert n > 0
+ try:
+ pa.set_cpu_count(n + 5)
+ assert pa.cpu_count() == n + 5
+ finally:
+ pa.set_cpu_count(n)
diff --git a/python/pyarrow/util.py b/python/pyarrow/util.py
index b882565..2954b62 100644
--- a/python/pyarrow/util.py
+++ b/python/pyarrow/util.py
@@ -35,3 +35,13 @@ def _deprecate_api(old_name, new_name, api, next_version):
warnings.warn(msg, FutureWarning)
return api(*args)
return wrapper
+
+
+def _deprecate_nthreads(use_threads, nthreads):
+ if nthreads is not None:
+ warnings.warn("`nthreads` argument is deprecated, "
+ "pass `use_threads` instead", FutureWarning,
+ stacklevel=3)
+ if nthreads > 1:
+ use_threads = True
+ return use_threads
--
To stop receiving notification emails like this one, please contact
wesm@apache.org.