Posted to commits@arrow.apache.org by we...@apache.org on 2020/06/28 14:40:10 UTC
[arrow] branch master updated: ARROW-8888: [Python] Do not use thread pool when converting pandas columns that are definitely zero-copyable
This is an automated email from the ASF dual-hosted git repository.
wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 2b37fd4 ARROW-8888: [Python] Do not use thread pool when converting pandas columns that are definitely zero-copyable
2b37fd4 is described below
commit 2b37fd422889c337010083147891c1132802ac50
Author: Wes McKinney <we...@apache.org>
AuthorDate: Sun Jun 28 09:39:40 2020 -0500
ARROW-8888: [Python] Do not use thread pool when converting pandas columns that are definitely zero-copyable
The ThreadPoolExecutor has a significant amount of per-column overhead, so columns that can definitely be converted zero-copy (contiguous NumPy integer arrays) are now converted on the calling thread instead of being dispatched to the pool.
Closes #7563 from wesm/ARROW-8888
Authored-by: Wes McKinney <we...@apache.org>
Signed-off-by: Wes McKinney <we...@apache.org>
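
For context, the per-column overhead is easiest to see on a wide frame of
small integer columns, where every column is individually zero-copyable and
thread dispatch is pure cost. A minimal benchmark sketch (the frame shape
below is an illustrative assumption, not taken from the commit):

    import time
    import numpy as np
    import pandas as pd
    import pyarrow as pa

    # Wide frame of small, contiguous integer columns: each one converts
    # to Arrow zero-copy, so per-column pool dispatch is pure overhead.
    df = pd.DataFrame({"c%d" % i: np.arange(1000, dtype=np.int64)
                       for i in range(500)})

    for nthreads in (1, pa.cpu_count()):
        start = time.perf_counter()
        pa.Table.from_pandas(df, nthreads=nthreads)
        print(nthreads, time.perf_counter() - start)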
---
python/pyarrow/pandas_compat.py | 22 ++++++++++++++++++----
python/pyarrow/table.pxi | 7 +++++--
2 files changed, 23 insertions(+), 6 deletions(-)
diff --git a/python/pyarrow/pandas_compat.py b/python/pyarrow/pandas_compat.py
index 2d390b5..cbc48a7 100644
--- a/python/pyarrow/pandas_compat.py
+++ b/python/pyarrow/pandas_compat.py
@@ -539,10 +539,10 @@ def dataframe_to_arrays(df, schema, preserve_index, nthreads=1, columns=None,
 
     # NOTE(wesm): If nthreads=None, then we use a heuristic to decide whether
     # using a thread pool is worth it. Currently the heuristic is whether the
-    # nrows > 100 * ncols.
+    # nrows > 100 * ncols and ncols > 1.
     if nthreads is None:
         nrows, ncols = len(df), len(df.columns)
-        if nrows > ncols * 100:
+        if nrows > ncols * 100 and ncols > 1:
             nthreads = pa.cpu_count()
         else:
             nthreads = 1
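
To make the amended heuristic concrete, this is the decision it encodes,
extracted into a standalone sketch (choose_nthreads is a hypothetical helper,
not part of the patch):

    import pyarrow as pa

    def choose_nthreads(nrows, ncols):
        # Parallelize only when the frame is tall relative to its width
        # and has more than one column to spread across threads.
        if nrows > ncols * 100 and ncols > 1:
            return pa.cpu_count()
        return 1

    print(choose_nthreads(1_000_000, 1))  # 1: a single column gains nothing
    print(choose_nthreads(1_000_000, 8))  # pa.cpu_count(): tall and wide
    print(choose_nthreads(50, 8))         # 1: too few rows to amortize dispatch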
@@ -569,14 +569,28 @@ def dataframe_to_arrays(df, schema, preserve_index, nthreads=1, columns=None,
                                  result.null_count))
         return result
 
+    def _can_definitely_zero_copy(arr):
+        return (isinstance(arr, np.ndarray) and
+                arr.flags.contiguous and
+                issubclass(arr.dtype.type, np.integer))
+
     if nthreads == 1:
         arrays = [convert_column(c, f)
                   for c, f in zip(columns_to_convert, convert_fields)]
     else:
         from concurrent import futures
+
+        arrays = []
         with futures.ThreadPoolExecutor(nthreads) as executor:
-            arrays = list(executor.map(convert_column, columns_to_convert,
-                                       convert_fields))
+            for c, f in zip(columns_to_convert, convert_fields):
+                if _can_definitely_zero_copy(c.values):
+                    arrays.append(convert_column(c, f))
+                else:
+                    arrays.append(executor.submit(convert_column, c, f))
+
+        for i, maybe_fut in enumerate(arrays):
+            if isinstance(maybe_fut, futures.Future):
+                arrays[i] = maybe_fut.result()
 
     types = [x.type for x in arrays]
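
The convert-inline-or-submit pattern above can be exercised on its own. A
self-contained sketch, where convert_one stands in for the patch's
convert_column and the column list is hypothetical:

    from concurrent import futures
    import numpy as np
    import pyarrow as pa

    def _can_definitely_zero_copy(arr):
        # Same test as the patch: a contiguous NumPy integer array becomes
        # an Arrow buffer without copying, so threading buys nothing.
        return (isinstance(arr, np.ndarray) and
                arr.flags.contiguous and
                issubclass(arr.dtype.type, np.integer))

    def convert_one(arr):  # hypothetical stand-in for convert_column
        return pa.array(arr)

    columns = [np.arange(10), np.array(["a", "b", "c"], dtype=object)]

    results = []
    with futures.ThreadPoolExecutor(4) as executor:
        for arr in columns:
            if _can_definitely_zero_copy(arr):
                results.append(convert_one(arr))  # cheap: run inline
            else:
                results.append(executor.submit(convert_one, arr))

    # Resolve the futures in place so column order is preserved.
    for i, maybe_fut in enumerate(results):
        if isinstance(maybe_fut, futures.Future):
            results[i] = maybe_fut.result()

    print([r.type for r in results])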
diff --git a/python/pyarrow/table.pxi b/python/pyarrow/table.pxi
index 0d061e6..e0cbac6 100644
--- a/python/pyarrow/table.pxi
+++ b/python/pyarrow/table.pxi
@@ -1926,7 +1926,7 @@ def record_batch(data, names=None, schema=None, metadata=None):
         raise TypeError("Expected pandas DataFrame or list of arrays")
 
 
-def table(data, names=None, schema=None, metadata=None):
+def table(data, names=None, schema=None, metadata=None, nthreads=None):
     """
     Create a pyarrow.Table from a Python data structure or sequence of arrays.
@@ -1946,6 +1946,9 @@ def table(data, names=None, schema=None, metadata=None):
         specified in the schema, when data is a dict or DataFrame).
     metadata : dict or Mapping, default None
         Optional metadata for the schema (if schema not passed).
+    nthreads : int, default None (may use up to system CPU count threads)
+        For pandas.DataFrame inputs: if greater than 1, convert columns to
+        Arrow in parallel using indicated number of threads.
 
     Returns
     -------
@@ -1973,7 +1976,7 @@ def table(data, names=None, schema=None, metadata=None):
             raise ValueError(
                 "The 'names' and 'metadata' arguments are not valid when "
                 "passing a pandas DataFrame")
-        return Table.from_pandas(data, schema=schema)
+        return Table.from_pandas(data, schema=schema, nthreads=nthreads)
     else:
         raise TypeError(
             "Expected pandas DataFrame, python dictionary or list of arrays")