You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2017/01/24 13:30:44 UTC
arrow git commit: ARROW-509: [Python] Add support for multithreaded Parquet reads
Repository: arrow
Updated Branches:
refs/heads/master c90ca60c1 -> 61a54f8a6
ARROW-509: [Python] Add support for multithreaded Parquet reads
I'm getting very nice speedups on a Parquet file storing a ~4.5 GB dataset:
```
In [1]: import pyarrow.parquet as pq
In [2]: %time table = pq.read_table('/home/wesm/data/airlines_parquet/4345e5eef217aa1b-c8f16177f35fd983_1150363067_data.0.parq')
CPU times: user 8.21 s, sys: 468 ms, total: 8.68 s
Wall time: 8.68 s
In [3]: %time table = pq.read_table('/home/wesm/data/airlines_parquet/4345e5eef217aa1b-c8f16177f35fd983_1150363067_data.0.parq', nthreads=4)
CPU times: user 8.84 s, sys: 4.28 s, total: 13.1 s
Wall time: 3.91 s
In [4]: %time table = pq.read_table('/home/wesm/data/airlines_parquet/4345e5eef217aa1b-c8f16177f35fd983_1150363067_data.0.parq', nthreads=8)
CPU times: user 13.3 s, sys: 1.15 s, total: 14.4 s
Wall time: 2.86 s
```
This requires a bugfix in parquet-cpp that will come soon in a patch for PARQUET-836.
Author: Wes McKinney <we...@twosigma.com>
Closes #301 from wesm/ARROW-509 and squashes the following commits:
9816689 [Wes McKinney] Update docs slightly, flake8 warning
239b086 [Wes McKinney] Add support for nthreads option in parquet::arrow, unit tests
Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/61a54f8a
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/61a54f8a
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/61a54f8a
Branch: refs/heads/master
Commit: 61a54f8a619efc4fd256c446be29905d6484c5e9
Parents: c90ca60
Author: Wes McKinney <we...@twosigma.com>
Authored: Tue Jan 24 08:30:37 2017 -0500
Committer: Wes McKinney <we...@twosigma.com>
Committed: Tue Jan 24 08:30:37 2017 -0500
----------------------------------------------------------------------
python/pyarrow/_parquet.pxd | 4 ++++
python/pyarrow/_parquet.pyx | 21 ++++++++++++++----
python/pyarrow/parquet.py | 36 ++++++++++++++++++++-----------
python/pyarrow/tests/test_parquet.py | 18 ++++++++++++++++
4 files changed, 62 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/arrow/blob/61a54f8a/python/pyarrow/_parquet.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/_parquet.pxd b/python/pyarrow/_parquet.pxd
index cf1da1c..fabee5d 100644
--- a/python/pyarrow/_parquet.pxd
+++ b/python/pyarrow/_parquet.pxd
@@ -213,8 +213,12 @@ cdef extern from "parquet/arrow/reader.h" namespace "parquet::arrow" nogil:
FileReader(MemoryPool* pool, unique_ptr[ParquetFileReader] reader)
CStatus ReadFlatColumn(int i, shared_ptr[CArray]* out);
CStatus ReadFlatTable(shared_ptr[CTable]* out);
+ CStatus ReadFlatTable(const vector[int]& column_indices,
+ shared_ptr[CTable]* out);
const ParquetFileReader* parquet_reader();
+ void set_num_threads(int num_threads)
+
cdef extern from "parquet/arrow/schema.h" namespace "parquet::arrow" nogil:
CStatus FromParquetSchema(const SchemaDescriptor* parquet_schema,
http://git-wip-us.apache.org/repos/asf/arrow/blob/61a54f8a/python/pyarrow/_parquet.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/_parquet.pyx b/python/pyarrow/_parquet.pyx
index b11cee3..3f847e9 100644
--- a/python/pyarrow/_parquet.pyx
+++ b/python/pyarrow/_parquet.pyx
@@ -382,14 +382,27 @@ cdef class ParquetReader:
result.init(metadata)
return result
- def read_all(self):
+ def read(self, column_indices=None, nthreads=1):
cdef:
Table table = Table()
shared_ptr[CTable] ctable
+ vector[int] c_column_indices
- with nogil:
- check_status(self.reader.get()
- .ReadFlatTable(&ctable))
+ self.reader.get().set_num_threads(nthreads)
+
+ if column_indices is not None:
+ # Read only desired column indices
+ for index in column_indices:
+ c_column_indices.push_back(index)
+
+ with nogil:
+ check_status(self.reader.get()
+ .ReadFlatTable(c_column_indices, &ctable))
+ else:
+ # Read all columns
+ with nogil:
+ check_status(self.reader.get()
+ .ReadFlatTable(&ctable))
table.init(ctable)
return table
http://git-wip-us.apache.org/repos/asf/arrow/blob/61a54f8a/python/pyarrow/parquet.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/parquet.py b/python/pyarrow/parquet.py
index cbe1c6e..6654b77 100644
--- a/python/pyarrow/parquet.py
+++ b/python/pyarrow/parquet.py
@@ -18,7 +18,7 @@
from pyarrow._parquet import (ParquetReader, FileMetaData, # noqa
RowGroupMetaData, Schema, ParquetWriter)
import pyarrow._parquet as _parquet # noqa
-from pyarrow.table import Table, concat_tables
+from pyarrow.table import concat_tables
class ParquetFile(object):
@@ -45,7 +45,7 @@ class ParquetFile(object):
def schema(self):
return self.metadata.schema
- def read(self, nrows=None, columns=None):
+ def read(self, nrows=None, columns=None, nthreads=1):
"""
Read a Table from Parquet format
@@ -53,6 +53,9 @@ class ParquetFile(object):
----------
columns: list
If not None, only these columns will be read from the file.
+ nthreads : int, default 1
+ Number of columns to read in parallel. Requires that the underlying
+ file source is threadsafe
Returns
-------
@@ -63,16 +66,16 @@ class ParquetFile(object):
raise NotImplementedError("nrows argument")
if columns is None:
- return self.reader.read_all()
+ column_indices = None
else:
- column_idxs = [self.reader.column_name_idx(column)
- for column in columns]
- arrays = [self.reader.read_column(column_idx)
- for column_idx in column_idxs]
- return Table.from_arrays(arrays, names=columns)
+ column_indices = [self.reader.column_name_idx(column)
+ for column in columns]
+ return self.reader.read(column_indices=column_indices,
+ nthreads=nthreads)
-def read_table(source, columns=None, metadata=None):
+
+def read_table(source, columns=None, nthreads=1, metadata=None):
"""
Read a Table from Parquet format
@@ -83,6 +86,9 @@ def read_table(source, columns=None, metadata=None):
pyarrow.io.PythonFileInterface or pyarrow.io.BufferReader.
columns: list
If not None, only these columns will be read from the file.
+ nthreads : int, default 1
+ Number of columns to read in parallel. Requires that the underlying
+ file source is threadsafe
metadata : FileMetaData
If separately computed
@@ -91,11 +97,12 @@ def read_table(source, columns=None, metadata=None):
pyarrow.Table
Content of the file as a table (of columns)
"""
- return ParquetFile(source, metadata=metadata).read(columns=columns)
+ pf = ParquetFile(source, metadata=metadata)
+ return pf.read(columns=columns, nthreads=nthreads)
-def read_multiple_files(paths, columns=None, filesystem=None, metadata=None,
- schema=None):
+def read_multiple_files(paths, columns=None, filesystem=None, nthreads=1,
+ metadata=None, schema=None):
"""
Read multiple Parquet files as a single pyarrow.Table
@@ -108,6 +115,9 @@ def read_multiple_files(paths, columns=None, filesystem=None, metadata=None,
filesystem : Filesystem, default None
If nothing passed, paths assumed to be found in the local on-disk
filesystem
+ nthreads : int, default 1
+ Number of columns to read in parallel. Requires that the underlying
+ file source is threadsafe
metadata : pyarrow.parquet.FileMetaData
Use metadata obtained elsewhere to validate file schemas
schema : pyarrow.parquet.Schema
@@ -147,7 +157,7 @@ def read_multiple_files(paths, columns=None, filesystem=None, metadata=None,
tables = []
for path, path_metadata in zip(paths, all_file_metadata):
reader = open_file(path, meta=path_metadata)
- table = reader.read(columns=columns)
+ table = reader.read(columns=columns, nthreads=nthreads)
tables.append(table)
all_data = concat_tables(tables)
http://git-wip-us.apache.org/repos/asf/arrow/blob/61a54f8a/python/pyarrow/tests/test_parquet.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_parquet.py b/python/pyarrow/tests/test_parquet.py
index a94fe45..d85f0e5 100644
--- a/python/pyarrow/tests/test_parquet.py
+++ b/python/pyarrow/tests/test_parquet.py
@@ -321,6 +321,24 @@ def test_compare_schemas():
@parquet
+def test_multithreaded_read():
+ df = alltypes_sample(size=10000)
+
+ table = pa.Table.from_pandas(df, timestamps_to_ms=True)
+
+ buf = io.BytesIO()
+ pq.write_table(table, buf, compression='SNAPPY', version='2.0')
+
+ buf.seek(0)
+ table1 = pq.read_table(buf, nthreads=4)
+
+ buf.seek(0)
+ table2 = pq.read_table(buf, nthreads=1)
+
+ assert table1.equals(table2)
+
+
+@parquet
def test_pass_separate_metadata():
# ARROW-471
df = alltypes_sample(size=10000)