You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2017/01/24 13:30:44 UTC

arrow git commit: ARROW-509: [Python] Add support for multithreaded Parquet reads

Repository: arrow
Updated Branches:
  refs/heads/master c90ca60c1 -> 61a54f8a6


ARROW-509: [Python] Add support for multithreaded Parquet reads

I'm getting very nice speedups on a Parquet file storing a ~4.5 GB dataset:

```
In [1]: import pyarrow.parquet as pq

In [2]: %time table = pq.read_table('/home/wesm/data/airlines_parquet/4345e5eef217aa1b-c8f16177f35fd983_1150363067_data.0.parq')
CPU times: user 8.21 s, sys: 468 ms, total: 8.68 s
Wall time: 8.68 s

In [3]: %time table = pq.read_table('/home/wesm/data/airlines_parquet/4345e5eef217aa1b-c8f16177f35fd983_1150363067_data.0.parq', nthreads=4)
CPU times: user 8.84 s, sys: 4.28 s, total: 13.1 s
Wall time: 3.91 s

In [4]: %time table = pq.read_table('/home/wesm/data/airlines_parquet/4345e5eef217aa1b-c8f16177f35fd983_1150363067_data.0.parq', nthreads=8)
CPU times: user 13.3 s, sys: 1.15 s, total: 14.4 s
Wall time: 2.86 s
```

This requires a bug fix in parquet-cpp, which will arrive shortly in a patch for PARQUET-836.

Author: Wes McKinney <we...@twosigma.com>

Closes #301 from wesm/ARROW-509 and squashes the following commits:

9816689 [Wes McKinney] Update docs slightly, flake8 warning
239b086 [Wes McKinney] Add support for nthreads option in parquet::arrow, unit tests


Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/61a54f8a
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/61a54f8a
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/61a54f8a

Branch: refs/heads/master
Commit: 61a54f8a619efc4fd256c446be29905d6484c5e9
Parents: c90ca60
Author: Wes McKinney <we...@twosigma.com>
Authored: Tue Jan 24 08:30:37 2017 -0500
Committer: Wes McKinney <we...@twosigma.com>
Committed: Tue Jan 24 08:30:37 2017 -0500

----------------------------------------------------------------------
 python/pyarrow/_parquet.pxd          |  4 ++++
 python/pyarrow/_parquet.pyx          | 21 ++++++++++++++----
 python/pyarrow/parquet.py            | 36 ++++++++++++++++++++-----------
 python/pyarrow/tests/test_parquet.py | 18 ++++++++++++++++
 4 files changed, 62 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/arrow/blob/61a54f8a/python/pyarrow/_parquet.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/_parquet.pxd b/python/pyarrow/_parquet.pxd
index cf1da1c..fabee5d 100644
--- a/python/pyarrow/_parquet.pxd
+++ b/python/pyarrow/_parquet.pxd
@@ -213,8 +213,12 @@ cdef extern from "parquet/arrow/reader.h" namespace "parquet::arrow" nogil:
         FileReader(MemoryPool* pool, unique_ptr[ParquetFileReader] reader)
         CStatus ReadFlatColumn(int i, shared_ptr[CArray]* out);
         CStatus ReadFlatTable(shared_ptr[CTable]* out);
+        CStatus ReadFlatTable(const vector[int]& column_indices,
+                              shared_ptr[CTable]* out);
         const ParquetFileReader* parquet_reader();
 
