You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2016/12/02 16:48:31 UTC
arrow git commit: ARROW-389: Python: Write Parquet files to pyarrow.io.NativeFile objects
Repository: arrow
Updated Branches:
refs/heads/master 33c731dbd -> 06be7aed0
ARROW-389: Python: Write Parquet files to pyarrow.io.NativeFile objects
Author: Uwe L. Korn <uw...@xhochy.com>
Closes #214 from xhochy/ARROW-389 and squashes the following commits:
e66c895 [Uwe L. Korn] Switch image to deprecated group
876cd65 [Uwe L. Korn] ARROW-389: Python: Write Parquet files to pyarrow.io.NativeFile objects
Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/06be7aed
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/06be7aed
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/06be7aed
Branch: refs/heads/master
Commit: 06be7aed062aca32b683f2ab3a94a201ae54b4f3
Parents: 33c731d
Author: Uwe L. Korn <uw...@xhochy.com>
Authored: Fri Dec 2 11:48:24 2016 -0500
Committer: Wes McKinney <we...@twosigma.com>
Committed: Fri Dec 2 11:48:24 2016 -0500
----------------------------------------------------------------------
.travis.yml | 1 +
python/pyarrow/includes/parquet.pxd | 7 +++++--
python/pyarrow/parquet.pyx | 18 ++++++++++++------
python/pyarrow/tests/test_parquet.py | 27 +++++++++++++++++++++++++++
4 files changed, 45 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/arrow/blob/06be7aed/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index 052c22c..bfc2f26 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -24,6 +24,7 @@ matrix:
- compiler: gcc
language: cpp
os: linux
+ group: deprecated
before_script:
- export CC="gcc-4.9"
- export CXX="g++-4.9"
http://git-wip-us.apache.org/repos/asf/arrow/blob/06be7aed/python/pyarrow/includes/parquet.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/includes/parquet.pxd b/python/pyarrow/includes/parquet.pxd
index 57c35ba..cb791e1 100644
--- a/python/pyarrow/includes/parquet.pxd
+++ b/python/pyarrow/includes/parquet.pxd
@@ -19,7 +19,7 @@
from pyarrow.includes.common cimport *
from pyarrow.includes.libarrow cimport CArray, CSchema, CStatus, CTable, MemoryPool
-from pyarrow.includes.libarrow_io cimport ReadableFileInterface
+from pyarrow.includes.libarrow_io cimport ReadableFileInterface, OutputStream
cdef extern from "parquet/api/schema.h" namespace "parquet::schema" nogil:
@@ -131,6 +131,9 @@ cdef extern from "parquet/arrow/io.h" namespace "parquet::arrow" nogil:
ParquetReadSource(ParquetAllocator* allocator)
Open(const shared_ptr[ReadableFileInterface]& file)
+ cdef cppclass ParquetWriteSink:
+ ParquetWriteSink(const shared_ptr[OutputStream]& file)
+
cdef extern from "parquet/arrow/reader.h" namespace "parquet::arrow" nogil:
CStatus OpenFile(const shared_ptr[ReadableFileInterface]& file,
@@ -154,6 +157,6 @@ cdef extern from "parquet/arrow/schema.h" namespace "parquet::arrow" nogil:
cdef extern from "parquet/arrow/writer.h" namespace "parquet::arrow" nogil:
cdef CStatus WriteFlatTable(
const CTable* table, MemoryPool* pool,
- const shared_ptr[ParquetOutputStream]& sink,
+ const shared_ptr[ParquetWriteSink]& sink,
int64_t chunk_size,
const shared_ptr[WriterProperties]& properties)
http://git-wip-us.apache.org/repos/asf/arrow/blob/06be7aed/python/pyarrow/parquet.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/parquet.pyx b/python/pyarrow/parquet.pyx
index a6e3ac3..83fddb2 100644
--- a/python/pyarrow/parquet.pyx
+++ b/python/pyarrow/parquet.pyx
@@ -21,7 +21,7 @@
from pyarrow.includes.libarrow cimport *
from pyarrow.includes.parquet cimport *
-from pyarrow.includes.libarrow_io cimport ReadableFileInterface
+from pyarrow.includes.libarrow_io cimport ReadableFileInterface, OutputStream, FileOutputStream
cimport pyarrow.includes.pyarrow as pyarrow
from pyarrow.array cimport Array
@@ -151,7 +151,7 @@ def read_table(source, columns=None):
return Table.from_arrays(columns, arrays)
-def write_table(table, filename, chunk_size=None, version=None,
+def write_table(table, sink, chunk_size=None, version=None,
use_dictionary=True, compression=None):
"""
Write a Table to Parquet format
@@ -159,7 +159,7 @@ def write_table(table, filename, chunk_size=None, version=None,
Parameters
----------
table : pyarrow.Table
- filename : string
+ sink: string or pyarrow.io.NativeFile
chunk_size : int
The maximum number of rows in each Parquet RowGroup. As a default,
we will write a single RowGroup per file.
@@ -173,7 +173,8 @@ def write_table(table, filename, chunk_size=None, version=None,
"""
cdef Table table_ = table
cdef CTable* ctable_ = table_.table
- cdef shared_ptr[ParquetOutputStream] sink
+ cdef shared_ptr[ParquetWriteSink] sink_
+ cdef shared_ptr[FileOutputStream] filesink_
cdef WriterProperties.Builder properties_builder
cdef int64_t chunk_size_ = 0
if chunk_size is None:
@@ -230,7 +231,12 @@ def write_table(table, filename, chunk_size=None, version=None,
else:
raise ArrowException("Unsupport compression codec")
- sink.reset(new LocalFileOutputStream(tobytes(filename)))
+ if isinstance(sink, six.string_types):
+ check_status(FileOutputStream.Open(tobytes(sink), &filesink_))
+ sink_.reset(new ParquetWriteSink(<shared_ptr[OutputStream]>filesink_))
+ elif isinstance(sink, NativeFile):
+ sink_.reset(new ParquetWriteSink((<NativeFile>sink).wr_file))
+
with nogil:
- check_status(WriteFlatTable(ctable_, default_memory_pool(), sink,
+ check_status(WriteFlatTable(ctable_, default_memory_pool(), sink_,
chunk_size_, properties_builder.build()))
http://git-wip-us.apache.org/repos/asf/arrow/blob/06be7aed/python/pyarrow/tests/test_parquet.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_parquet.py b/python/pyarrow/tests/test_parquet.py
index c1d44ce..841830f 100644
--- a/python/pyarrow/tests/test_parquet.py
+++ b/python/pyarrow/tests/test_parquet.py
@@ -18,6 +18,7 @@
import pytest
import pyarrow as A
+import pyarrow.io as paio
import numpy as np
import pandas as pd
@@ -132,6 +133,32 @@ def test_pandas_column_selection(tmpdir):
pdt.assert_frame_equal(df[['uint8']], df_read)
@parquet
+def test_pandas_parquet_native_file_roundtrip(tmpdir):
+ size = 10000
+ np.random.seed(0)
+ df = pd.DataFrame({
+ 'uint8': np.arange(size, dtype=np.uint8),
+ 'uint16': np.arange(size, dtype=np.uint16),
+ 'uint32': np.arange(size, dtype=np.uint32),
+ 'uint64': np.arange(size, dtype=np.uint64),
+ 'int8': np.arange(size, dtype=np.int16),
+ 'int16': np.arange(size, dtype=np.int16),
+ 'int32': np.arange(size, dtype=np.int32),
+ 'int64': np.arange(size, dtype=np.int64),
+ 'float32': np.arange(size, dtype=np.float32),
+ 'float64': np.arange(size, dtype=np.float64),
+ 'bool': np.random.randn(size) > 0
+ })
+ arrow_table = A.from_pandas_dataframe(df)
+ imos = paio.InMemoryOutputStream()
+ pq.write_table(arrow_table, imos, version="2.0")
+ buf = imos.get_result()
+ reader = paio.BufferReader(buf)
+ df_read = pq.read_table(reader).to_pandas()
+ pdt.assert_frame_equal(df, df_read)
+
+
+@parquet
def test_pandas_parquet_configuration_options(tmpdir):
size = 10000
np.random.seed(0)