Posted to commits@arrow.apache.org by we...@apache.org on 2016/12/20 19:09:39 UTC

arrow git commit: ARROW-434: [Python] Correctly handle Python file objects in Parquet read/write paths

Repository: arrow
Updated Branches:
  refs/heads/master 6ff5fcf1b -> f6bf112cd


ARROW-434: [Python] Correctly handle Python file objects in Parquet read/write paths

While Python file objects had been enabled for the IPC file reader/writer, they had not yet been enabled in the Parquet read/write paths. For example:

```python
import io

import pyarrow as A
import pyarrow.parquet as pq

# filename and arrow_table are assumed to be defined already
with open(filename, 'wb') as f:
    A.parquet.write_table(arrow_table, f, version="1.0")

data = io.BytesIO(open(filename, 'rb').read())

table_read = pq.read_table(data)
```

There was a separate bug reported in ARROW-434, but that is a Parquet type-mapping issue and will be fixed in PARQUET-812.

Author: Wes McKinney <we...@twosigma.com>

Closes #247 from wesm/ARROW-434 and squashes the following commits:

c704088 [Wes McKinney] Correctly handle Python file objects in Parquet read/write paths


Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/f6bf112c
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/f6bf112c
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/f6bf112c

Branch: refs/heads/master
Commit: f6bf112cd22eeb03725dff79a28c205324fa4f45
Parents: 6ff5fcf
Author: Wes McKinney <we...@twosigma.com>
Authored: Tue Dec 20 14:09:32 2016 -0500
Committer: Wes McKinney <we...@twosigma.com>
Committed: Tue Dec 20 14:09:32 2016 -0500

----------------------------------------------------------------------
 python/pyarrow/io.pxd                |  3 ++
 python/pyarrow/io.pyx                | 40 +++++++++++++++++++++++++
 python/pyarrow/ipc.pyx               | 43 +--------------------------
 python/pyarrow/parquet.pyx           | 49 +++++++++++++++++--------------
 python/pyarrow/tests/test_parquet.py | 36 +++++++++++++++++++++--
 5 files changed, 104 insertions(+), 67 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/arrow/blob/f6bf112c/python/pyarrow/io.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/io.pxd b/python/pyarrow/io.pxd
index d6966cd..02265d0 100644
--- a/python/pyarrow/io.pxd
+++ b/python/pyarrow/io.pxd
@@ -42,3 +42,6 @@ cdef class NativeFile:
     # suite of Arrow C++ libraries
     cdef read_handle(self, shared_ptr[ReadableFileInterface]* file)
     cdef write_handle(self, shared_ptr[OutputStream]* file)
+
+cdef get_reader(object source, shared_ptr[ReadableFileInterface]* reader)
+cdef get_writer(object source, shared_ptr[OutputStream]* writer)

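Exposing get_reader and get_writer in io.pxd is what lets the other Cython modules in this patch (ipc.pyx and parquet.pyx below) share a single implementation instead of carrying private copies. A minimal sketch of a hypothetical consumer module; the shared_ptr import is standard Cython, while the libarrow_io cimport path is an assumption based on this codebase's layout:

```cython
# hypothetical_consumer.pyx -- illustrative sketch, not part of this commit
from libcpp.memory cimport shared_ptr

from pyarrow.includes.libarrow_io cimport ReadableFileInterface
from pyarrow.io cimport get_reader

cdef read_from_any(object source):
    cdef shared_ptr[ReadableFileInterface] handle
    # get_reader accepts bytes, Python file-like objects, or NativeFile
    get_reader(source, &handle)
    # ... hand the populated C++ handle to a reader
```
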
http://git-wip-us.apache.org/repos/asf/arrow/blob/f6bf112c/python/pyarrow/io.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/io.pyx b/python/pyarrow/io.pyx
index 6b0e392..8491aa8 100644
--- a/python/pyarrow/io.pyx
+++ b/python/pyarrow/io.pyx
@@ -256,6 +256,46 @@ def buffer_from_bytes(object obj):
     result.init(buf)
     return result
 
+cdef get_reader(object source, shared_ptr[ReadableFileInterface]* reader):
+    cdef NativeFile nf
+
+    if isinstance(source, bytes):
+        source = BytesReader(source)
+    elif not isinstance(source, NativeFile) and hasattr(source, 'read'):
+        # Optimistically hope this is file-like
+        source = PythonFileInterface(source, mode='r')
+
+    if isinstance(source, NativeFile):
+        nf = source
+
+        # TODO: what about read-write sources (e.g. memory maps)
+        if not nf.is_readonly:
+            raise IOError('Native file is not readable')
+
+        nf.read_handle(reader)
+    else:
+        raise TypeError('Unable to read from object of type: {0}'
+                        .format(type(source)))
+
+
+cdef get_writer(object source, shared_ptr[OutputStream]* writer):
+    cdef NativeFile nf
+
+    if not isinstance(source, NativeFile) and hasattr(source, 'write'):
+        # Optimistically hope this is file-like
+        source = PythonFileInterface(source, mode='w')
+
+    if isinstance(source, NativeFile):
+        nf = source
+
+        if nf.is_readonly:
+            raise IOError('Native file is not writeable')
+
+        nf.write_handle(writer)
+    else:
+        raise TypeError('Unable to write to object of type: {0}'
+                        .format(type(source)))
+
 # ----------------------------------------------------------------------
 # HDFS IO implementation
 

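The new get_reader centralizes source dispatch: raw bytes become a BytesReader, any object with a read() method is wrapped in a PythonFileInterface, and a NativeFile hands over its C++ read handle directly (after the read-only check). A rough sketch of what this enables through the public Parquet API, assuming a Parquet file at the placeholder path example.parquet:

```python
import io

import pyarrow.io as paio
import pyarrow.parquet as pq

# A plain Python file object is wrapped in a PythonFileInterface
table = pq.read_table(open('example.parquet', 'rb'))

# Any file-like object with a read() method works the same way
raw = open('example.parquet', 'rb').read()
table = pq.read_table(io.BytesIO(raw))

# A NativeFile such as BytesReader passes its C++ read handle through
table = pq.read_table(paio.BytesReader(raw))
```
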
http://git-wip-us.apache.org/repos/asf/arrow/blob/f6bf112c/python/pyarrow/ipc.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/ipc.pyx b/python/pyarrow/ipc.pyx
index 46deb5a..abc5e1b 100644
--- a/python/pyarrow/ipc.pyx
+++ b/python/pyarrow/ipc.pyx
@@ -27,7 +27,7 @@ from pyarrow.includes.libarrow_ipc cimport *
 cimport pyarrow.includes.pyarrow as pyarrow
 
 from pyarrow.error cimport check_status
-from pyarrow.io cimport NativeFile
+from pyarrow.io cimport NativeFile, get_reader, get_writer
 from pyarrow.schema cimport Schema
 from pyarrow.table cimport RecordBatch
 
@@ -37,47 +37,6 @@ import pyarrow.io as io
 cimport cpython as cp
 
 
-cdef get_reader(source, shared_ptr[ReadableFileInterface]* reader):
-    cdef NativeFile nf
-
-    if isinstance(source, bytes):
-        source = io.BytesReader(source)
-    elif not isinstance(source, io.NativeFile) and hasattr(source, 'read'):
-        # Optimistically hope this is file-like
-        source = io.PythonFileInterface(source, mode='r')
-
-    if isinstance(source, NativeFile):
-        nf = source
-
-        # TODO: what about read-write sources (e.g. memory maps)
-        if not nf.is_readonly:
-            raise IOError('Native file is not readable')
-
-        nf.read_handle(reader)
-    else:
-        raise TypeError('Unable to read from object of type: {0}'
-                        .format(type(source)))
-
-
-cdef get_writer(source, shared_ptr[OutputStream]* writer):
-    cdef NativeFile nf
-
-    if not isinstance(source, io.NativeFile) and hasattr(source, 'write'):
-        # Optimistically hope this is file-like
-        source = io.PythonFileInterface(source, mode='w')
-
-    if isinstance(source, io.NativeFile):
-        nf = source
-
-        if nf.is_readonly:
-            raise IOError('Native file is not writeable')
-
-        nf.write_handle(writer)
-    else:
-        raise TypeError('Unable to read from object of type: {0}'
-                        .format(type(source)))
-
-
 cdef class ArrowFileWriter:
     cdef:
         shared_ptr[CFileWriter] writer

http://git-wip-us.apache.org/repos/asf/arrow/blob/f6bf112c/python/pyarrow/parquet.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/parquet.pyx b/python/pyarrow/parquet.pyx
index 83fddb2..043ccf1 100644
--- a/python/pyarrow/parquet.pyx
+++ b/python/pyarrow/parquet.pyx
@@ -31,7 +31,7 @@ from pyarrow.error cimport check_status
 from pyarrow.io import NativeFile
 from pyarrow.table cimport Table
 
-from pyarrow.io cimport NativeFile
+from pyarrow.io cimport NativeFile, get_reader, get_writer
 
 import six
 
@@ -49,22 +49,27 @@ cdef class ParquetReader:
     def __cinit__(self):
         self.allocator.set_pool(default_memory_pool())
 
-    cdef open_local_file(self, file_path):
-        cdef c_string path = tobytes(file_path)
+    def open(self, source):
+        self._open(source)
 
-        # Must be in one expression to avoid calling std::move which is not
-        # possible in Cython (due to missing rvalue support)
+    cdef _open(self, object source):
+        cdef:
+            shared_ptr[ReadableFileInterface] rd_handle
+            c_string path
 
-        # TODO(wesm): ParquetFileReader::OpenFIle can throw?
-        self.reader = unique_ptr[FileReader](
-            new FileReader(default_memory_pool(),
-                           ParquetFileReader.OpenFile(path)))
+        if isinstance(source, six.string_types):
+            path = tobytes(source)
 
-    cdef open_native_file(self, NativeFile file):
-        cdef shared_ptr[ReadableFileInterface] cpp_handle
-        file.read_handle(&cpp_handle)
+            # Must be in one expression to avoid calling std::move which is not
+            # possible in Cython (due to missing rvalue support)
 
-        check_status(OpenFile(cpp_handle, &self.allocator, &self.reader))
+            # TODO(wesm): ParquetFileReader::OpenFile can throw?
+            self.reader = unique_ptr[FileReader](
+                new FileReader(default_memory_pool(),
+                               ParquetFileReader.OpenFile(path)))
+        else:
+            get_reader(source, &rd_handle)
+            check_status(OpenFile(rd_handle, &self.allocator, &self.reader))
 
     def read_all(self):
         cdef:
@@ -137,11 +142,7 @@ def read_table(source, columns=None):
         Content of the file as a table (of columns)
     """
     cdef ParquetReader reader = ParquetReader()
-
-    if isinstance(source, six.string_types):
-        reader.open_local_file(source)
-    elif isinstance(source, NativeFile):
-        reader.open_native_file(source)
+    reader._open(source)
 
     if columns is None:
         return reader.read_all()
@@ -174,7 +175,10 @@ def write_table(table, sink, chunk_size=None, version=None,
     cdef Table table_ = table
     cdef CTable* ctable_ = table_.table
     cdef shared_ptr[ParquetWriteSink] sink_
+
     cdef shared_ptr[FileOutputStream] filesink_
+    cdef shared_ptr[OutputStream] general_sink
+
     cdef WriterProperties.Builder properties_builder
     cdef int64_t chunk_size_ = 0
     if chunk_size is None:
@@ -232,10 +236,11 @@ def write_table(table, sink, chunk_size=None, version=None,
                raise ArrowException("Unsupported compression codec")
 
     if isinstance(sink, six.string_types):
-       check_status(FileOutputStream.Open(tobytes(sink), &filesink_))
-       sink_.reset(new ParquetWriteSink(<shared_ptr[OutputStream]>filesink_))
-    elif isinstance(sink, NativeFile):
-        sink_.reset(new ParquetWriteSink((<NativeFile>sink).wr_file))
+        check_status(FileOutputStream.Open(tobytes(sink), &filesink_))
+        sink_.reset(new ParquetWriteSink(<shared_ptr[OutputStream]>filesink_))
+    else:
+        get_writer(sink, &general_sink)
+        sink_.reset(new ParquetWriteSink(general_sink))
 
     with nogil:
         check_status(WriteFlatTable(ctable_, default_memory_pool(), sink_,

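On the write side, get_writer means any object with a write() method can now serve as a sink, while string paths keep going through FileOutputStream as before. A minimal round trip that mirrors the new test below, assuming df is an existing pandas DataFrame and example.parquet is a placeholder path:

```python
import io

import pyarrow as A
import pyarrow.parquet as pq

arrow_table = A.from_pandas_dataframe(df)

# A string path still opens a FileOutputStream, as before
pq.write_table(arrow_table, 'example.parquet', version='1.0')

# Any object with a write() method is now wrapped via get_writer
buf = io.BytesIO()
pq.write_table(arrow_table, buf, version='1.0')

# ... and the in-memory buffer can be read straight back
buf.seek(0)
table_read = pq.read_table(buf)
df_read = table_read.to_pandas()
```
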
http://git-wip-us.apache.org/repos/asf/arrow/blob/f6bf112c/python/pyarrow/tests/test_parquet.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_parquet.py b/python/pyarrow/tests/test_parquet.py
index 841830f..7c45732 100644
--- a/python/pyarrow/tests/test_parquet.py
+++ b/python/pyarrow/tests/test_parquet.py
@@ -15,6 +15,7 @@
 # specific language governing permissions and limitations
 # under the License.
 
+import io
 import pytest
 
 import pyarrow as A
@@ -132,9 +133,8 @@ def test_pandas_column_selection(tmpdir):
 
     pdt.assert_frame_equal(df[['uint8']], df_read)
 
-@parquet
-def test_pandas_parquet_native_file_roundtrip(tmpdir):
-    size = 10000
+
+def _test_dataframe(size=10000):
     np.random.seed(0)
     df = pd.DataFrame({
         'uint8': np.arange(size, dtype=np.uint8),
@@ -149,6 +149,12 @@ def test_pandas_parquet_native_file_roundtrip(tmpdir):
         'float64': np.arange(size, dtype=np.float64),
         'bool': np.random.randn(size) > 0
     })
+    return df
+
+
+@parquet
+def test_pandas_parquet_native_file_roundtrip(tmpdir):
+    df = _test_dataframe(10000)
     arrow_table = A.from_pandas_dataframe(df)
     imos = paio.InMemoryOutputStream()
     pq.write_table(arrow_table, imos, version="2.0")
@@ -159,6 +165,30 @@ def test_pandas_parquet_native_file_roundtrip(tmpdir):
 
 
 @parquet
+def test_pandas_parquet_pyfile_roundtrip(tmpdir):
+    filename = tmpdir.join('pandas_pyfile_roundtrip.parquet').strpath
+    size = 5
+    df = pd.DataFrame({
+        'int64': np.arange(size, dtype=np.int64),
+        'float32': np.arange(size, dtype=np.float32),
+        'float64': np.arange(size, dtype=np.float64),
+        'bool': np.random.randn(size) > 0,
+        'strings': ['foo', 'bar', None, 'baz', 'qux']
+    })
+
+    arrow_table = A.from_pandas_dataframe(df)
+
+    with open(filename, 'wb') as f:
+        A.parquet.write_table(arrow_table, f, version="1.0")
+
+    data = io.BytesIO(open(filename, 'rb').read())
+
+    table_read = pq.read_table(data)
+    df_read = table_read.to_pandas()
+    pdt.assert_frame_equal(df, df_read)
+
+
+@parquet
 def test_pandas_parquet_configuration_options(tmpdir):
     size = 10000
     np.random.seed(0)