Posted to commits@arrow.apache.org by we...@apache.org on 2018/05/25 01:37:52 UTC

[arrow] branch master updated: ARROW-2568: [Python] Expose thread pool size setting to Python, and deprecate "nthreads" where possible

This is an automated email from the ASF dual-hosted git repository.

wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 01202cc  ARROW-2568: [Python] Expose thread pool size setting to Python, and deprecate "nthreads" where possible
01202cc is described below

commit 01202ccc84144d69723bba76e61e8afb8222437b
Author: Antoine Pitrou <an...@python.org>
AuthorDate: Thu May 24 21:28:15 2018 -0400

    ARROW-2568: [Python] Expose thread pool size setting to Python, and deprecate "nthreads" where possible
    
    There are two areas where `nthreads` cannot be replaced immediately by the global thread pool:
    
    1. when converting Pandas data to an Arrow table or record batch, since that path uses a Python `ThreadPoolExecutor` driven from pure Python code (see `dataframe_to_arrays` in `pandas_compat.py`)
    2. when reading or writing Parquet data, since `parquet-cpp` relies on parallelization facilities in the stable version of Arrow (see https://github.com/apache/parquet-cpp/pull/467)
    
    Elsewhere, we add a `use_threads` boolean argument and deprecate `nthreads`.
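
    A minimal sketch of the resulting Python-side API (the DataFrame below is
    made up for illustration; note that `use_threads` defaults to False in this
    patch):

        import pandas as pd
        import pyarrow as pa

        # Inspect and adjust the global CPU thread pool capacity; these calls
        # are now backed by the C++ GetCpuThreadPoolCapacity() and
        # SetCpuThreadPoolCapacity() functions instead of multiprocessing.
        print(pa.cpu_count())
        pa.set_cpu_count(4)

        # Conversions take a boolean `use_threads` flag instead of an
        # explicit `nthreads` count.
        df = pd.DataFrame({'a': [1, 2, 3]})
        table = pa.Table.from_pandas(df)
        result = table.to_pandas(use_threads=True)

        # The old spelling still works, but it now emits a FutureWarning and
        # nthreads > 1 is mapped to use_threads=True.
        result = table.to_pandas(nthreads=2)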
    
    Author: Antoine Pitrou <an...@python.org>
    
    Closes #2078 from pitrou/ARROW-2568 and squashes the following commits:
    
    91187bf6 <Antoine Pitrou> Move use_threads flag into PandasOptions
    a7aeed0e <Antoine Pitrou> Factor out secession predicate
    f601d4e9 <Antoine Pitrou> ThreadPool::State pointer is const
    4567a2c3 <Antoine Pitrou> Add a two-argument variant of ParallelFor() that uses the global CPU thread pool
    d0a527ab <Antoine Pitrou> Restore single-thread path in WriteTableToBlocks()
    2171e6e2 <Antoine Pitrou> On Windows, avoid shutting down the global thread pool at process exit
    934e5a11 <Antoine Pitrou> Add & operator between Statuses
    96397076 <Antoine Pitrou> Factor out deprecation logic
    6b685406 <Antoine Pitrou> Fix MSVC warning
    6b6f64a1 <Antoine Pitrou> Make ThreadPool capacity an int, not a size_t
    61669755 <Antoine Pitrou> Rename CPUThreadPool() to GetCpuThreadPool()
    d4eb8d47 <Antoine Pitrou> Export ThreadPool and CPUThreadPool()
    6985afe6 <Antoine Pitrou> Lint
    fcca62f5 <Antoine Pitrou> Emit FutureWarning (which is visible by default) rather than DeprecationWarning
    172fba37 <Antoine Pitrou> Use C++ API, rather than multiprocessing, in pyarrow.{cpu_count,set_cpu_count}
    50310b88 <Antoine Pitrou> Add API function to get desired ThreadPool capacity
    3417325c <Antoine Pitrou> ARROW-2568:  WIP
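
    The deprecation path above ("Factor out deprecation logic", "Emit
    FutureWarning ...") can be exercised through the Feather API as well; a
    small sketch, where "data.feather" is a hypothetical file path:

        import pyarrow.feather as feather

        # New spelling: a plain boolean toggle for the global thread pool.
        df = feather.read_feather("data.feather", use_threads=True)

        # Old spelling: still accepted, but _deprecate_nthreads() emits a
        # FutureWarning (visible by default) and maps nthreads > 1 to
        # use_threads=True.
        df = feather.read_feather("data.feather", nthreads=4)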
---
 cpp/src/arrow/python/arrow_to_pandas.cc     |  30 +++---
 cpp/src/arrow/python/arrow_to_pandas.h      |  10 +-
 cpp/src/arrow/python/python-test.cc         |   3 +-
 cpp/src/arrow/status-test.cc                |  37 +++++++
 cpp/src/arrow/status.cc                     |   6 +-
 cpp/src/arrow/status.h                      |  64 +++++++++++-
 cpp/src/arrow/util/memory.h                 |   2 +-
 cpp/src/arrow/util/parallel.h               |  23 ++++
 cpp/src/arrow/util/thread-pool-test.cc      |  28 +++--
 cpp/src/arrow/util/thread-pool.cc           | 156 ++++++++++++++++++----------
 cpp/src/arrow/util/thread-pool.h            |  58 ++++++-----
 cpp/src/plasma/client.cc                    |   2 +-
 python/pyarrow/_config.pyx                  |  57 ----------
 python/pyarrow/array.pxi                    |   3 +-
 python/pyarrow/feather.py                   |  25 +++--
 python/pyarrow/includes/libarrow.pxd        |   8 +-
 python/pyarrow/ipc.py                       |  12 ++-
 python/pyarrow/lib.pyx                      |  30 +++---
 python/pyarrow/pandas_compat.py             |  12 +--
 python/pyarrow/table.pxi                    |  33 +++---
 python/pyarrow/tests/test_convert_pandas.py |  10 +-
 python/pyarrow/tests/test_feather.py        |   6 +-
 python/pyarrow/tests/test_ipc.py            |   8 +-
 python/pyarrow/tests/test_misc.py           |  10 ++
 python/pyarrow/util.py                      |  10 ++
 25 files changed, 402 insertions(+), 241 deletions(-)

diff --git a/cpp/src/arrow/python/arrow_to_pandas.cc b/cpp/src/arrow/python/arrow_to_pandas.cc
index f8887d4..71b62cf 100644
--- a/cpp/src/arrow/python/arrow_to_pandas.cc
+++ b/cpp/src/arrow/python/arrow_to_pandas.cc
@@ -1370,14 +1370,14 @@ class DataFrameBlockCreator {
                                  const std::shared_ptr<Table>& table, MemoryPool* pool)
       : table_(table), options_(options), pool_(pool) {}
 
-  Status Convert(int nthreads, PyObject** output) {
+  Status Convert(PyObject** output) {
     column_types_.resize(table_->num_columns());
     column_block_placement_.resize(table_->num_columns());
     type_counts_.clear();
     blocks_.clear();
 
     RETURN_NOT_OK(CreateBlocks());
-    RETURN_NOT_OK(WriteTableToBlocks(nthreads));
+    RETURN_NOT_OK(WriteTableToBlocks());
 
     return GetResultList(output);
   }
@@ -1450,23 +1450,21 @@ class DataFrameBlockCreator {
     return Status::OK();
   }
 
-  Status WriteTableToBlocks(int nthreads) {
+  Status WriteTableToBlocks() {
     auto WriteColumn = [this](int i) {
       std::shared_ptr<PandasBlock> block;
       RETURN_NOT_OK(this->GetBlock(i, &block));
       return block->Write(this->table_->column(i), i, this->column_block_placement_[i]);
     };
 
-    int num_tasks = table_->num_columns();
-    nthreads = std::min<int>(nthreads, num_tasks);
-    if (nthreads == 1) {
-      for (int i = 0; i < num_tasks; ++i) {
+    if (options_.use_threads) {
+      return ParallelFor(table_->num_columns(), WriteColumn);
+    } else {
+      for (int i = 0; i < table_->num_columns(); ++i) {
         RETURN_NOT_OK(WriteColumn(i));
       }
-    } else {
-      RETURN_NOT_OK(ParallelFor(nthreads, num_tasks, WriteColumn));
+      return Status::OK();
     }
-    return Status::OK();
   }
 
   Status AppendBlocks(const BlockMap& blocks, PyObject* list) {
@@ -1866,15 +1864,15 @@ Status ConvertColumnToPandas(PandasOptions options, const std::shared_ptr<Column
 }
 
 Status ConvertTableToPandas(PandasOptions options, const std::shared_ptr<Table>& table,
-                            int nthreads, MemoryPool* pool, PyObject** out) {
-  return ConvertTableToPandas(options, std::unordered_set<std::string>(), table, nthreads,
-                              pool, out);
+                            MemoryPool* pool, PyObject** out) {
+  return ConvertTableToPandas(options, std::unordered_set<std::string>(), table, pool,
+                              out);
 }
 
 Status ConvertTableToPandas(PandasOptions options,
                             const std::unordered_set<std::string>& categorical_columns,
-                            const std::shared_ptr<Table>& table, int nthreads,
-                            MemoryPool* pool, PyObject** out) {
+                            const std::shared_ptr<Table>& table, MemoryPool* pool,
+                            PyObject** out) {
   std::shared_ptr<Table> current_table = table;
   if (!categorical_columns.empty()) {
     FunctionContext ctx;
@@ -1894,7 +1892,7 @@ Status ConvertTableToPandas(PandasOptions options,
   }
 
   DataFrameBlockCreator helper(options, current_table, pool);
-  return helper.Convert(nthreads, out);
+  return helper.Convert(out);
 }
 
 }  // namespace py
diff --git a/cpp/src/arrow/python/arrow_to_pandas.h b/cpp/src/arrow/python/arrow_to_pandas.h
index 4819eb4..2d32281 100644
--- a/cpp/src/arrow/python/arrow_to_pandas.h
+++ b/cpp/src/arrow/python/arrow_to_pandas.h
@@ -45,11 +45,13 @@ struct PandasOptions {
   bool strings_to_categorical;
   bool zero_copy_only;
   bool integer_object_nulls;
+  bool use_threads;
 
   PandasOptions()
       : strings_to_categorical(false),
         zero_copy_only(false),
-        integer_object_nulls(false) {}
+        integer_object_nulls(false),
+        use_threads(false) {}
 };
 
 ARROW_EXPORT
@@ -68,7 +70,7 @@ Status ConvertColumnToPandas(PandasOptions options, const std::shared_ptr<Column
 // tuple item: (indices: ndarray[int32], block: ndarray[TYPE, ndim=2])
 ARROW_EXPORT
 Status ConvertTableToPandas(PandasOptions options, const std::shared_ptr<Table>& table,
-                            int nthreads, MemoryPool* pool, PyObject** out);
+                            MemoryPool* pool, PyObject** out);
 
 /// Convert a whole table as efficiently as possible to a pandas.DataFrame.
 ///
@@ -77,8 +79,8 @@ Status ConvertTableToPandas(PandasOptions options, const std::shared_ptr<Table>&
 ARROW_EXPORT
 Status ConvertTableToPandas(PandasOptions options,
                             const std::unordered_set<std::string>& categorical_columns,
-                            const std::shared_ptr<Table>& table, int nthreads,
-                            MemoryPool* pool, PyObject** out);
+                            const std::shared_ptr<Table>& table, MemoryPool* pool,
+                            PyObject** out);
 
 }  // namespace py
 }  // namespace arrow
diff --git a/cpp/src/arrow/python/python-test.cc b/cpp/src/arrow/python/python-test.cc
index 60da08b..abe93b0 100644
--- a/cpp/src/arrow/python/python-test.cc
+++ b/cpp/src/arrow/python/python-test.cc
@@ -238,8 +238,9 @@ TEST(PandasConversionTest, TestObjectBlockWriteFails) {
   PyObject* out;
   Py_BEGIN_ALLOW_THREADS;
   PandasOptions options;
+  options.use_threads = true;
   MemoryPool* pool = default_memory_pool();
-  ASSERT_RAISES(UnknownError, ConvertTableToPandas(options, table, 2, pool, &out));
+  ASSERT_RAISES(UnknownError, ConvertTableToPandas(options, table, pool, &out));
   Py_END_ALLOW_THREADS;
 }
 
diff --git a/cpp/src/arrow/status-test.cc b/cpp/src/arrow/status-test.cc
index d4f84e4..b7fc61f 100644
--- a/cpp/src/arrow/status-test.cc
+++ b/cpp/src/arrow/status-test.cc
@@ -40,4 +40,41 @@ TEST(StatusTest, TestToString) {
   ASSERT_EQ(file_error.ToString(), ss.str());
 }
 
+TEST(StatusTest, AndStatus) {
+  Status a = Status::OK();
+  Status b = Status::OK();
+  Status c = Status::Invalid("invalid value");
+  Status d = Status::IOError("file error");
+
+  Status res;
+  res = a & b;
+  ASSERT_TRUE(res.ok());
+  res = a & c;
+  ASSERT_TRUE(res.IsInvalid());
+  res = d & c;
+  ASSERT_TRUE(res.IsIOError());
+
+  res = Status::OK();
+  res &= c;
+  ASSERT_TRUE(res.IsInvalid());
+  res &= d;
+  ASSERT_TRUE(res.IsInvalid());
+
+  // With rvalues
+  res = Status::OK() & Status::Invalid("foo");
+  ASSERT_TRUE(res.IsInvalid());
+  res = Status::Invalid("foo") & Status::OK();
+  ASSERT_TRUE(res.IsInvalid());
+  res = Status::Invalid("foo") & Status::IOError("bar");
+  ASSERT_TRUE(res.IsInvalid());
+
+  res = Status::OK();
+  res &= Status::OK();
+  ASSERT_TRUE(res.ok());
+  res &= Status::Invalid("foo");
+  ASSERT_TRUE(res.IsInvalid());
+  res &= Status::IOError("bar");
+  ASSERT_TRUE(res.IsInvalid());
+}
+
 }  // namespace arrow
diff --git a/cpp/src/arrow/status.cc b/cpp/src/arrow/status.cc
index 8d446ef..918dbc7 100644
--- a/cpp/src/arrow/status.cc
+++ b/cpp/src/arrow/status.cc
@@ -23,12 +23,12 @@ Status::Status(StatusCode code, const std::string& msg) {
   state_->msg = msg;
 }
 
-void Status::CopyFrom(const State* state) {
+void Status::CopyFrom(const Status& s) {
   delete state_;
-  if (state == nullptr) {
+  if (s.state_ == nullptr) {
     state_ = nullptr;
   } else {
-    state_ = new State(*state);
+    state_ = new State(*s.state_);
   }
 }
 
diff --git a/cpp/src/arrow/status.h b/cpp/src/arrow/status.h
index 0e7f014..d1e4d66 100644
--- a/cpp/src/arrow/status.h
+++ b/cpp/src/arrow/status.h
@@ -18,6 +18,7 @@
 #include <cstring>
 #include <iosfwd>
 #include <string>
+#include <utility>
 
 #ifdef ARROW_EXTRA_ERROR_CONTEXT
 #include <sstream>
@@ -100,7 +101,17 @@ class ARROW_EXPORT Status {
 
   // Copy the specified status.
   Status(const Status& s);
-  void operator=(const Status& s);
+  Status& operator=(const Status& s);
+
+  // Move the specified status.
+  Status(Status&& s);
+  Status& operator=(Status&& s);
+
+  // AND the statuses.
+  Status operator&(const Status& s) const;
+  Status operator&(Status&& s) const;
+  Status& operator&=(const Status& s);
+  Status& operator&=(Status&& s);
 
   // Return a success status.
   static Status OK() { return Status(); }
@@ -207,7 +218,8 @@ class ARROW_EXPORT Status {
   // a `State` structure containing the error code and message(s)
   State* state_;
 
-  void CopyFrom(const State* s);
+  void CopyFrom(const Status& s);
+  void MoveFrom(Status& s);
 };
 
 static inline std::ostream& operator<<(std::ostream& os, const Status& x) {
@@ -215,15 +227,59 @@ static inline std::ostream& operator<<(std::ostream& os, const Status& x) {
   return os;
 }
 
+inline void Status::MoveFrom(Status& s) {
+  delete state_;
+  state_ = s.state_;
+  s.state_ = NULL;
+}
+
 inline Status::Status(const Status& s)
     : state_((s.state_ == NULL) ? NULL : new State(*s.state_)) {}
 
-inline void Status::operator=(const Status& s) {
+inline Status& Status::operator=(const Status& s) {
   // The following condition catches both aliasing (when this == &s),
   // and the common case where both s and *this are ok.
   if (state_ != s.state_) {
-    CopyFrom(s.state_);
+    CopyFrom(s);
+  }
+  return *this;
+}
+
+inline Status::Status(Status&& s) : state_(s.state_) { s.state_ = NULL; }
+
+inline Status& Status::operator=(Status&& s) {
+  MoveFrom(s);
+  return *this;
+}
+
+inline Status Status::operator&(const Status& s) const {
+  if (ok()) {
+    return s;
+  } else {
+    return *this;
+  }
+}
+
+inline Status Status::operator&(Status&& s) const {
+  if (ok()) {
+    return std::move(s);
+  } else {
+    return *this;
+  }
+}
+
+inline Status& Status::operator&=(const Status& s) {
+  if (ok() && !s.ok()) {
+    CopyFrom(s);
+  }
+  return *this;
+}
+
+inline Status& Status::operator&=(Status&& s) {
+  if (ok() && !s.ok()) {
+    MoveFrom(s);
   }
+  return *this;
 }
 
 }  // namespace arrow
diff --git a/cpp/src/arrow/util/memory.h b/cpp/src/arrow/util/memory.h
index f391d27..56fadca 100644
--- a/cpp/src/arrow/util/memory.h
+++ b/cpp/src/arrow/util/memory.h
@@ -36,7 +36,7 @@ uint8_t* pointer_logical_and(const uint8_t* address, uintptr_t bits) {
 void parallel_memcopy(uint8_t* dst, const uint8_t* src, int64_t nbytes,
                       uintptr_t block_size, int num_threads) {
   // XXX This function is really using `num_threads + 1` threads.
-  auto pool = CPUThreadPool();
+  auto pool = GetCpuThreadPool();
 
   uint8_t* left = pointer_logical_and(src + block_size - 1, ~(block_size - 1));
   uint8_t* right = pointer_logical_and(src + nbytes, ~(block_size - 1));
diff --git a/cpp/src/arrow/util/parallel.h b/cpp/src/arrow/util/parallel.h
index 9fec000..156679e 100644
--- a/cpp/src/arrow/util/parallel.h
+++ b/cpp/src/arrow/util/parallel.h
@@ -24,9 +24,32 @@
 #include <vector>
 
 #include "arrow/status.h"
+#include "arrow/util/thread-pool.h"
 
 namespace arrow {
 
+// A parallelizer that takes a `Status(int)` function and calls it with
+// arguments between 0 and `num_tasks - 1`, on an arbitrary number of threads.
+
+template <class FUNCTION>
+Status ParallelFor(int num_tasks, FUNCTION&& func) {
+  auto pool = internal::GetCpuThreadPool();
+  std::vector<std::future<Status>> futures(num_tasks);
+
+  for (int i = 0; i < num_tasks; ++i) {
+    futures[i] = pool->Submit(func, i);
+  }
+  auto st = Status::OK();
+  for (auto& fut : futures) {
+    st &= fut.get();
+  }
+  return st;
+}
+
+// A variant of ParallelFor() with an explicit number of dedicated threads.
+// In most cases it's more appropriate to use the 2-argument ParallelFor (above),
+// or directly the global CPU thread pool (arrow/util/thread-pool.h).
+
 template <class FUNCTION>
 Status ParallelFor(int nthreads, int num_tasks, FUNCTION&& func) {
   std::vector<std::thread> thread_pool;
diff --git a/cpp/src/arrow/util/thread-pool-test.cc b/cpp/src/arrow/util/thread-pool-test.cc
index 42bbe6e..a70f016 100644
--- a/cpp/src/arrow/util/thread-pool-test.cc
+++ b/cpp/src/arrow/util/thread-pool-test.cc
@@ -127,7 +127,7 @@ class TestThreadPool : public ::testing::Test {
 
   std::shared_ptr<ThreadPool> MakeThreadPool() { return MakeThreadPool(4); }
 
-  std::shared_ptr<ThreadPool> MakeThreadPool(size_t threads) {
+  std::shared_ptr<ThreadPool> MakeThreadPool(int threads) {
     std::shared_ptr<ThreadPool> pool;
     Status st = ThreadPool::Make(threads, &pool);
     return pool;
@@ -163,7 +163,7 @@ class TestThreadPool : public ::testing::Test {
 
 TEST_F(TestThreadPool, ConstructDestruct) {
   // Stress shutdown-at-destruction logic
-  for (size_t threads : {1, 2, 3, 8, 32, 70}) {
+  for (int threads : {1, 2, 3, 8, 32, 70}) {
     auto pool = this->MakeThreadPool(threads);
   }
 }
@@ -223,21 +223,31 @@ TEST_F(TestThreadPool, QuickShutdown) {
 TEST_F(TestThreadPool, SetCapacity) {
   auto pool = this->MakeThreadPool(3);
   ASSERT_EQ(pool->GetCapacity(), 3);
+  ASSERT_EQ(pool->GetActualCapacity(), 3);
+
   ASSERT_OK(pool->SetCapacity(5));
   ASSERT_EQ(pool->GetCapacity(), 5);
+  ASSERT_EQ(pool->GetActualCapacity(), 5);
+
   ASSERT_OK(pool->SetCapacity(2));
-  // Wait for workers to wake up and secede
-  busy_wait(0.5, [&] { return pool->GetCapacity() == 2; });
   ASSERT_EQ(pool->GetCapacity(), 2);
+  // Wait for workers to wake up and secede
+  busy_wait(0.5, [&] { return pool->GetActualCapacity() == 2; });
+  ASSERT_EQ(pool->GetActualCapacity(), 2);
+
   ASSERT_OK(pool->SetCapacity(5));
   ASSERT_EQ(pool->GetCapacity(), 5);
+  ASSERT_EQ(pool->GetActualCapacity(), 5);
+
   // Downsize while tasks are pending
   for (int i = 0; i < 10; ++i) {
     ASSERT_OK(pool->Spawn(std::bind(sleep_for, 0.01 /* seconds */)));
   }
   ASSERT_OK(pool->SetCapacity(2));
-  busy_wait(0.5, [&] { return pool->GetCapacity() == 2; });
   ASSERT_EQ(pool->GetCapacity(), 2);
+  busy_wait(0.5, [&] { return pool->GetActualCapacity() == 2; });
+  ASSERT_EQ(pool->GetActualCapacity(), 2);
+
   // Ensure nothing got stuck
   ASSERT_OK(pool->Shutdown());
 }
@@ -274,14 +284,16 @@ TEST_F(TestThreadPool, Submit) {
 
 TEST(TestGlobalThreadPool, Capacity) {
   // Sanity check
-  auto pool = CPUThreadPool();
-  size_t capacity = pool->GetCapacity();
+  auto pool = GetCpuThreadPool();
+  int capacity = pool->GetCapacity();
   ASSERT_GT(capacity, 0);
+  ASSERT_EQ(pool->GetActualCapacity(), capacity);
+  ASSERT_EQ(GetCpuThreadPoolCapacity(), capacity);
 
   // Exercise default capacity heuristic
   ASSERT_OK(DelEnvVar("OMP_NUM_THREADS"));
   ASSERT_OK(DelEnvVar("OMP_THREAD_LIMIT"));
-  size_t hw_capacity = std::thread::hardware_concurrency();
+  int hw_capacity = std::thread::hardware_concurrency();
   ASSERT_EQ(ThreadPool::DefaultCapacity(), hw_capacity);
   ASSERT_OK(SetEnvVar("OMP_NUM_THREADS", "13"));
   ASSERT_EQ(ThreadPool::DefaultCapacity(), 13);
diff --git a/cpp/src/arrow/util/thread-pool.cc b/cpp/src/arrow/util/thread-pool.cc
index 0e264f3..997ff5d 100644
--- a/cpp/src/arrow/util/thread-pool.cc
+++ b/cpp/src/arrow/util/thread-pool.cc
@@ -25,14 +25,39 @@
 namespace arrow {
 namespace internal {
 
+struct ThreadPool::State {
+  State() : desired_capacity_(0), please_shutdown_(false), quick_shutdown_(false) {}
+
+  std::mutex mutex_;
+  std::condition_variable cv_;
+  std::condition_variable cv_shutdown_;
+
+  std::list<std::thread> workers_;
+  // Trashcan for finished threads
+  std::vector<std::thread> finished_workers_;
+  std::deque<std::function<void()>> pending_tasks_;
+
+  // Desired number of threads
+  int desired_capacity_;
+  // Are we shutting down?
+  bool please_shutdown_;
+  bool quick_shutdown_;
+};
+
 ThreadPool::ThreadPool()
-    : desired_capacity_(0), please_shutdown_(false), quick_shutdown_(false) {}
+    : sp_state_(std::make_shared<ThreadPool::State>()),
+      state_(sp_state_.get()),
+      shutdown_on_destroy_(true) {}
 
-ThreadPool::~ThreadPool() { ARROW_UNUSED(Shutdown(false /* wait */)); }
+ThreadPool::~ThreadPool() {
+  if (shutdown_on_destroy_) {
+    ARROW_UNUSED(Shutdown(false /* wait */));
+  }
+}
 
-Status ThreadPool::SetCapacity(size_t threads) {
-  std::unique_lock<std::mutex> lock(mutex_);
-  if (please_shutdown_) {
+Status ThreadPool::SetCapacity(int threads) {
+  std::unique_lock<std::mutex> lock(state_->mutex_);
+  if (state_->please_shutdown_) {
     return Status::Invalid("operation forbidden during or after shutdown");
   }
   if (threads <= 0) {
@@ -40,91 +65,103 @@ Status ThreadPool::SetCapacity(size_t threads) {
   }
   CollectFinishedWorkersUnlocked();
 
-  desired_capacity_ = threads;
-  int64_t diff = desired_capacity_ - workers_.size();
+  state_->desired_capacity_ = threads;
+  int diff = static_cast<int>(threads - state_->workers_.size());
   if (diff > 0) {
-    LaunchWorkersUnlocked(static_cast<size_t>(diff));
+    LaunchWorkersUnlocked(diff);
   } else if (diff < 0) {
     // Wake threads to ask them to stop
-    cv_.notify_all();
+    state_->cv_.notify_all();
   }
   return Status::OK();
 }
 
-size_t ThreadPool::GetCapacity() {
-  std::unique_lock<std::mutex> lock(mutex_);
-  return workers_.size();
+int ThreadPool::GetCapacity() {
+  std::unique_lock<std::mutex> lock(state_->mutex_);
+  return state_->desired_capacity_;
+}
+
+int ThreadPool::GetActualCapacity() {
+  std::unique_lock<std::mutex> lock(state_->mutex_);
+  return static_cast<int>(state_->workers_.size());
 }
 
 Status ThreadPool::Shutdown(bool wait) {
-  std::unique_lock<std::mutex> lock(mutex_);
+  std::unique_lock<std::mutex> lock(state_->mutex_);
 
-  if (please_shutdown_) {
+  if (state_->please_shutdown_) {
     return Status::Invalid("Shutdown() already called");
   }
-  please_shutdown_ = true;
-  quick_shutdown_ = !wait;
-  cv_.notify_all();
-  cv_shutdown_.wait(lock, [this] { return workers_.empty(); });
-  if (!quick_shutdown_) {
-    DCHECK_EQ(pending_tasks_.size(), 0);
+  state_->please_shutdown_ = true;
+  state_->quick_shutdown_ = !wait;
+  state_->cv_.notify_all();
+  state_->cv_shutdown_.wait(lock, [this] { return state_->workers_.empty(); });
+  if (!state_->quick_shutdown_) {
+    DCHECK_EQ(state_->pending_tasks_.size(), 0);
   } else {
-    pending_tasks_.clear();
+    state_->pending_tasks_.clear();
   }
   CollectFinishedWorkersUnlocked();
   return Status::OK();
 }
 
 void ThreadPool::CollectFinishedWorkersUnlocked() {
-  for (auto& thread : finished_workers_) {
+  for (auto& thread : state_->finished_workers_) {
     // Make sure OS thread has exited
     thread.join();
   }
-  finished_workers_.clear();
+  state_->finished_workers_.clear();
 }
 
-void ThreadPool::LaunchWorkersUnlocked(size_t threads) {
-  for (size_t i = 0; i < threads; i++) {
-    workers_.emplace_back();
-    auto it = --workers_.end();
-    *it = std::thread([this, it] { WorkerLoop(it); });
+void ThreadPool::LaunchWorkersUnlocked(int threads) {
+  std::shared_ptr<State> state = sp_state_;
+
+  for (int i = 0; i < threads; i++) {
+    state_->workers_.emplace_back();
+    auto it = --(state_->workers_.end());
+    *it = std::thread([state, it] { WorkerLoop(state, it); });
   }
 }
 
-void ThreadPool::WorkerLoop(std::list<std::thread>::iterator it) {
-  std::unique_lock<std::mutex> lock(mutex_);
+void ThreadPool::WorkerLoop(std::shared_ptr<State> state,
+                            std::list<std::thread>::iterator it) {
+  std::unique_lock<std::mutex> lock(state->mutex_);
 
   // Since we hold the lock, `it` now points to the correct thread object
   // (LaunchWorkersUnlocked has exited)
   DCHECK_EQ(std::this_thread::get_id(), it->get_id());
 
+  // If too many threads, we should secede from the pool
+  const auto should_secede = [&]() -> bool {
+    return state->workers_.size() > static_cast<size_t>(state->desired_capacity_);
+  };
+
   while (true) {
     // By the time this thread is started, some tasks may have been pushed
     // or shutdown could even have been requested.  So we only wait on the
     // condition variable at the end of the loop.
 
     // Execute pending tasks if any
-    while (!pending_tasks_.empty() && !quick_shutdown_) {
-      // If too many threads, secede from the pool.
+    while (!state->pending_tasks_.empty() && !state->quick_shutdown_) {
       // We check this opportunistically at each loop iteration since
       // it releases the lock below.
-      if (workers_.size() > desired_capacity_) {
+      if (should_secede()) {
         break;
       }
       {
-        std::function<void()> task = std::move(pending_tasks_.front());
-        pending_tasks_.pop_front();
+        std::function<void()> task = std::move(state->pending_tasks_.front());
+        state->pending_tasks_.pop_front();
         lock.unlock();
         task();
       }
       lock.lock();
     }
     // Now either the queue is empty *or* a quick shutdown was requested
-    if (please_shutdown_ || workers_.size() > desired_capacity_) {
+    if (state->please_shutdown_ || should_secede()) {
       break;
     }
     // Wait for next wakeup
-    cv_.wait(lock);
+    state->cv_.wait(lock);
   }
 
   // We're done.  Move our thread object to the trashcan of finished
@@ -135,28 +172,28 @@ void ThreadPool::WorkerLoop(std::list<std::thread>::iterator it) {
   //    are exited before the ThreadPool is destroyed.  Otherwise subtle
   //    timing conditions can lead to false positives with Valgrind.
   DCHECK_EQ(std::this_thread::get_id(), it->get_id());
-  finished_workers_.push_back(std::move(*it));
-  workers_.erase(it);
-  if (please_shutdown_) {
+  state->finished_workers_.push_back(std::move(*it));
+  state->workers_.erase(it);
+  if (state->please_shutdown_) {
     // Notify the function waiting in Shutdown().
-    cv_shutdown_.notify_one();
+    state->cv_shutdown_.notify_one();
   }
 }
 
 Status ThreadPool::SpawnReal(std::function<void()> task) {
   {
-    std::lock_guard<std::mutex> lock(mutex_);
-    if (please_shutdown_) {
+    std::lock_guard<std::mutex> lock(state_->mutex_);
+    if (state_->please_shutdown_) {
       return Status::Invalid("operation forbidden during or after shutdown");
     }
     CollectFinishedWorkersUnlocked();
-    pending_tasks_.push_back(std::move(task));
+    state_->pending_tasks_.push_back(std::move(task));
   }
-  cv_.notify_one();
+  state_->cv_.notify_one();
   return Status::OK();
 }
 
-Status ThreadPool::Make(size_t threads, std::shared_ptr<ThreadPool>* out) {
+Status ThreadPool::Make(int threads, std::shared_ptr<ThreadPool>* out) {
   auto pool = std::shared_ptr<ThreadPool>(new ThreadPool());
   RETURN_NOT_OK(pool->SetCapacity(threads));
   *out = std::move(pool);
@@ -166,7 +203,7 @@ Status ThreadPool::Make(size_t threads, std::shared_ptr<ThreadPool>* out) {
 // ----------------------------------------------------------------------
 // Global thread pool
 
-static size_t ParseOMPEnvVar(const char* name) {
+static int ParseOMPEnvVar(const char* name) {
   // OMP_NUM_THREADS is a comma-separated list of positive integers.
   // We are only interested in the first (top-level) number.
   std::string str;
@@ -178,14 +215,14 @@ static size_t ParseOMPEnvVar(const char* name) {
     str = str.substr(0, first_comma);
   }
   try {
-    return static_cast<size_t>(std::max(0LL, std::stoll(str)));
+    return std::max(0, std::stoi(str));
   } catch (...) {
     return 0;
   }
 }
 
-size_t ThreadPool::DefaultCapacity() {
-  size_t capacity, limit;
+int ThreadPool::DefaultCapacity() {
+  int capacity, limit;
   capacity = ParseOMPEnvVar("OMP_NUM_THREADS");
   if (capacity == 0) {
     capacity = std::thread::hardware_concurrency();
@@ -203,21 +240,30 @@ size_t ThreadPool::DefaultCapacity() {
 }
 
 // Helper for the singleton pattern
-static std::shared_ptr<ThreadPool> MakePoolWithDefaultCapacity() {
+std::shared_ptr<ThreadPool> ThreadPool::MakeCpuThreadPool() {
   std::shared_ptr<ThreadPool> pool;
   DCHECK_OK(ThreadPool::Make(ThreadPool::DefaultCapacity(), &pool));
+  // On Windows, the global ThreadPool destructor may be called after
+  // non-main threads have been killed by the OS, and hang in a condition
+  // variable.
+  // On Unix, we want to avoid leak reports by Valgrind.
+#ifdef _WIN32
+  pool->shutdown_on_destroy_ = false;
+#endif
   return pool;
 }
 
-ThreadPool* CPUThreadPool() {
-  static std::shared_ptr<ThreadPool> singleton = MakePoolWithDefaultCapacity();
+ThreadPool* GetCpuThreadPool() {
+  static std::shared_ptr<ThreadPool> singleton = ThreadPool::MakeCpuThreadPool();
   return singleton.get();
 }
 
 }  // namespace internal
 
-Status SetCPUThreadPoolCapacity(size_t threads) {
-  return internal::CPUThreadPool()->SetCapacity(threads);
+int GetCpuThreadPoolCapacity() { return internal::GetCpuThreadPool()->GetCapacity(); }
+
+Status SetCpuThreadPoolCapacity(int threads) {
+  return internal::GetCpuThreadPool()->SetCapacity(threads);
 }
 
 }  // namespace arrow
diff --git a/cpp/src/arrow/util/thread-pool.h b/cpp/src/arrow/util/thread-pool.h
index d426cbb..c1b10f7 100644
--- a/cpp/src/arrow/util/thread-pool.h
+++ b/cpp/src/arrow/util/thread-pool.h
@@ -36,9 +36,14 @@
 
 namespace arrow {
 
+// Get the number of worker threads used by the process-global thread pool
+// for CPU-bound tasks.  This is an idealized number, the actual number
+// may lag a bit.
+ARROW_EXPORT int GetCpuThreadPoolCapacity();
+
 // Set the number of worker threads used by the process-global thread pool
 // for CPU-bound tasks.
-ARROW_EXPORT Status SetCPUThreadPoolCapacity(size_t threads);
+ARROW_EXPORT Status SetCpuThreadPoolCapacity(int threads);
 
 namespace internal {
 
@@ -59,22 +64,27 @@ struct packaged_task_wrapper {
 
 }  // namespace detail
 
-class ThreadPool {
+class ARROW_EXPORT ThreadPool {
  public:
   // Construct a thread pool with the given number of worker threads
-  static Status Make(size_t threads, std::shared_ptr<ThreadPool>* out);
+  static Status Make(int threads, std::shared_ptr<ThreadPool>* out);
 
   // Destroy thread pool; the pool will first be shut down
   ~ThreadPool();
 
+  // Return the desired number of worker threads.
+  // The actual number of workers may lag a bit before being adjusted to
+  // match this value.
+  int GetCapacity();
+
   // Dynamically change the number of worker threads.
   // This function returns quickly, but it may take more time before the
   // thread count is fully adjusted.
-  Status SetCapacity(size_t threads);
+  Status SetCapacity(int threads);
 
   // Heuristic for the default capacity of a thread pool for CPU-bound tasks.
   // This is exposed as a static method to help with testing.
-  static size_t DefaultCapacity();
+  static int DefaultCapacity();
 
   // Shutdown the pool.  Once the pool starts shutting down, new tasks
   // cannot be submitted anymore.
@@ -106,6 +116,7 @@ class ThreadPool {
 
     Status st = SpawnReal(detail::packaged_task_wrapper<Result>(std::move(task)));
     if (!st.ok()) {
+      // This happens when Submit() is called after Shutdown()
       throw std::runtime_error(st.ToString());
     }
     return fut;
@@ -114,6 +125,9 @@ class ThreadPool {
  protected:
   FRIEND_TEST(TestThreadPool, SetCapacity);
   FRIEND_TEST(TestGlobalThreadPool, Capacity);
+  friend ARROW_EXPORT ThreadPool* GetCpuThreadPool();
+
+  struct State;
 
   ThreadPool();
 
@@ -123,28 +137,24 @@ class ThreadPool {
   // Collect finished worker threads, making sure the OS threads have exited
   void CollectFinishedWorkersUnlocked();
   // Launch a given number of additional workers
-  void LaunchWorkersUnlocked(size_t threads);
-  void WorkerLoop(std::list<std::thread>::iterator it);
-  size_t GetCapacity();
-
-  std::mutex mutex_;
-  std::condition_variable cv_;
-  std::condition_variable cv_shutdown_;
-
-  std::list<std::thread> workers_;
-  // Trashcan for finished threads
-  std::vector<std::thread> finished_workers_;
-  std::deque<std::function<void()>> pending_tasks_;
-
-  // Desired number of threads
-  size_t desired_capacity_;
-  // Are we shutting down?
-  bool please_shutdown_;
-  bool quick_shutdown_;
+  void LaunchWorkersUnlocked(int threads);
+  // Get the current actual capacity
+  int GetActualCapacity();
+
+  // The worker loop is a static method so that it can keep running
+  // after the ThreadPool is destroyed
+  static void WorkerLoop(std::shared_ptr<State> state,
+                         std::list<std::thread>::iterator it);
+
+  static std::shared_ptr<ThreadPool> MakeCpuThreadPool();
+
+  const std::shared_ptr<State> sp_state_;
+  State* const state_;
+  bool shutdown_on_destroy_;
 };
 
 // Return the process-global thread pool for CPU-bound tasks.
-ThreadPool* CPUThreadPool();
+ARROW_EXPORT ThreadPool* GetCpuThreadPool();
 
 }  // namespace internal
 }  // namespace arrow
diff --git a/cpp/src/plasma/client.cc b/cpp/src/plasma/client.cc
index 43e27e0..30d2e43 100644
--- a/cpp/src/plasma/client.cc
+++ b/cpp/src/plasma/client.cc
@@ -698,7 +698,7 @@ bool PlasmaClient::Impl::compute_object_hash_parallel(XXH64_state_t* hash_state,
                                                       int64_t nbytes) {
   // Note that this function will likely be faster if the address of data is
   // aligned on a 64-byte boundary.
-  auto pool = arrow::internal::CPUThreadPool();
+  auto pool = arrow::internal::GetCpuThreadPool();
 
   const int num_threads = kHashingConcurrency;
   uint64_t threadhash[num_threads + 1];
diff --git a/python/pyarrow/_config.pyx b/python/pyarrow/_config.pyx
deleted file mode 100644
index bc9f36d..0000000
--- a/python/pyarrow/_config.pyx
+++ /dev/null
@@ -1,57 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-# cython: profile=False
-# distutils: language = c++
-# cython: embedsignature = True
-
-import numpy as np
-import multiprocessing
-import os
-
-cdef extern from 'arrow/python/init.h':
-    int arrow_init_numpy() except -1
-
-cdef extern from 'arrow/python/config.h' namespace 'arrow::py':
-    void set_numpy_nan(object o)
-
-arrow_init_numpy()
-
-set_numpy_nan(np.nan)
-
-cdef int CPU_COUNT = int(
-    os.environ.get('OMP_NUM_THREADS',
-                   max(multiprocessing.cpu_count() // 2, 1)))
-
-
-def cpu_count():
-    """
-    Returns
-    -------
-    count : Number of CPUs to use by default in parallel operations. Default is
-      max(1, multiprocessing.cpu_count() / 2), but can be overridden by the
-      OMP_NUM_THREADS environment variable. For the default, we divide the CPU
-      count by 2 because most modern computers have hyperthreading turned on,
-      so doubling the CPU count beyond the number of physical cores does not
-      help.
-    """
-    return CPU_COUNT
-
-
-def set_cpu_count(count):
-    global CPU_COUNT
-    CPU_COUNT = max(int(count), 1)
diff --git a/python/pyarrow/array.pxi b/python/pyarrow/array.pxi
index 94c37b3..3f84f17 100644
--- a/python/pyarrow/array.pxi
+++ b/python/pyarrow/array.pxi
@@ -602,7 +602,8 @@ cdef class Array:
         options = PandasOptions(
             strings_to_categorical=strings_to_categorical,
             zero_copy_only=zero_copy_only,
-            integer_object_nulls=integer_object_nulls)
+            integer_object_nulls=integer_object_nulls,
+            use_threads=False)
         with nogil:
             check_status(ConvertArrayToPandas(options, self.sp_array,
                                               self, &out))
diff --git a/python/pyarrow/feather.py b/python/pyarrow/feather.py
index c2157ab..26f22da 100644
--- a/python/pyarrow/feather.py
+++ b/python/pyarrow/feather.py
@@ -26,6 +26,8 @@ from pyarrow.compat import pdapi
 from pyarrow.lib import FeatherError  # noqa
 from pyarrow.lib import RecordBatch, Table, concat_tables
 import pyarrow.lib as ext
+from .util import _deprecate_nthreads
+
 
 try:
     infer_dtype = pdapi.infer_dtype
@@ -45,7 +47,7 @@ class FeatherReader(ext.FeatherReader):
 
     def read(self, *args, **kwargs):
         warnings.warn("read has been deprecated. Use read_pandas instead.",
-                      DeprecationWarning)
+                      FutureWarning, stacklevel=2)
         return self.read_pandas(*args, **kwargs)
 
     def read_table(self, columns=None):
@@ -66,8 +68,10 @@ class FeatherReader(ext.FeatherReader):
         table = Table.from_arrays(columns, names=names)
         return table
 
-    def read_pandas(self, columns=None, nthreads=1):
-        return self.read_table(columns=columns).to_pandas(nthreads=nthreads)
+    def read_pandas(self, columns=None, nthreads=None, use_threads=False):
+        use_threads = _deprecate_nthreads(use_threads, nthreads)
+        return self.read_table(columns=columns).to_pandas(
+            use_threads=use_threads)
 
 
 class FeatherWriter(object):
@@ -141,7 +145,7 @@ class FeatherDataset(object):
                              .format(piece, self.schema,
                                      table.schema))
 
-    def read_pandas(self, columns=None, nthreads=1):
+    def read_pandas(self, columns=None, nthreads=None, use_threads=False):
         """
         Read multiple Parquet files as a single pandas DataFrame
 
@@ -157,7 +161,9 @@ class FeatherDataset(object):
         pandas.DataFrame
             Content of the file as a pandas DataFrame (of columns)
         """
-        return self.read_table(columns=columns).to_pandas(nthreads=nthreads)
+        use_threads = _deprecate_nthreads(use_threads, nthreads)
+        return self.read_table(columns=columns).to_pandas(
+            use_threads=use_threads)
 
 
 def write_feather(df, dest):
@@ -186,7 +192,7 @@ def write_feather(df, dest):
         raise
 
 
-def read_feather(source, columns=None, nthreads=1):
+def read_feather(source, columns=None, nthreads=None, use_threads=False):
     """
     Read a pandas.DataFrame from Feather format
 
@@ -196,15 +202,16 @@ def read_feather(source, columns=None, nthreads=1):
     columns : sequence, optional
         Only read a specific set of columns. If not provided, all columns are
         read
-    nthreads : int, default 1
-        Number of CPU threads to use when reading to pandas.DataFrame
+    use_threads: bool, default False
+        Whether to parallelize reading using multiple threads
 
     Returns
     -------
     df : pandas.DataFrame
     """
+    use_threads = _deprecate_nthreads(use_threads, nthreads)
     reader = FeatherReader(source)
-    return reader.read_pandas(columns=columns, nthreads=nthreads)
+    return reader.read_pandas(columns=columns, use_threads=use_threads)
 
 
 def read_table(source, columns=None):
diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd
index 9a0b468..5b0c211 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -959,7 +959,7 @@ cdef extern from "arrow/python/api.h" namespace "arrow::py" nogil:
         PandasOptions options,
         const unordered_set[c_string]& categorical_columns,
         const shared_ptr[CTable]& table,
-        int nthreads, CMemoryPool* pool,
+        CMemoryPool* pool,
         PyObject** out)
 
     void c_set_default_memory_pool \
@@ -987,6 +987,7 @@ cdef extern from "arrow/python/api.h" namespace "arrow::py" nogil:
         c_bool strings_to_categorical
         c_bool zero_copy_only
         c_bool integer_object_nulls
+        c_bool use_threads
 
 cdef extern from "arrow/python/api.h" namespace 'arrow::py' nogil:
 
@@ -1045,3 +1046,8 @@ cdef extern from 'arrow/util/compression.h' namespace 'arrow' nogil:
                          int64_t* output_length)
 
         int64_t MaxCompressedLen(int64_t input_len, const uint8_t* input)
+
+
+cdef extern from 'arrow/util/thread-pool.h' namespace 'arrow' nogil:
+    int GetCpuThreadPoolCapacity()
+    CStatus SetCpuThreadPoolCapacity(int threads)
diff --git a/python/pyarrow/ipc.py b/python/pyarrow/ipc.py
index 4081fc5..bed2dd6 100644
--- a/python/pyarrow/ipc.py
+++ b/python/pyarrow/ipc.py
@@ -24,6 +24,7 @@ from pyarrow.lib import (Message, MessageReader,  # noqa
                          read_tensor, write_tensor,
                          get_record_batch_size, get_tensor_size)
 import pyarrow.lib as lib
+from .util import _deprecate_nthreads
 
 
 class _ReadPandasOption(object):
@@ -168,22 +169,23 @@ def serialize_pandas(df, nthreads=None, preserve_index=True):
     return sink.get_result()
 
 
-def deserialize_pandas(buf, nthreads=None):
+def deserialize_pandas(buf, nthreads=None, use_threads=False):
     """Deserialize a buffer protocol compatible object into a pandas DataFrame.
 
     Parameters
     ----------
     buf : buffer
         An object compatible with the buffer protocol
-    nthreads : int, defualt None
-        The number of threads to use to convert the buffer to a DataFrame,
-        default all CPUs
+    use_threads: boolean, default False
+        Whether to parallelize the conversion using multiple threads
 
     Returns
     -------
     df : pandas.DataFrame
     """
+    use_threads = _deprecate_nthreads(use_threads, nthreads)
+
     buffer_reader = pa.BufferReader(buf)
     reader = pa.RecordBatchStreamReader(buffer_reader)
     table = reader.read_all()
-    return table.to_pandas(nthreads=nthreads)
+    return table.to_pandas(use_threads=use_threads)
diff --git a/python/pyarrow/lib.pyx b/python/pyarrow/lib.pyx
index 4c22fc4..9c661db 100644
--- a/python/pyarrow/lib.pyx
+++ b/python/pyarrow/lib.pyx
@@ -36,28 +36,26 @@ cimport cpython as cp
 arrow_init_numpy()
 set_numpy_nan(np.nan)
 
-cdef int CPU_COUNT = int(
-    os.environ.get('OMP_NUM_THREADS',
-                   max(multiprocessing.cpu_count() // 2, 1)))
-
 
 def cpu_count():
     """
-    Returns
-    -------
-    count : Number of CPUs to use by default in parallel operations. Default is
-      max(1, multiprocessing.cpu_count() / 2), but can be overridden by the
-      OMP_NUM_THREADS environment variable. For the default, we divide the CPU
-      count by 2 because most modern computers have hyperthreading turned on,
-      so doubling the CPU count beyond the number of physical cores does not
-      help.
+    Return the number of threads to use in parallel operations.
+
+    The number of threads is determined at startup by inspecting the
+    OMP_NUM_THREADS and OMP_THREAD_LIMIT environment variables.  If neither
+    is present, it will default to the number of hardware threads on the
+    system.  It can be modified at runtime by calling set_cpu_count().
     """
-    return CPU_COUNT
+    return GetCpuThreadPoolCapacity()
 
 
-def set_cpu_count(count):
-    global CPU_COUNT
-    CPU_COUNT = max(int(count), 1)
+def set_cpu_count(int count):
+    """
+    Set the number of threads to use in parallel operations.
+    """
+    if count < 1:
+        raise ValueError("CPU count must be strictly positive")
+    check_status(SetCpuThreadPoolCapacity(count))
 
 
 Type_NA = _Type_NA
diff --git a/python/pyarrow/pandas_compat.py b/python/pyarrow/pandas_compat.py
index c288c7f..18d8ed3 100644
--- a/python/pyarrow/pandas_compat.py
+++ b/python/pyarrow/pandas_compat.py
@@ -493,8 +493,7 @@ def _make_datetimetz(tz):
 # Converting pyarrow.Table efficiently to pandas.DataFrame
 
 
-def table_to_blockmanager(options, table, memory_pool, nthreads=1,
-                          categories=None):
+def table_to_blockmanager(options, table, memory_pool, categories=None):
     from pyarrow.compat import DatetimeTZDtype
 
     index_columns = []
@@ -568,8 +567,7 @@ def table_to_blockmanager(options, table, memory_pool, nthreads=1,
                 block_table.schema.get_field_index(raw_name)
             )
 
-    blocks = _table_to_blocks(options, block_table, nthreads, memory_pool,
-                              categories)
+    blocks = _table_to_blocks(options, block_table, memory_pool, categories)
 
     # Construct the row index
     if len(index_arrays) > 1:
@@ -728,12 +726,12 @@ def _reconstruct_columns_from_metadata(columns, column_indexes):
     return pd.MultiIndex(levels=new_levels, labels=labels, names=columns.names)
 
 
-def _table_to_blocks(options, block_table, nthreads, memory_pool, categories):
+def _table_to_blocks(options, block_table, memory_pool, categories):
     # Part of table_to_blockmanager
 
     # Convert an arrow table to Block from the internal pandas API
-    result = pa.lib.table_to_blocks(options, block_table, nthreads,
-                                    memory_pool, categories)
+    result = pa.lib.table_to_blocks(options, block_table, memory_pool,
+                                    categories)
 
     # Defined above
     return [_reconstruct_block(item) for item in result]
diff --git a/python/pyarrow/table.pxi b/python/pyarrow/table.pxi
index c867657..4edf5af 100644
--- a/python/pyarrow/table.pxi
+++ b/python/pyarrow/table.pxi
@@ -27,6 +27,8 @@ except ImportError:
 else:
     import pyarrow.pandas_compat as pdcompat
 
+from .util import _deprecate_nthreads
+
 
 cdef class ChunkedArray:
     """
@@ -365,7 +367,8 @@ cdef class Column:
         options = PandasOptions(
             strings_to_categorical=strings_to_categorical,
             zero_copy_only=zero_copy_only,
-            integer_object_nulls=integer_object_nulls)
+            integer_object_nulls=integer_object_nulls,
+            use_threads=False)
 
         with nogil:
             check_status(libarrow.ConvertColumnToPandas(options,
@@ -746,7 +749,7 @@ cdef class RecordBatch:
             entries.append((name, column))
         return OrderedDict(entries)
 
-    def to_pandas(self, nthreads=None):
+    def to_pandas(self, nthreads=None, use_threads=False):
         """
         Convert the arrow::RecordBatch to a pandas DataFrame
 
@@ -754,7 +757,8 @@ cdef class RecordBatch:
         -------
         pandas.DataFrame
         """
-        return Table.from_batches([self]).to_pandas(nthreads=nthreads)
+        use_threads = _deprecate_nthreads(use_threads, nthreads)
+        return Table.from_batches([self]).to_pandas(use_threads=use_threads)
 
     @classmethod
     def from_pandas(cls, df, Schema schema=None, bint preserve_index=True,
@@ -825,7 +829,7 @@ cdef class RecordBatch:
             CRecordBatch.Make(schema, num_rows, c_arrays))
 
 
-def table_to_blocks(PandasOptions options, Table table, int nthreads,
+def table_to_blocks(PandasOptions options, Table table,
                     MemoryPool memory_pool, categories):
     cdef:
         PyObject* result_obj
@@ -840,9 +844,7 @@ def table_to_blocks(PandasOptions options, Table table, int nthreads,
     with nogil:
         check_status(
             libarrow.ConvertTableToPandas(
-                options, categorical_columns, c_table, nthreads, pool,
-                &result_obj
-            )
+                options, categorical_columns, c_table, pool, &result_obj)
         )
 
     return PyObject_to_object(result_obj)
@@ -1148,16 +1150,12 @@ cdef class Table:
 
     def to_pandas(self, nthreads=None, strings_to_categorical=False,
                   memory_pool=None, zero_copy_only=False, categories=None,
-                  integer_object_nulls=False):
+                  integer_object_nulls=False, use_threads=False):
         """
         Convert the arrow::Table to a pandas DataFrame
 
         Parameters
         ----------
-        nthreads : int, default max(1, multiprocessing.cpu_count() / 2)
-            For the default, we divide the CPU count by 2 because most modern
-            computers have hyperthreading turned on, so doubling the CPU count
-            beyond the number of physical cores does not help
         strings_to_categorical : boolean, default False
             Encode string (UTF8) and binary types to pandas.Categorical
         memory_pool: MemoryPool, optional
@@ -1169,6 +1167,8 @@ cdef class Table:
             List of columns that should be returned as pandas.Categorical
         integer_object_nulls : boolean, default False
             Cast integers with nulls to objects
+        use_threads: boolean, default False
+            Whether to parallelize the conversion using multiple threads
 
         Returns
         -------
@@ -1177,15 +1177,16 @@ cdef class Table:
         cdef:
             PandasOptions options
 
+        use_threads = _deprecate_nthreads(use_threads, nthreads)
+
         options = PandasOptions(
             strings_to_categorical=strings_to_categorical,
             zero_copy_only=zero_copy_only,
-            integer_object_nulls=integer_object_nulls)
+            integer_object_nulls=integer_object_nulls,
+            use_threads=use_threads)
         self._check_nullptr()
-        if nthreads is None:
-            nthreads = cpu_count()
         mgr = pdcompat.table_to_blockmanager(options, self, memory_pool,
-                                             nthreads, categories)
+                                             categories)
         return pd.DataFrame(mgr)
 
     def to_pydict(self):
diff --git a/python/pyarrow/tests/test_convert_pandas.py b/python/pyarrow/tests/test_convert_pandas.py
index bdb84c7..6bdbcca 100644
--- a/python/pyarrow/tests/test_convert_pandas.py
+++ b/python/pyarrow/tests/test_convert_pandas.py
@@ -60,7 +60,7 @@ def _alltypes_example(size=100):
     })
 
 
-def _check_pandas_roundtrip(df, expected=None, nthreads=1,
+def _check_pandas_roundtrip(df, expected=None, use_threads=False,
                             expected_schema=None,
                             check_dtype=True, schema=None,
                             preserve_index=False,
@@ -68,9 +68,9 @@ def _check_pandas_roundtrip(df, expected=None, nthreads=1,
     klass = pa.RecordBatch if as_batch else pa.Table
     table = klass.from_pandas(df, schema=schema,
                               preserve_index=preserve_index,
-                              nthreads=nthreads)
+                              nthreads=2 if use_threads else 1)
 
-    result = table.to_pandas(nthreads=nthreads)
+    result = table.to_pandas(use_threads=use_threads)
     if expected_schema:
         assert table.schema.equals(expected_schema)
     if expected is None:
@@ -1836,8 +1836,8 @@ class TestConvertMisc(object):
 
     def test_threaded_conversion(self):
         df = _alltypes_example()
-        _check_pandas_roundtrip(df, nthreads=2)
-        _check_pandas_roundtrip(df, nthreads=2, as_batch=True)
+        _check_pandas_roundtrip(df, use_threads=True)
+        _check_pandas_roundtrip(df, use_threads=True, as_batch=True)
 
     def test_category(self):
         repeats = 5
diff --git a/python/pyarrow/tests/test_feather.py b/python/pyarrow/tests/test_feather.py
index 46e4e5f..1fcc608 100644
--- a/python/pyarrow/tests/test_feather.py
+++ b/python/pyarrow/tests/test_feather.py
@@ -65,7 +65,7 @@ class TestFeatherReader(unittest.TestCase):
 
     def _check_pandas_roundtrip(self, df, expected=None, path=None,
                                 columns=None, null_counts=None,
-                                nthreads=1):
+                                use_threads=False):
         if path is None:
             path = random_path()
 
@@ -74,7 +74,7 @@ class TestFeatherReader(unittest.TestCase):
         if not os.path.exists(path):
             raise Exception('file not written')
 
-        result = read_feather(path, columns, nthreads=nthreads)
+        result = read_feather(path, columns, use_threads=use_threads)
         if expected is None:
             expected = df
 
@@ -384,7 +384,7 @@ class TestFeatherReader(unittest.TestCase):
         data = {'c{0}'.format(i): [''] * 10
                 for i in range(100)}
         df = pd.DataFrame(data)
-        self._check_pandas_roundtrip(df, nthreads=4)
+        self._check_pandas_roundtrip(df, use_threads=True)
 
     def test_nan_as_null(self):
         # Create a nan that is not numpy.nan
diff --git a/python/pyarrow/tests/test_ipc.py b/python/pyarrow/tests/test_ipc.py
index c31c322..07de19f 100644
--- a/python/pyarrow/tests/test_ipc.py
+++ b/python/pyarrow/tests/test_ipc.py
@@ -406,9 +406,9 @@ def test_get_record_batch_size():
     assert pa.get_record_batch_size(batch) > (N * itemsize)
 
 
-def _check_serialize_pandas_round_trip(df, nthreads=1):
-    buf = pa.serialize_pandas(df, nthreads=nthreads)
-    result = pa.deserialize_pandas(buf, nthreads=nthreads)
+def _check_serialize_pandas_round_trip(df, use_threads=False):
+    buf = pa.serialize_pandas(df, nthreads=2 if use_threads else 1)
+    result = pa.deserialize_pandas(buf, use_threads=use_threads)
     assert_frame_equal(result, df)
 
 
@@ -429,7 +429,7 @@ def test_pandas_serialize_round_trip_nthreads():
         {'foo': [1.5, 1.6, 1.7], 'bar': list('abc')},
         index=index, columns=columns
     )
-    _check_serialize_pandas_round_trip(df, nthreads=2)
+    _check_serialize_pandas_round_trip(df, use_threads=True)
 
 
 def test_pandas_serialize_round_trip_multi_index():
diff --git a/python/pyarrow/tests/test_misc.py b/python/pyarrow/tests/test_misc.py
index 55787f1..28553a0 100644
--- a/python/pyarrow/tests/test_misc.py
+++ b/python/pyarrow/tests/test_misc.py
@@ -23,3 +23,13 @@ import pyarrow as pa
 def test_get_include():
     include_dir = pa.get_include()
     assert os.path.exists(os.path.join(include_dir, 'arrow', 'api.h'))
+
+
+def test_cpu_count():
+    n = pa.cpu_count()
+    assert n > 0
+    try:
+        pa.set_cpu_count(n + 5)
+        assert pa.cpu_count() == n + 5
+    finally:
+        pa.set_cpu_count(n)
diff --git a/python/pyarrow/util.py b/python/pyarrow/util.py
index b882565..2954b62 100644
--- a/python/pyarrow/util.py
+++ b/python/pyarrow/util.py
@@ -35,3 +35,13 @@ def _deprecate_api(old_name, new_name, api, next_version):
         warnings.warn(msg, FutureWarning)
         return api(*args)
     return wrapper
+
+
+def _deprecate_nthreads(use_threads, nthreads):
+    if nthreads is not None:
+        warnings.warn("`nthreads` argument is deprecated, "
+                      "pass `use_threads` instead", FutureWarning,
+                      stacklevel=3)
+        if nthreads > 1:
+            use_threads = True
+    return use_threads
