Posted to commits@arrow.apache.org by uw...@apache.org on 2017/01/27 09:46:50 UTC

arrow git commit: ARROW-515: [Python] Add read_all methods to FileReader, StreamReader

Repository: arrow
Updated Branches:
  refs/heads/master 30bb0d97d -> 4226adfbc


ARROW-515: [Python] Add read_all methods to FileReader, StreamReader

Stacked on top of ARROW-514
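
For reference, a minimal sketch of how the new methods might be used from
Python. The "source" object below stands for any readable Arrow file or
in-memory buffer, as in the accompanying tests; it is not part of this
change:

    import pyarrow as pa

    # source: an Arrow file/stream source, e.g. an in-memory buffer
    # produced the same way as in the tests below (illustrative only).
    reader = pa.FileReader(source)
    table = reader.read_all()        # all record batches as one pyarrow.Table

    stream = pa.StreamReader(source)
    table = stream.read_all()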

Author: Wes McKinney <we...@twosigma.com>

Closes #307 from wesm/ARROW-515 and squashes the following commits:

6f2185c [Wes McKinney] Add read_all method to StreamReader, FileReader


Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/4226adfb
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/4226adfb
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/4226adfb

Branch: refs/heads/master
Commit: 4226adfbc6b3dff10b3fe7a6691b30bcc94140bd
Parents: 30bb0d9
Author: Wes McKinney <we...@twosigma.com>
Authored: Fri Jan 27 10:46:34 2017 +0100
Committer: Uwe L. Korn <uw...@xhochy.com>
Committed: Fri Jan 27 10:46:34 2017 +0100

----------------------------------------------------------------------
 python/pyarrow/io.pyx            | 44 ++++++++++++++++++++++++++++++++++-
 python/pyarrow/table.pyx         |  4 +---
 python/pyarrow/tests/test_ipc.py | 19 +++++++++++++++
 3 files changed, 63 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/arrow/blob/4226adfb/python/pyarrow/io.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/io.pyx b/python/pyarrow/io.pyx
index e5f8b7a..8b56508 100644
--- a/python/pyarrow/io.pyx
+++ b/python/pyarrow/io.pyx
@@ -34,7 +34,8 @@ cimport pyarrow.includes.pyarrow as pyarrow
 from pyarrow.compat import frombytes, tobytes, encode_file_path
 from pyarrow.error cimport check_status
 from pyarrow.schema cimport Schema
-from pyarrow.table cimport RecordBatch, batch_from_cbatch
+from pyarrow.table cimport (RecordBatch, batch_from_cbatch,
+                            table_from_ctable)
 
 cimport cpython as cp
 
@@ -936,6 +937,27 @@ cdef class _StreamReader:
 
         return batch_from_cbatch(batch)
 
+    def read_all(self):
+        """
+        Read all record batches as a pyarrow.Table
+        """
+        cdef:
+            vector[shared_ptr[CRecordBatch]] batches
+            shared_ptr[CRecordBatch] batch
+            shared_ptr[CTable] table
+            c_string name = b''
+
+        with nogil:
+            while True:
+                check_status(self.reader.get().GetNextRecordBatch(&batch))
+                if batch.get() == NULL:
+                    break
+                batches.push_back(batch)
+
+            check_status(CTable.FromRecordBatches(name, batches, &table))
+
+        return table_from_ctable(table)
+
 
 cdef class _FileWriter(_StreamWriter):
 
@@ -997,3 +1019,23 @@ cdef class _FileReader:
     # TODO(wesm): ARROW-503: Function was renamed. Remove after a period of
     # time has passed
     get_record_batch = get_batch
+
+    def read_all(self):
+        """
+        Read all record batches as a pyarrow.Table
+        """
+        cdef:
+            vector[shared_ptr[CRecordBatch]] batches
+            shared_ptr[CTable] table
+            c_string name = b''
+            int i, nbatches
+
+        nbatches = self.num_record_batches
+
+        batches.resize(nbatches)
+        with nogil:
+            for i in range(nbatches):
+                check_status(self.reader.get().GetRecordBatch(i, &batches[i]))
+            check_status(CTable.FromRecordBatches(name, batches, &table))
+
+        return table_from_ctable(table)

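The stream reader above pulls batches with GetNextRecordBatch until a null
batch signals end-of-stream, while the file reader iterates over its known
batch count; both then assemble the result via CTable::FromRecordBatches
with the GIL released. At the Python level, read_all is roughly equivalent
to the following hand-rolled loop over the existing StreamReader API
(illustrative sketch only), minus the per-batch Python objects:

    # Roughly what StreamReader.read_all() replaces (illustrative only).
    batches = []
    while True:
        try:
            batches.append(reader.get_next_batch())
        except StopIteration:
            break
    table = pa.Table.from_batches(batches)
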
http://git-wip-us.apache.org/repos/asf/arrow/blob/4226adfb/python/pyarrow/table.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/table.pyx b/python/pyarrow/table.pyx
index 9242330..1707210 100644
--- a/python/pyarrow/table.pyx
+++ b/python/pyarrow/table.pyx
@@ -690,9 +690,7 @@ cdef class Table:
         with nogil:
             check_status(CTable.FromRecordBatches(c_name, c_batches, &c_table))
 
-        table = Table()
-        table.init(c_table)
-        return table
+        return table_from_ctable(c_table)
 
     def to_pandas(self, nthreads=None):
         """

http://git-wip-us.apache.org/repos/asf/arrow/blob/4226adfb/python/pyarrow/tests/test_ipc.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_ipc.py b/python/pyarrow/tests/test_ipc.py
index 8ca464f..665a63b 100644
--- a/python/pyarrow/tests/test_ipc.py
+++ b/python/pyarrow/tests/test_ipc.py
@@ -83,6 +83,16 @@ class TestFile(MessagingTest, unittest.TestCase):
             batch = reader.get_batch(i)
             assert batches[i].equals(batch)
 
+    def test_read_all(self):
+        batches = self.write_batches()
+        file_contents = self._get_source()
+
+        reader = pa.FileReader(file_contents)
+
+        result = reader.read_all()
+        expected = pa.Table.from_batches(batches)
+        assert result.equals(expected)
+
 
 class TestStream(MessagingTest, unittest.TestCase):
 
@@ -104,6 +114,15 @@ class TestStream(MessagingTest, unittest.TestCase):
         with pytest.raises(StopIteration):
             reader.get_next_batch()
 
+    def test_read_all(self):
+        batches = self.write_batches()
+        file_contents = self._get_source()
+        reader = pa.StreamReader(file_contents)
+
+        result = reader.read_all()
+        expected = pa.Table.from_batches(batches)
+        assert result.equals(expected)
+
 
 class TestInMemoryFile(TestFile):