Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2020/06/27 01:52:43 UTC

[GitHub] [arrow] wjones1 commented on a change in pull request #6979: ARROW-7800 [Python] implement iter_batches() method for ParquetFile and ParquetReader

wjones1 commented on a change in pull request #6979:
URL: https://github.com/apache/arrow/pull/6979#discussion_r446469923



##########
File path: python/pyarrow/parquet.py
##########
@@ -310,6 +310,44 @@ def read_row_groups(self, row_groups, columns=None, use_threads=True,
                                            column_indices=column_indices,
                                            use_threads=use_threads)
 
+    def iter_batches(self, batch_size, row_groups=None, columns=None,

Review comment:
       Agreed. I added the same default value for `iter_batches()`.

##########
File path: python/pyarrow/_parquet.pyx
##########
@@ -1077,6 +1078,54 @@ cdef class ParquetReader:
     def set_use_threads(self, bint use_threads):
         self.reader.get().set_use_threads(use_threads)
 
+    def set_batch_size(self, int64_t batch_size):
+        self.reader.get().set_batch_size(batch_size)
+
+    def iter_batches(self, int64_t batch_size, row_groups, column_indices=None,
+                     bint use_threads=True):
+        cdef:
+            vector[int] c_row_groups
+            vector[int] c_column_indices
+            shared_ptr[CRecordBatch] record_batch
+            unique_ptr[CRecordBatchReader] recordbatchreader
+
+        self.set_batch_size(batch_size)
+
+        # Propagate the setting unconditionally so use_threads=False
+        # also takes effect.
+        self.set_use_threads(use_threads)
+
+        for row_group in row_groups:
+            c_row_groups.push_back(row_group)
+
+        if column_indices is not None:
+            for index in column_indices:
+                c_column_indices.push_back(index)
+            check_status(
+                self.reader.get().GetRecordBatchReader(
+                    c_row_groups, c_column_indices, &recordbatchreader
+                )
+            )
+        else:
+            check_status(
+                self.reader.get().GetRecordBatchReader(
+                    c_row_groups, &recordbatchreader
+                )
+            )
+
+        while True:
+            with nogil:
+                check_status(
+                    recordbatchreader.get().ReadNext(&record_batch)
+                )
+
+            if record_batch.get() == NULL:
+                break
+
+            py_record_batch = pyarrow_wrap_batch(record_batch)

Review comment:
       👍 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org