Posted to commits@arrow.apache.org by we...@apache.org on 2017/04/06 17:49:41 UTC

arrow git commit: ARROW-771: [Python] Add read_row_group / num_row_groups to ParquetFile

Repository: arrow
Updated Branches:
  refs/heads/master ff744ef13 -> 56f1e91d2


ARROW-771: [Python] Add read_row_group / num_row_groups to ParquetFile

Requires PARQUET-946 (https://github.com/apache/parquet-cpp/pull/291).

cc @cpcloud @jreback @mrocklin
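
For context, the new API can be exercised roughly as follows. This is a
minimal sketch: the file name and sample data are illustrative, and it
assumes pyarrow was built with Parquet support.

    import pandas as pd
    import pyarrow as pa
    import pyarrow.parquet as pq

    # Write a table split into multiple row groups (sample data)
    df = pd.DataFrame({'a': range(100), 'b': ['x'] * 100})
    table = pa.Table.from_pandas(df)
    pq.write_table(table, 'example.parquet', row_group_size=25)

    pf = pq.ParquetFile('example.parquet')
    print(pf.num_row_groups)                      # expect 4 row groups of 25 rows
    rg0 = pf.read_row_group(0)                    # pyarrow Table for row group 0
    rg0_a = pf.read_row_group(0, columns=['a'])   # only the selected columns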

Author: Wes McKinney <we...@twosigma.com>

Closes #494 from wesm/ARROW-771 and squashes the following commits:

126789a [Wes McKinney] Fix docstring
1009423 [Wes McKinney] Add read_row_group / num_row_groups to ParquetFile


Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/56f1e91d
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/56f1e91d
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/56f1e91d

Branch: refs/heads/master
Commit: 56f1e91d2961a13b7f677785fa705bed06d9639d
Parents: ff744ef
Author: Wes McKinney <we...@twosigma.com>
Authored: Thu Apr 6 13:49:32 2017 -0400
Committer: Wes McKinney <we...@twosigma.com>
Committed: Thu Apr 6 13:49:32 2017 -0400

----------------------------------------------------------------------
 python/pyarrow/_parquet.pxd          | 17 +++++++---
 python/pyarrow/_parquet.pyx          | 37 ++++++++++++++++-----
 python/pyarrow/parquet.py            | 53 +++++++++++++++++++++++--------
 python/pyarrow/tests/test_parquet.py | 29 +++++++++++++++++
 4 files changed, 109 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/arrow/blob/56f1e91d/python/pyarrow/_parquet.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/_parquet.pxd b/python/pyarrow/_parquet.pxd
index f12c86f..1ac1f69 100644
--- a/python/pyarrow/_parquet.pxd
+++ b/python/pyarrow/_parquet.pxd
@@ -179,7 +179,7 @@ cdef extern from "parquet/api/reader.h" namespace "parquet" nogil:
 
         @staticmethod
         unique_ptr[ParquetFileReader] OpenFile(const c_string& path)
-        shared_ptr[CFileMetaData] metadata();
+        shared_ptr[CFileMetaData] metadata()
 
 
 cdef extern from "parquet/api/writer.h" namespace "parquet" nogil:
@@ -211,11 +211,18 @@ cdef extern from "parquet/arrow/reader.h" namespace "parquet::arrow" nogil:
 
     cdef cppclass FileReader:
         FileReader(CMemoryPool* pool, unique_ptr[ParquetFileReader] reader)
-        CStatus ReadColumn(int i, shared_ptr[CArray]* out);
-        CStatus ReadTable(shared_ptr[CTable]* out);
+        CStatus ReadColumn(int i, shared_ptr[CArray]* out)
+
+        int num_row_groups()
+        CStatus ReadRowGroup(int i, shared_ptr[CTable]* out)
+        CStatus ReadRowGroup(int i, const vector[int]& column_indices,
+                             shared_ptr[CTable]* out)
+
+        CStatus ReadTable(shared_ptr[CTable]* out)
         CStatus ReadTable(const vector[int]& column_indices,
-                              shared_ptr[CTable]* out);
-        const ParquetFileReader* parquet_reader();
+                          shared_ptr[CTable]* out)
+
+        const ParquetFileReader* parquet_reader()
 
         void set_num_threads(int num_threads)
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/56f1e91d/python/pyarrow/_parquet.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/_parquet.pyx b/python/pyarrow/_parquet.pyx
index cfd2816..079bf5e 100644
--- a/python/pyarrow/_parquet.pyx
+++ b/python/pyarrow/_parquet.pyx
@@ -31,7 +31,7 @@ from pyarrow.error import ArrowException
 from pyarrow.error cimport check_status
 from pyarrow.io import NativeFile
 from pyarrow.memory cimport MemoryPool, maybe_unbox_memory_pool
-from pyarrow.table cimport Table
+from pyarrow.table cimport Table, table_from_ctable
 
 from pyarrow.io cimport NativeFile, get_reader, get_writer
 
@@ -381,16 +381,39 @@ cdef class ParquetReader:
         result.init(metadata)
         return result
 
-    def read(self, column_indices=None, nthreads=1):
+    property num_row_groups:
+
+        def __get__(self):
+            return self.reader.get().num_row_groups()
+
+    def set_num_threads(self, int nthreads):
+        self.reader.get().set_num_threads(nthreads)
+
+    def read_row_group(self, int i, column_indices=None):
         cdef:
-            Table table = Table()
             shared_ptr[CTable] ctable
             vector[int] c_column_indices
 
-        self.reader.get().set_num_threads(nthreads)
+        if column_indices is not None:
+            for index in column_indices:
+                c_column_indices.push_back(index)
+
+            with nogil:
+                check_status(self.reader.get()
+                             .ReadRowGroup(i, c_column_indices, &ctable))
+        else:
+            # Read all columns
+            with nogil:
+                check_status(self.reader.get()
+                             .ReadRowGroup(i, &ctable))
+        return table_from_ctable(ctable)
+
+    def read_all(self, column_indices=None):
+        cdef:
+            shared_ptr[CTable] ctable
+            vector[int] c_column_indices
 
         if column_indices is not None:
-            # Read only desired column indices
             for index in column_indices:
                 c_column_indices.push_back(index)
 
@@ -402,9 +425,7 @@ cdef class ParquetReader:
             with nogil:
                 check_status(self.reader.get()
                              .ReadTable(&ctable))
-
-        table.init(ctable)
-        return table
+        return table_from_ctable(ctable)
 
     def column_name_idx(self, column_name):
         """

http://git-wip-us.apache.org/repos/asf/arrow/blob/56f1e91d/python/pyarrow/parquet.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/parquet.py b/python/pyarrow/parquet.py
index 2985316..d95c3b3 100644
--- a/python/pyarrow/parquet.py
+++ b/python/pyarrow/parquet.py
@@ -50,7 +50,32 @@ class ParquetFile(object):
     def schema(self):
         return self.metadata.schema
 
-    def read(self, nrows=None, columns=None, nthreads=1):
+    @property
+    def num_row_groups(self):
+        return self.reader.num_row_groups
+
+    def read_row_group(self, i, columns=None, nthreads=1):
+        """
+        Read a single row group from a Parquet file
+
+        Parameters
+        ----------
+        columns: list
+            If not None, only these columns will be read from the row group.
+        nthreads : int, default 1
+            Number of columns to read in parallel. If > 1, requires that the
+            underlying file source is threadsafe
+
+        Returns
+        -------
+        pyarrow.table.Table
+            Content of the row group as a table (of columns)
+        """
+        column_indices = self._get_column_indices(columns)
+        self.reader.set_num_threads(nthreads)
+        return self.reader.read_row_group(i, column_indices=column_indices)
+
+    def read(self, columns=None, nthreads=1):
         """
         Read a Table from Parquet format
 
@@ -67,17 +92,16 @@ class ParquetFile(object):
         pyarrow.table.Table
             Content of the file as a table (of columns)
         """
-        if nrows is not None:
-            raise NotImplementedError("nrows argument")
+        column_indices = self._get_column_indices(columns)
+        self.reader.set_num_threads(nthreads)
+        return self.reader.read_all(column_indices=column_indices)
 
-        if columns is None:
-            column_indices = None
+    def _get_column_indices(self, column_names):
+        if column_names is None:
+            return None
         else:
-            column_indices = [self.reader.column_name_idx(column)
-                              for column in columns]
-
-        return self.reader.read(column_indices=column_indices,
-                                nthreads=nthreads)
+            return [self.reader.column_name_idx(column)
+                    for column in column_names]
 
 
 def read_table(source, columns=None, nthreads=1, metadata=None):
@@ -178,8 +202,8 @@ def read_multiple_files(paths, columns=None, filesystem=None, nthreads=1,
     return all_data
 
 
-def write_table(table, sink, chunk_size=None, version='1.0',
-                use_dictionary=True, compression='snappy'):
+def write_table(table, sink, row_group_size=None, version='1.0',
+                use_dictionary=True, compression='snappy', **kwargs):
     """
     Write a Table to Parquet format
 
@@ -187,7 +211,7 @@ def write_table(table, sink, chunk_size=None, version='1.0',
     ----------
     table : pyarrow.Table
     sink: string or pyarrow.io.NativeFile
-    chunk_size : int, default None
+    row_group_size : int, default None
         The maximum number of rows in each Parquet RowGroup. As a default,
         we will write a single RowGroup per file.
     version : {"1.0", "2.0"}, default "1.0"
@@ -198,7 +222,8 @@ def write_table(table, sink, chunk_size=None, version='1.0',
     compression : str or dict
         Specify the compression codec, either on a general basis or per-column.
     """
+    row_group_size = kwargs.get('chunk_size', row_group_size)
     writer = ParquetWriter(sink, use_dictionary=use_dictionary,
                            compression=compression,
                            version=version)
-    writer.write_table(table, row_group_size=chunk_size)
+    writer.write_table(table, row_group_size=row_group_size)
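
With this change, callers pass row_group_size to write_table, while the old
chunk_size keyword is still accepted via **kwargs for backwards
compatibility. A small sketch (the table construction is illustrative):

    import pandas as pd
    import pyarrow as pa
    import pyarrow.parquet as pq

    table = pa.Table.from_pandas(pd.DataFrame({'a': range(1000)}))

    # Preferred spelling after this patch
    pq.write_table(table, 'rows.parquet', row_group_size=250)

    # Older code using chunk_size= keeps working
    pq.write_table(table, 'rows_old.parquet', chunk_size=250)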

http://git-wip-us.apache.org/repos/asf/arrow/blob/56f1e91d/python/pyarrow/tests/test_parquet.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_parquet.py b/python/pyarrow/tests/test_parquet.py
index b8b2800..86165be 100644
--- a/python/pyarrow/tests/test_parquet.py
+++ b/python/pyarrow/tests/test_parquet.py
@@ -403,6 +403,35 @@ def test_pass_separate_metadata():
 
 
 @parquet
+def test_read_single_row_group():
+    # ARROW-471
+    N, K = 10000, 4
+    df = alltypes_sample(size=N)
+
+    a_table = pa.Table.from_pandas(df, timestamps_to_ms=True)
+
+    buf = io.BytesIO()
+    pq.write_table(a_table, buf, row_group_size=N / K,
+                   compression='snappy', version='2.0')
+
+    buf.seek(0)
+
+    pf = pq.ParquetFile(buf)
+
+    assert pf.num_row_groups == K
+
+    row_groups = [pf.read_row_group(i) for i in range(K)]
+    result = pa.concat_tables(row_groups)
+    pdt.assert_frame_equal(df, result.to_pandas())
+
+    cols = df.columns[:2]
+    row_groups = [pf.read_row_group(i, columns=cols)
+                  for i in range(K)]
+    result = pa.concat_tables(row_groups)
+    pdt.assert_frame_equal(df[cols], result.to_pandas())
+
+
+@parquet
 def test_read_multiple_files(tmpdir):
     nfiles = 10
     size = 5