You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2020/06/28 14:40:10 UTC

[arrow] branch master updated: ARROW-8888: [Python] Do not use thread pool when converting pandas columns that are definitely zero-copyable

This is an automated email from the ASF dual-hosted git repository.

wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 2b37fd4  ARROW-8888: [Python] Do not use thread pool when converting pandas columns that are definitely zero-copyable
2b37fd4 is described below

commit 2b37fd422889c337010083147891c1132802ac50
Author: Wes McKinney <we...@apache.org>
AuthorDate: Sun Jun 28 09:39:40 2020 -0500

    ARROW-8888: [Python] Do not use thread pool when converting pandas columns that are definitely zero-copyable
    
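    The ThreadPoolExecutor has a significant amount of per-column overhead, so
    columns that are definitely zero-copyable (contiguous integer ndarrays) are
    now converted directly on the calling thread instead of being submitted to
    the pool. The thread-pool heuristic is also skipped for single-column frames.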
    
    Closes #7563 from wesm/ARROW-8888
    
    Authored-by: Wes McKinney <we...@apache.org>
    Signed-off-by: Wes McKinney <we...@apache.org>
---
 python/pyarrow/pandas_compat.py | 22 ++++++++++++++++++----
 python/pyarrow/table.pxi        |  7 +++++--
 2 files changed, 23 insertions(+), 6 deletions(-)

diff --git a/python/pyarrow/pandas_compat.py b/python/pyarrow/pandas_compat.py
index 2d390b5..cbc48a7 100644
--- a/python/pyarrow/pandas_compat.py
+++ b/python/pyarrow/pandas_compat.py
@@ -539,10 +539,10 @@ def dataframe_to_arrays(df, schema, preserve_index, nthreads=1, columns=None,
 
     # NOTE(wesm): If nthreads=None, then we use a heuristic to decide whether
     # using a thread pool is worth it. Currently the heuristic is whether the
-    # nrows > 100 * ncols.
+    # nrows > 100 * ncols and ncols > 1.
     if nthreads is None:
         nrows, ncols = len(df), len(df.columns)
-        if nrows > ncols * 100:
+        if nrows > ncols * 100 and ncols > 1:
             nthreads = pa.cpu_count()
         else:
             nthreads = 1
@@ -569,14 +569,28 @@ def dataframe_to_arrays(df, schema, preserve_index, nthreads=1, columns=None,
                                                          result.null_count))
         return result
 
+    def _can_definitely_zero_copy(arr):
+        return (isinstance(arr, np.ndarray) and
+                arr.flags.contiguous and
+                issubclass(arr.dtype.type, np.integer))
+
     if nthreads == 1:
         arrays = [convert_column(c, f)
                   for c, f in zip(columns_to_convert, convert_fields)]
     else:
         from concurrent import futures
+
+        arrays = []
         with futures.ThreadPoolExecutor(nthreads) as executor:
-            arrays = list(executor.map(convert_column, columns_to_convert,
-                                       convert_fields))
+            for c, f in zip(columns_to_convert, convert_fields):
+                if _can_definitely_zero_copy(c.values):
+                    arrays.append(convert_column(c, f))
+                else:
+                    arrays.append(executor.submit(convert_column, c, f))
+
+        for i, maybe_fut in enumerate(arrays):
+            if isinstance(maybe_fut, futures.Future):
+                arrays[i] = maybe_fut.result()
 
     types = [x.type for x in arrays]
 
diff --git a/python/pyarrow/table.pxi b/python/pyarrow/table.pxi
index 0d061e6..e0cbac6 100644
--- a/python/pyarrow/table.pxi
+++ b/python/pyarrow/table.pxi
@@ -1926,7 +1926,7 @@ def record_batch(data, names=None, schema=None, metadata=None):
         raise TypeError("Expected pandas DataFrame or list of arrays")
 
 
-def table(data, names=None, schema=None, metadata=None):
+def table(data, names=None, schema=None, metadata=None, nthreads=None):
     """
     Create a pyarrow.Table from a Python data structure or sequence of arrays.
 
@@ -1946,6 +1946,9 @@ def table(data, names=None, schema=None, metadata=None):
         specified in the schema, when data is a dict or DataFrame).
     metadata : dict or Mapping, default None
         Optional metadata for the schema (if schema not passed).
+    nthreads : int, default None (may use up to system CPU count threads)
+        For pandas.DataFrame inputs: if greater than 1, convert columns to
+        Arrow in parallel using indicated number of threads.
 
     Returns
     -------
@@ -1973,7 +1976,7 @@ def table(data, names=None, schema=None, metadata=None):
             raise ValueError(
                 "The 'names' and 'metadata' arguments are not valid when "
                 "passing a pandas DataFrame")
-        return Table.from_pandas(data, schema=schema)
+        return Table.from_pandas(data, schema=schema, nthreads=nthreads)
     else:
         raise TypeError(
             "Expected pandas DataFrame, python dictionary or list of arrays")