You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2016/12/20 19:09:39 UTC
arrow git commit: ARROW-434: [Python] Correctly handle Python file
objects in Parquet read/write paths
Repository: arrow
Updated Branches:
refs/heads/master 6ff5fcf1b -> f6bf112cd
ARROW-434: [Python] Correctly handle Python file objects in Parquet read/write paths
While we had already enabled Python file objects for the IPC file reader/writer, they had not been enabled in the Parquet read/write paths. With this change, the following now works:
```python
with open(filename, 'wb') as f:
A.parquet.write_table(arrow_table, f, version="1.0")
data = io.BytesIO(open(filename, 'rb').read())
table_read = pq.read_table(data)
```
A separate bug was also reported in ARROW-434, but it is a Parquet type-mapping issue and will be fixed in PARQUET-812.
Author: Wes McKinney <we...@twosigma.com>
Closes #247 from wesm/ARROW-434 and squashes the following commits:
c704088 [Wes McKinney] Correctly handle Python file objects in Parquet read/write paths
Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/f6bf112c
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/f6bf112c
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/f6bf112c
Branch: refs/heads/master
Commit: f6bf112cd22eeb03725dff79a28c205324fa4f45
Parents: 6ff5fcf
Author: Wes McKinney <we...@twosigma.com>
Authored: Tue Dec 20 14:09:32 2016 -0500
Committer: Wes McKinney <we...@twosigma.com>
Committed: Tue Dec 20 14:09:32 2016 -0500
----------------------------------------------------------------------
python/pyarrow/io.pxd | 3 ++
python/pyarrow/io.pyx | 40 +++++++++++++++++++++++++
python/pyarrow/ipc.pyx | 43 +--------------------------
python/pyarrow/parquet.pyx | 49 +++++++++++++++++--------------
python/pyarrow/tests/test_parquet.py | 36 +++++++++++++++++++++--
5 files changed, 104 insertions(+), 67 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/arrow/blob/f6bf112c/python/pyarrow/io.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/io.pxd b/python/pyarrow/io.pxd
index d6966cd..02265d0 100644
--- a/python/pyarrow/io.pxd
+++ b/python/pyarrow/io.pxd
@@ -42,3 +42,6 @@ cdef class NativeFile:
# suite of Arrow C++ libraries
cdef read_handle(self, shared_ptr[ReadableFileInterface]* file)
cdef write_handle(self, shared_ptr[OutputStream]* file)
+
+cdef get_reader(object source, shared_ptr[ReadableFileInterface]* reader)
+cdef get_writer(object source, shared_ptr[OutputStream]* writer)
http://git-wip-us.apache.org/repos/asf/arrow/blob/f6bf112c/python/pyarrow/io.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/io.pyx b/python/pyarrow/io.pyx
index 6b0e392..8491aa8 100644
--- a/python/pyarrow/io.pyx
+++ b/python/pyarrow/io.pyx
@@ -256,6 +256,46 @@ def buffer_from_bytes(object obj):
result.init(buf)
return result
+cdef get_reader(object source, shared_ptr[ReadableFileInterface]* reader):
+ cdef NativeFile nf
+
+ if isinstance(source, bytes):
+ source = BytesReader(source)
+ elif not isinstance(source, NativeFile) and hasattr(source, 'read'):
+ # Optimistically hope this is file-like
+ source = PythonFileInterface(source, mode='r')
+
+ if isinstance(source, NativeFile):
+ nf = source
+
+ # TODO: what about read-write sources (e.g. memory maps)
+ if not nf.is_readonly:
+ raise IOError('Native file is not readable')
+
+ nf.read_handle(reader)
+ else:
+ raise TypeError('Unable to read from object of type: {0}'
+ .format(type(source)))
+
+
+cdef get_writer(object source, shared_ptr[OutputStream]* writer):
+    cdef NativeFile nf
+    # Resolve `source` into an OutputStream handle stored in *writer
+    if not isinstance(source, NativeFile) and hasattr(source, 'write'):
+        # Optimistically hope this is file-like
+        source = PythonFileInterface(source, mode='w')
+
+    if isinstance(source, NativeFile):
+        nf = source
+
+        if nf.is_readonly:
+            raise IOError('Native file is not writeable')
+
+        nf.write_handle(writer)
+    else:
+        raise TypeError('Unable to write to object of type: {0}'
+                        .format(type(source)))
+
# ----------------------------------------------------------------------
# HDFS IO implementation
http://git-wip-us.apache.org/repos/asf/arrow/blob/f6bf112c/python/pyarrow/ipc.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/ipc.pyx b/python/pyarrow/ipc.pyx
index 46deb5a..abc5e1b 100644
--- a/python/pyarrow/ipc.pyx
+++ b/python/pyarrow/ipc.pyx
@@ -27,7 +27,7 @@ from pyarrow.includes.libarrow_ipc cimport *
cimport pyarrow.includes.pyarrow as pyarrow
from pyarrow.error cimport check_status
-from pyarrow.io cimport NativeFile
+from pyarrow.io cimport NativeFile, get_reader, get_writer
from pyarrow.schema cimport Schema
from pyarrow.table cimport RecordBatch
@@ -37,47 +37,6 @@ import pyarrow.io as io
cimport cpython as cp
-cdef get_reader(source, shared_ptr[ReadableFileInterface]* reader):
- cdef NativeFile nf
-
- if isinstance(source, bytes):
- source = io.BytesReader(source)
- elif not isinstance(source, io.NativeFile) and hasattr(source, 'read'):
- # Optimistically hope this is file-like
- source = io.PythonFileInterface(source, mode='r')
-
- if isinstance(source, NativeFile):
- nf = source
-
- # TODO: what about read-write sources (e.g. memory maps)
- if not nf.is_readonly:
- raise IOError('Native file is not readable')
-
- nf.read_handle(reader)
- else:
- raise TypeError('Unable to read from object of type: {0}'
- .format(type(source)))
-
-
-cdef get_writer(source, shared_ptr[OutputStream]* writer):
- cdef NativeFile nf
-
- if not isinstance(source, io.NativeFile) and hasattr(source, 'write'):
- # Optimistically hope this is file-like
- source = io.PythonFileInterface(source, mode='w')
-
- if isinstance(source, io.NativeFile):
- nf = source
-
- if nf.is_readonly:
- raise IOError('Native file is not writeable')
-
- nf.write_handle(writer)
- else:
- raise TypeError('Unable to read from object of type: {0}'
- .format(type(source)))
-
-
cdef class ArrowFileWriter:
cdef:
shared_ptr[CFileWriter] writer
http://git-wip-us.apache.org/repos/asf/arrow/blob/f6bf112c/python/pyarrow/parquet.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/parquet.pyx b/python/pyarrow/parquet.pyx
index 83fddb2..043ccf1 100644
--- a/python/pyarrow/parquet.pyx
+++ b/python/pyarrow/parquet.pyx
@@ -31,7 +31,7 @@ from pyarrow.error cimport check_status
from pyarrow.io import NativeFile
from pyarrow.table cimport Table
-from pyarrow.io cimport NativeFile
+from pyarrow.io cimport NativeFile, get_reader, get_writer
import six
@@ -49,22 +49,27 @@ cdef class ParquetReader:
def __cinit__(self):
self.allocator.set_pool(default_memory_pool())
- cdef open_local_file(self, file_path):
- cdef c_string path = tobytes(file_path)
+ def open(self, source):
+ self._open(source)
- # Must be in one expression to avoid calling std::move which is not
- # possible in Cython (due to missing rvalue support)
+ cdef _open(self, object source):
+ cdef:
+ shared_ptr[ReadableFileInterface] rd_handle
+ c_string path
- # TODO(wesm): ParquetFileReader::OpenFIle can throw?
- self.reader = unique_ptr[FileReader](
- new FileReader(default_memory_pool(),
- ParquetFileReader.OpenFile(path)))
+ if isinstance(source, six.string_types):
+ path = tobytes(source)
- cdef open_native_file(self, NativeFile file):
- cdef shared_ptr[ReadableFileInterface] cpp_handle
- file.read_handle(&cpp_handle)
+ # Must be in one expression to avoid calling std::move which is not
+ # possible in Cython (due to missing rvalue support)
- check_status(OpenFile(cpp_handle, &self.allocator, &self.reader))
+ # TODO(wesm): ParquetFileReader::OpenFile can throw?
+ self.reader = unique_ptr[FileReader](
+ new FileReader(default_memory_pool(),
+ ParquetFileReader.OpenFile(path)))
+ else:
+ get_reader(source, &rd_handle)
+ check_status(OpenFile(rd_handle, &self.allocator, &self.reader))
def read_all(self):
cdef:
@@ -137,11 +142,7 @@ def read_table(source, columns=None):
Content of the file as a table (of columns)
"""
cdef ParquetReader reader = ParquetReader()
-
- if isinstance(source, six.string_types):
- reader.open_local_file(source)
- elif isinstance(source, NativeFile):
- reader.open_native_file(source)
+ reader._open(source)
if columns is None:
return reader.read_all()
@@ -174,7 +175,10 @@ def write_table(table, sink, chunk_size=None, version=None,
cdef Table table_ = table
cdef CTable* ctable_ = table_.table
cdef shared_ptr[ParquetWriteSink] sink_
+
cdef shared_ptr[FileOutputStream] filesink_
+ cdef shared_ptr[OutputStream] general_sink
+
cdef WriterProperties.Builder properties_builder
cdef int64_t chunk_size_ = 0
if chunk_size is None:
@@ -232,10 +236,11 @@ def write_table(table, sink, chunk_size=None, version=None,
raise ArrowException("Unsupport compression codec")
if isinstance(sink, six.string_types):
- check_status(FileOutputStream.Open(tobytes(sink), &filesink_))
- sink_.reset(new ParquetWriteSink(<shared_ptr[OutputStream]>filesink_))
- elif isinstance(sink, NativeFile):
- sink_.reset(new ParquetWriteSink((<NativeFile>sink).wr_file))
+ check_status(FileOutputStream.Open(tobytes(sink), &filesink_))
+ sink_.reset(new ParquetWriteSink(<shared_ptr[OutputStream]>filesink_))
+ else:
+ get_writer(sink, &general_sink)
+ sink_.reset(new ParquetWriteSink(general_sink))
with nogil:
check_status(WriteFlatTable(ctable_, default_memory_pool(), sink_,
http://git-wip-us.apache.org/repos/asf/arrow/blob/f6bf112c/python/pyarrow/tests/test_parquet.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_parquet.py b/python/pyarrow/tests/test_parquet.py
index 841830f..7c45732 100644
--- a/python/pyarrow/tests/test_parquet.py
+++ b/python/pyarrow/tests/test_parquet.py
@@ -15,6 +15,7 @@
# specific language governing permissions and limitations
# under the License.
+import io
import pytest
import pyarrow as A
@@ -132,9 +133,8 @@ def test_pandas_column_selection(tmpdir):
pdt.assert_frame_equal(df[['uint8']], df_read)
-@parquet
-def test_pandas_parquet_native_file_roundtrip(tmpdir):
- size = 10000
+
+def _test_dataframe(size=10000):
np.random.seed(0)
df = pd.DataFrame({
'uint8': np.arange(size, dtype=np.uint8),
@@ -149,6 +149,12 @@ def test_pandas_parquet_native_file_roundtrip(tmpdir):
'float64': np.arange(size, dtype=np.float64),
'bool': np.random.randn(size) > 0
})
+ return df
+
+
+@parquet
+def test_pandas_parquet_native_file_roundtrip(tmpdir):
+ df = _test_dataframe(10000)
arrow_table = A.from_pandas_dataframe(df)
imos = paio.InMemoryOutputStream()
pq.write_table(arrow_table, imos, version="2.0")
@@ -159,6 +165,30 @@ def test_pandas_parquet_native_file_roundtrip(tmpdir):
@parquet
+def test_pandas_parquet_pyfile_roundtrip(tmpdir):
+ filename = tmpdir.join('pandas_pyfile_roundtrip.parquet').strpath
+ size = 5
+ df = pd.DataFrame({
+ 'int64': np.arange(size, dtype=np.int64),
+ 'float32': np.arange(size, dtype=np.float32),
+ 'float64': np.arange(size, dtype=np.float64),
+ 'bool': np.random.randn(size) > 0,
+ 'strings': ['foo', 'bar', None, 'baz', 'qux']
+ })
+
+ arrow_table = A.from_pandas_dataframe(df)
+
+ with open(filename, 'wb') as f:
+ A.parquet.write_table(arrow_table, f, version="1.0")
+
+ data = io.BytesIO(open(filename, 'rb').read())
+
+ table_read = pq.read_table(data)
+ df_read = table_read.to_pandas()
+ pdt.assert_frame_equal(df, df_read)
+
+
+@parquet
def test_pandas_parquet_configuration_options(tmpdir):
size = 10000
np.random.seed(0)