+        void set_num_threads(int num_threads)
+
 
 cdef extern from "parquet/arrow/schema.h" namespace "parquet::arrow" nogil:
     CStatus FromParquetSchema(const SchemaDescriptor* parquet_schema,

http://git-wip-us.apache.org/repos/asf/arrow/blob/61a54f8a/python/pyarrow/_parquet.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/_parquet.pyx b/python/pyarrow/_parquet.pyx
index b11cee3..3f847e9 100644
--- a/python/pyarrow/_parquet.pyx
+++ b/python/pyarrow/_parquet.pyx
@@ -382,14 +382,27 @@ cdef class ParquetReader:
         result.init(metadata)
         return result
 
-    def read_all(self):
+    def read(self, column_indices=None, nthreads=1):
         cdef:
             Table table = Table()
             shared_ptr[CTable] ctable
+            vector[int] c_column_indices
 
-        with nogil:
-            check_status(self.reader.get()
-                         .ReadFlatTable(&ctable))
+        self.reader.get().set_num_threads(nthreads)
+
+        if column_indices is not None:
+            # Read only desired column indices
+            for index in column_indices:
+                c_column_indices.push_back(index)
+
+            with nogil:
+                check_status(self.reader.get()
+                             .ReadFlatTable(c_column_indices, &ctable))
+        else:
+            # Read all columns
+            with nogil:
+                check_status(self.reader.get()
+                             .ReadFlatTable(&ctable))
 
         table.init(ctable)
         return table

http://git-wip-us.apache.org/repos/asf/arrow/blob/61a54f8a/python/pyarrow/parquet.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/parquet.py b/python/pyarrow/parquet.py
index cbe1c6e..6654b77 100644
--- a/python/pyarrow/parquet.py
+++ b/python/pyarrow/parquet.py
@@ -18,7 +18,7 @@
 from pyarrow._parquet import (ParquetReader, FileMetaData,  # noqa
                               RowGroupMetaData, Schema, ParquetWriter)
 import pyarrow._parquet as _parquet  # noqa
-from pyarrow.table import Table, concat_tables
+from pyarrow.table import concat_tables
 
 
 class ParquetFile(object):
@@ -45,7 +45,7 @@ class ParquetFile(object):
     def schema(self):
         return self.metadata.schema
 
-    def read(self, nrows=None, columns=None):
+    def read(self, nrows=None, columns=None, nthreads=1):
         """
         Read a Table from Parquet format
 
@@ -53,6 +53,9 @@ class ParquetFile(object):
         ----------
         columns: list
             If not None, only these columns will be read from the file.
+        nthreads : int, default 1
+            Number of columns to read in parallel. Requires that the underlying
+            file source is threadsafe
 
         Returns
         -------
@@ -63,16 +66,16 @@ class ParquetFile(object):
             raise NotImplementedError("nrows argument")
 
         if columns is None:
-            return self.reader.read_all()
+            column_indices = None
         else:
-            column_idxs = [self.reader.column_name_idx(column)
-                           for column in columns]
-            arrays = [self.reader.read_column(column_idx)
-                      for column_idx in column_idxs]
-            return Table.from_arrays(arrays, names=columns)
+            column_indices = [self.reader.column_name_idx(column)
+                              for column in columns]
 
+        return self.reader.read(column_indices=column_indices,
+                                nthreads=nthreads)
 
-def read_table(source, columns=None, metadata=None):
+
+def read_table(source, columns=None, nthreads=1, metadata=None):
     """
     Read a Table from Parquet format
 
@@ -83,6 +86,9 @@ def read_table(source, columns=None, metadata=None):
         pyarrow.io.PythonFileInterface or pyarrow.io.BufferReader.
     columns: list
         If not None, only these columns will be read from the file.
+    nthreads : int, default 1
+        Number of columns to read in parallel. Requires that the underlying
+        file source is threadsafe
     metadata : FileMetaData
         If separately computed
 
@@ -91,11 +97,12 @@ def read_table(source, columns=None, metadata=None):
     pyarrow.Table
         Content of the file as a table (of columns)
     """
-    return ParquetFile(source, metadata=metadata).read(columns=columns)
+    pf = ParquetFile(source, metadata=metadata)
+    return pf.read(columns=columns, nthreads=nthreads)
 
 
-def read_multiple_files(paths, columns=None, filesystem=None, metadata=None,
-                        schema=None):
+def read_multiple_files(paths, columns=None, filesystem=None, nthreads=1,
+                        metadata=None, schema=None):
     """
     Read multiple Parquet files as a single pyarrow.Table
 
@@ -108,6 +115,9 @@ def read_multiple_files(paths, columns=None, filesystem=None, metadata=None,
     filesystem : Filesystem, default None
         If nothing passed, paths assumed to be found in the local on-disk
         filesystem
+    nthreads : int, default 1
+        Number of columns to read in parallel. Requires that the underlying
+        file source is threadsafe
     metadata : pyarrow.parquet.FileMetaData
         Use metadata obtained elsewhere to validate file schemas
     schema : pyarrow.parquet.Schema
@@ -147,7 +157,7 @@ def read_multiple_files(paths, columns=None, filesystem=None, metadata=None,
     tables = []
     for path, path_metadata in zip(paths, all_file_metadata):
         reader = open_file(path, meta=path_metadata)
-        table = reader.read(columns=columns)
+        table = reader.read(columns=columns, nthreads=nthreads)
         tables.append(table)
 
     all_data = concat_tables(tables)

http://git-wip-us.apache.org/repos/asf/arrow/blob/61a54f8a/python/pyarrow/tests/test_parquet.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_parquet.py b/python/pyarrow/tests/test_parquet.py
index a94fe45..d85f0e5 100644
--- a/python/pyarrow/tests/test_parquet.py
+++ b/python/pyarrow/tests/test_parquet.py
@@ -321,6 +321,24 @@ def test_compare_schemas():
 
 
 @parquet
+def test_multithreaded_read():
+    df = alltypes_sample(size=10000)
+
+    table = pa.Table.from_pandas(df, timestamps_to_ms=True)
+
+    buf = io.BytesIO()
+    pq.write_table(table, buf, compression='SNAPPY', version='2.0')
+
+    buf.seek(0)
+    table1 = pq.read_table(buf, nthreads=4)
+
+    buf.seek(0)
+    table2 = pq.read_table(buf, nthreads=1)
+
+    assert table1.equals(table2)
+
+
+@parquet
 def test_pass_separate_metadata():
     # ARROW-471
     df = alltypes_sample(size=10000)