You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2017/04/06 17:49:41 UTC
arrow git commit: ARROW-771: [Python] Add read_row_group /
num_row_groups to ParquetFile
Repository: arrow
Updated Branches:
refs/heads/master ff744ef13 -> 56f1e91d2
ARROW-771: [Python] Add read_row_group / num_row_groups to ParquetFile
requires PARQUET-946 https://github.com/apache/parquet-cpp/pull/291
cc @cpcloud @jreback @mrocklin
Author: Wes McKinney <we...@twosigma.com>
Closes #494 from wesm/ARROW-771 and squashes the following commits:
126789a [Wes McKinney] Fix docstring
1009423 [Wes McKinney] Add read_row_group / num_row_groups to ParquetFile
Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/56f1e91d
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/56f1e91d
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/56f1e91d
Branch: refs/heads/master
Commit: 56f1e91d2961a13b7f677785fa705bed06d9639d
Parents: ff744ef
Author: Wes McKinney <we...@twosigma.com>
Authored: Thu Apr 6 13:49:32 2017 -0400
Committer: Wes McKinney <we...@twosigma.com>
Committed: Thu Apr 6 13:49:32 2017 -0400
----------------------------------------------------------------------
python/pyarrow/_parquet.pxd | 17 +++++++---
python/pyarrow/_parquet.pyx | 37 ++++++++++++++++-----
python/pyarrow/parquet.py | 53 +++++++++++++++++++++++--------
python/pyarrow/tests/test_parquet.py | 29 +++++++++++++++++
4 files changed, 109 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/arrow/blob/56f1e91d/python/pyarrow/_parquet.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/_parquet.pxd b/python/pyarrow/_parquet.pxd
index f12c86f..1ac1f69 100644
--- a/python/pyarrow/_parquet.pxd
+++ b/python/pyarrow/_parquet.pxd
@@ -179,7 +179,7 @@ cdef extern from "parquet/api/reader.h" namespace "parquet" nogil:
@staticmethod
unique_ptr[ParquetFileReader] OpenFile(const c_string& path)
- shared_ptr[CFileMetaData] metadata();
+ shared_ptr[CFileMetaData] metadata()
cdef extern from "parquet/api/writer.h" namespace "parquet" nogil:
@@ -211,11 +211,18 @@ cdef extern from "parquet/arrow/reader.h" namespace "parquet::arrow" nogil:
cdef cppclass FileReader:
FileReader(CMemoryPool* pool, unique_ptr[ParquetFileReader] reader)
- CStatus ReadColumn(int i, shared_ptr[CArray]* out);
- CStatus ReadTable(shared_ptr[CTable]* out);
+ CStatus ReadColumn(int i, shared_ptr[CArray]* out)
+
+ int num_row_groups()
+ CStatus ReadRowGroup(int i, shared_ptr[CTable]* out)
+ CStatus ReadRowGroup(int i, const vector[int]& column_indices,
+ shared_ptr[CTable]* out)
+
+ CStatus ReadTable(shared_ptr[CTable]* out)
CStatus ReadTable(const vector[int]& column_indices,
- shared_ptr[CTable]* out);
- const ParquetFileReader* parquet_reader();
+ shared_ptr[CTable]* out)
+
+ const ParquetFileReader* parquet_reader()
void set_num_threads(int num_threads)
http://git-wip-us.apache.org/repos/asf/arrow/blob/56f1e91d/python/pyarrow/_parquet.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/_parquet.pyx b/python/pyarrow/_parquet.pyx
index cfd2816..079bf5e 100644
--- a/python/pyarrow/_parquet.pyx
+++ b/python/pyarrow/_parquet.pyx
@@ -31,7 +31,7 @@ from pyarrow.error import ArrowException
from pyarrow.error cimport check_status
from pyarrow.io import NativeFile
from pyarrow.memory cimport MemoryPool, maybe_unbox_memory_pool
-from pyarrow.table cimport Table
+from pyarrow.table cimport Table, table_from_ctable
from pyarrow.io cimport NativeFile, get_reader, get_writer
@@ -381,16 +381,39 @@ cdef class ParquetReader:
result.init(metadata)
return result
- def read(self, column_indices=None, nthreads=1):
+ property num_row_groups:
+
+ def __get__(self):
+ return self.reader.get().num_row_groups()
+
+ def set_num_threads(self, int nthreads):
+ self.reader.get().set_num_threads(nthreads)
+
+ def read_row_group(self, int i, column_indices=None):
cdef:
- Table table = Table()
shared_ptr[CTable] ctable
vector[int] c_column_indices
- self.reader.get().set_num_threads(nthreads)
+ if column_indices is not None:
+ for index in column_indices:
+ c_column_indices.push_back(index)
+
+ with nogil:
+ check_status(self.reader.get()
+ .ReadRowGroup(i, c_column_indices, &ctable))
+ else:
+ # Read all columns
+ with nogil:
+ check_status(self.reader.get()
+ .ReadRowGroup(i, &ctable))
+ return table_from_ctable(ctable)
+
+ def read_all(self, column_indices=None):
+ cdef:
+ shared_ptr[CTable] ctable
+ vector[int] c_column_indices
if column_indices is not None:
- # Read only desired column indices
for index in column_indices:
c_column_indices.push_back(index)
@@ -402,9 +425,7 @@ cdef class ParquetReader:
with nogil:
check_status(self.reader.get()
.ReadTable(&ctable))
-
- table.init(ctable)
- return table
+ return table_from_ctable(ctable)
def column_name_idx(self, column_name):
"""
http://git-wip-us.apache.org/repos/asf/arrow/blob/56f1e91d/python/pyarrow/parquet.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/parquet.py b/python/pyarrow/parquet.py
index 2985316..d95c3b3 100644
--- a/python/pyarrow/parquet.py
+++ b/python/pyarrow/parquet.py
@@ -50,7 +50,32 @@ class ParquetFile(object):
def schema(self):
return self.metadata.schema
- def read(self, nrows=None, columns=None, nthreads=1):
+ @property
+ def num_row_groups(self):
+ return self.reader.num_row_groups
+
+ def read_row_group(self, i, columns=None, nthreads=1):
+ """
+ Read a single row group from a Parquet file
+
+ Parameters
+ ----------
+ columns: list
+ If not None, only these columns will be read from the row group.
+ nthreads : int, default 1
+ Number of columns to read in parallel. If > 1, requires that the
+ underlying file source is threadsafe
+
+ Returns
+ -------
+ pyarrow.table.Table
+ Content of the row group as a table (of columns)
+ """
+ column_indices = self._get_column_indices(columns)
+ self.reader.set_num_threads(nthreads)
+ return self.reader.read_row_group(i, column_indices=column_indices)
+
+ def read(self, columns=None, nthreads=1):
"""
Read a Table from Parquet format
@@ -67,17 +92,16 @@ class ParquetFile(object):
pyarrow.table.Table
Content of the file as a table (of columns)
"""
- if nrows is not None:
- raise NotImplementedError("nrows argument")
+ column_indices = self._get_column_indices(columns)
+ self.reader.set_num_threads(nthreads)
+ return self.reader.read_all(column_indices=column_indices)
- if columns is None:
- column_indices = None
+ def _get_column_indices(self, column_names):
+ if column_names is None:
+ return None
else:
- column_indices = [self.reader.column_name_idx(column)
- for column in columns]
-
- return self.reader.read(column_indices=column_indices,
- nthreads=nthreads)
+ return [self.reader.column_name_idx(column)
+ for column in column_names]
def read_table(source, columns=None, nthreads=1, metadata=None):
@@ -178,8 +202,8 @@ def read_multiple_files(paths, columns=None, filesystem=None, nthreads=1,
return all_data
-def write_table(table, sink, chunk_size=None, version='1.0',
- use_dictionary=True, compression='snappy'):
+def write_table(table, sink, row_group_size=None, version='1.0',
+ use_dictionary=True, compression='snappy', **kwargs):
"""
Write a Table to Parquet format
@@ -187,7 +211,7 @@ def write_table(table, sink, chunk_size=None, version='1.0',
----------
table : pyarrow.Table
sink: string or pyarrow.io.NativeFile
- chunk_size : int, default None
+ row_group_size : int, default None
The maximum number of rows in each Parquet RowGroup. As a default,
we will write a single RowGroup per file.
version : {"1.0", "2.0"}, default "1.0"
@@ -198,7 +222,8 @@ def write_table(table, sink, chunk_size=None, version='1.0',
compression : str or dict
Specify the compression codec, either on a general basis or per-column.
"""
+ row_group_size = kwargs.get('chunk_size', row_group_size)
writer = ParquetWriter(sink, use_dictionary=use_dictionary,
compression=compression,
version=version)
- writer.write_table(table, row_group_size=chunk_size)
+ writer.write_table(table, row_group_size=row_group_size)
http://git-wip-us.apache.org/repos/asf/arrow/blob/56f1e91d/python/pyarrow/tests/test_parquet.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_parquet.py b/python/pyarrow/tests/test_parquet.py
index b8b2800..86165be 100644
--- a/python/pyarrow/tests/test_parquet.py
+++ b/python/pyarrow/tests/test_parquet.py
@@ -403,6 +403,35 @@ def test_pass_separate_metadata():
@parquet
+def test_read_single_row_group():
+ # ARROW-771
+ N, K = 10000, 4
+ df = alltypes_sample(size=N)
+
+ a_table = pa.Table.from_pandas(df, timestamps_to_ms=True)
+
+ buf = io.BytesIO()
+ pq.write_table(a_table, buf, row_group_size=N / K,
+ compression='snappy', version='2.0')
+
+ buf.seek(0)
+
+ pf = pq.ParquetFile(buf)
+
+ assert pf.num_row_groups == K
+
+ row_groups = [pf.read_row_group(i) for i in range(K)]
+ result = pa.concat_tables(row_groups)
+ pdt.assert_frame_equal(df, result.to_pandas())
+
+ cols = df.columns[:2]
+ row_groups = [pf.read_row_group(i, columns=cols)
+ for i in range(K)]
+ result = pa.concat_tables(row_groups)
+ pdt.assert_frame_equal(df[cols], result.to_pandas())
+
+
+@parquet
def test_read_multiple_files(tmpdir):
nfiles = 10
size = 5