Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2020/10/08 16:48:10 UTC

[GitHub] [arrow] wesm commented on a change in pull request #6979: ARROW-7800 [Python] implement iter_batches() method for ParquetFile and ParquetReader

wesm commented on a change in pull request #6979:
URL: https://github.com/apache/arrow/pull/6979#discussion_r501865414



##########
File path: python/pyarrow/parquet.py
##########
@@ -301,6 +301,45 @@ def read_row_groups(self, row_groups, columns=None, use_threads=True,
                                            column_indices=column_indices,
                                            use_threads=use_threads)
 
+    def iter_batches(self, batch_size=65536, row_groups=None, columns=None,
+                     use_threads=True, use_pandas_metadata=False):
+        """
+        Read streaming batches from a Parquet file
+
+        Parameters
+        ----------
+        batch_size: int, default 64K
+            Maximum number of records to yield per batch. Batches may be
+            smaller if there aren't enough rows in a rowgroup.
+        row_groups: list
+            Only these row groups will be read from the file.
+        columns: list
+            If not None, only these columns will be read from the file. A
+            column name may be a prefix of a nested field, e.g. 'a' will select
+            'a.b', 'a.c', and 'a.d.e'
+        use_threads : boolean, default True
+            Perform multi-threaded column reads
+        use_pandas_metadata : boolean, default False
+            If True and file has custom pandas schema metadata, ensure that
+            index columns are also loaded
+
+        Returns
+        -------
+        iterator of pyarrow.RecordBatch
+            Contents of each batch as a record batch
+        """
+        if row_groups is None:
+            row_groups = range(0, self.metadata.num_row_groups)
+        column_indices = self._get_column_indices(
+            columns, use_pandas_metadata=use_pandas_metadata)
+
+        batches = self.reader.iter_batches(batch_size,
+                                           row_groups=row_groups,
+                                           column_indices=column_indices,
+                                           use_threads=use_threads)
+        for batch in batches:
+            yield batch

Review comment:
       Doesn't `iter_batches` already return a generator? If so, you can just `return self.reader.iter_batches(...)`.
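
       The point of the suggestion can be sketched with toy stand-ins (not pyarrow code): a wrapper that re-yields every item from an existing generator adds a needless layer, and simply returning the inner iterator is equivalent for the caller.

       ```python
       def make_batches(n):
           # Stand-in for self.reader.iter_batches(...), which is
           # already a generator in the diff above.
           for i in range(n):
               yield i

       def wrapped(n):
           # Pattern in the PR: loop and re-yield each item.
           for batch in make_batches(n):
               yield batch

       def direct(n):
           # Pattern the comment suggests: return the iterator as-is.
           return make_batches(n)

       assert list(wrapped(3)) == list(direct(3)) == [0, 1, 2]
       ```

       One caveat: with `return`, the function is no longer a generator function itself, so its body (argument validation, etc.) runs eagerly at call time rather than on first `next()`.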

##########
File path: python/pyarrow/_parquet.pyx
##########
@@ -1074,6 +1075,52 @@ cdef class ParquetReader(_Weakrefable):
     def set_use_threads(self, bint use_threads):
         self.reader.get().set_use_threads(use_threads)
 
+    def set_batch_size(self, int64_t batch_size):
+        self.reader.get().set_batch_size(batch_size)
+
+    def iter_batches(self, int64_t batch_size, row_groups, column_indices=None,
+                     bint use_threads=True):
+        cdef:
+            vector[int] c_row_groups
+            vector[int] c_column_indices
+            shared_ptr[CRecordBatch] record_batch
+            shared_ptr[TableBatchReader] batch_reader
+            unique_ptr[CRecordBatchReader] recordbatchreader
+
+        self.set_batch_size(batch_size)
+
+        if use_threads:
+            self.set_use_threads(use_threads)
+
+        for row_group in row_groups:
+            c_row_groups.push_back(row_group)
+
+        if column_indices is not None:
+            for index in column_indices:
+                c_column_indices.push_back(index)
+            check_status(
+                self.reader.get().GetRecordBatchReader(
+                    c_row_groups, c_column_indices, &recordbatchreader
+                )
+            )
+        else:
+            check_status(
+                self.reader.get().GetRecordBatchReader(
+                    c_row_groups, &recordbatchreader
+                )
+            )

Review comment:
       These function calls do non-trivial computation -- they need to release the GIL.
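
       A hedged sketch of the requested change for the `column_indices is not None` branch, assuming pyarrow's usual pattern where `check_status` is declared `nogil except -1` so it can run inside a `with nogil:` block (names mirror the diff above; this is an illustration, not the final code):

       ```cython
       if column_indices is not None:
           for index in column_indices:
               c_column_indices.push_back(index)
           # Release the GIL around the C++ call so other Python
           # threads can run while the record batch reader is set up.
           with nogil:
               check_status(
                   self.reader.get().GetRecordBatchReader(
                       c_row_groups, c_column_indices, &recordbatchreader
                   )
               )
       ```

       The `else` branch (no `column_indices`) would get the same `with nogil:` wrapper around its `GetRecordBatchReader` call.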




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org