Posted to commits@arrow.apache.org by we...@apache.org on 2016/12/02 16:48:31 UTC

arrow git commit: ARROW-389: Python: Write Parquet files to pyarrow.io.NativeFile objects

Repository: arrow
Updated Branches:
  refs/heads/master 33c731dbd -> 06be7aed0


ARROW-389: Python: Write Parquet files to pyarrow.io.NativeFile objects

Author: Uwe L. Korn <uw...@xhochy.com>

Closes #214 from xhochy/ARROW-389 and squashes the following commits:

e66c895 [Uwe L. Korn] Switch image to deprecated group
876cd65 [Uwe L. Korn] ARROW-389: Python: Write Parquet files to pyarrow.io.NativeFile objects


Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/06be7aed
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/06be7aed
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/06be7aed

Branch: refs/heads/master
Commit: 06be7aed062aca32b683f2ab3a94a201ae54b4f3
Parents: 33c731d
Author: Uwe L. Korn <uw...@xhochy.com>
Authored: Fri Dec 2 11:48:24 2016 -0500
Committer: Wes McKinney <we...@twosigma.com>
Committed: Fri Dec 2 11:48:24 2016 -0500

----------------------------------------------------------------------
 .travis.yml                          |  1 +
 python/pyarrow/includes/parquet.pxd  |  7 +++++--
 python/pyarrow/parquet.pyx           | 18 ++++++++++++------
 python/pyarrow/tests/test_parquet.py | 27 +++++++++++++++++++++++++++
 4 files changed, 45 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
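
In short: pyarrow.parquet.write_table() now accepts either a filename
string or a pyarrow.io.NativeFile as its sink, so Parquet data can be
written to an in-memory stream without touching disk. A minimal sketch
of the usage this enables, mirroring the new test below (all names are
taken from the diff; pandas and numpy assumed available):

    import numpy as np
    import pandas as pd

    import pyarrow as A
    import pyarrow.io as paio
    import pyarrow.parquet as pq

    df = pd.DataFrame({'x': np.arange(10, dtype=np.int64)})
    table = A.from_pandas_dataframe(df)

    # Write the table to an in-memory sink rather than a path on disk
    sink = paio.InMemoryOutputStream()
    pq.write_table(table, sink, version='2.0')

    # Wrap the written buffer in a reader and round-trip it back
    reader = paio.BufferReader(sink.get_result())
    df_read = pq.read_table(reader).to_pandas()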


http://git-wip-us.apache.org/repos/asf/arrow/blob/06be7aed/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index 052c22c..bfc2f26 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -24,6 +24,7 @@ matrix:
   - compiler: gcc
     language: cpp
     os: linux
+    group: deprecated
     before_script:
     - export CC="gcc-4.9"
     - export CXX="g++-4.9"

http://git-wip-us.apache.org/repos/asf/arrow/blob/06be7aed/python/pyarrow/includes/parquet.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/includes/parquet.pxd b/python/pyarrow/includes/parquet.pxd
index 57c35ba..cb791e1 100644
--- a/python/pyarrow/includes/parquet.pxd
+++ b/python/pyarrow/includes/parquet.pxd
@@ -19,7 +19,7 @@
 
 from pyarrow.includes.common cimport *
 from pyarrow.includes.libarrow cimport CArray, CSchema, CStatus, CTable, MemoryPool
-from pyarrow.includes.libarrow_io cimport ReadableFileInterface
+from pyarrow.includes.libarrow_io cimport ReadableFileInterface, OutputStream
 
 
 cdef extern from "parquet/api/schema.h" namespace "parquet::schema" nogil:
@@ -131,6 +131,9 @@ cdef extern from "parquet/arrow/io.h" namespace "parquet::arrow" nogil:
         ParquetReadSource(ParquetAllocator* allocator)
         Open(const shared_ptr[ReadableFileInterface]& file)
 
+    cdef cppclass ParquetWriteSink:
+        ParquetWriteSink(const shared_ptr[OutputStream]& file)
+
 
 cdef extern from "parquet/arrow/reader.h" namespace "parquet::arrow" nogil:
     CStatus OpenFile(const shared_ptr[ReadableFileInterface]& file,
@@ -154,6 +157,6 @@ cdef extern from "parquet/arrow/schema.h" namespace "parquet::arrow" nogil:
 cdef extern from "parquet/arrow/writer.h" namespace "parquet::arrow" nogil:
     cdef CStatus WriteFlatTable(
         const CTable* table, MemoryPool* pool,
-        const shared_ptr[ParquetOutputStream]& sink,
+        const shared_ptr[ParquetWriteSink]& sink,
         int64_t chunk_size,
         const shared_ptr[WriterProperties]& properties)

http://git-wip-us.apache.org/repos/asf/arrow/blob/06be7aed/python/pyarrow/parquet.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/parquet.pyx b/python/pyarrow/parquet.pyx
index a6e3ac3..83fddb2 100644
--- a/python/pyarrow/parquet.pyx
+++ b/python/pyarrow/parquet.pyx
@@ -21,7 +21,7 @@
 
 from pyarrow.includes.libarrow cimport *
 from pyarrow.includes.parquet cimport *
-from pyarrow.includes.libarrow_io cimport ReadableFileInterface
+from pyarrow.includes.libarrow_io cimport ReadableFileInterface, OutputStream, FileOutputStream
 cimport pyarrow.includes.pyarrow as pyarrow
 
 from pyarrow.array cimport Array
@@ -151,7 +151,7 @@ def read_table(source, columns=None):
         return Table.from_arrays(columns, arrays)
 
 
-def write_table(table, filename, chunk_size=None, version=None,
+def write_table(table, sink, chunk_size=None, version=None,
                 use_dictionary=True, compression=None):
     """
     Write a Table to Parquet format
@@ -159,7 +159,7 @@ def write_table(table, filename, chunk_size=None, version=None,
     Parameters
     ----------
     table : pyarrow.Table
-    filename : string
+    sink : string or pyarrow.io.NativeFile
     chunk_size : int
         The maximum number of rows in each Parquet RowGroup. As a default,
         we will write a single RowGroup per file.
@@ -173,7 +173,8 @@ def write_table(table, filename, chunk_size=None, version=None,
     """
     cdef Table table_ = table
     cdef CTable* ctable_ = table_.table
-    cdef shared_ptr[ParquetOutputStream] sink
+    cdef shared_ptr[ParquetWriteSink] sink_
+    cdef shared_ptr[FileOutputStream] filesink_
     cdef WriterProperties.Builder properties_builder
     cdef int64_t chunk_size_ = 0
     if chunk_size is None:
@@ -230,7 +231,12 @@ def write_table(table, filename, chunk_size=None, version=None,
             else:
                 raise ArrowException("Unsupport compression codec")
 
-    sink.reset(new LocalFileOutputStream(tobytes(filename)))
+    if isinstance(sink, six.string_types):
+        check_status(FileOutputStream.Open(tobytes(sink), &filesink_))
+        sink_.reset(new ParquetWriteSink(<shared_ptr[OutputStream]>filesink_))
+    elif isinstance(sink, NativeFile):
+        sink_.reset(new ParquetWriteSink((<NativeFile>sink).wr_file))
+
     with nogil:
-        check_status(WriteFlatTable(ctable_, default_memory_pool(), sink,
+        check_status(WriteFlatTable(ctable_, default_memory_pool(), sink_,
                                     chunk_size_, properties_builder.build()))
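
The dispatch above keeps the existing string path intact while adding
the NativeFile path: a string is opened as a FileOutputStream, while a
NativeFile contributes its writable handle (wr_file) to the new
ParquetWriteSink. A short sketch of the two call forms (the table and
the 'example.parquet' path are illustrative only):

    import pandas as pd
    import pyarrow as A
    import pyarrow.io as paio
    import pyarrow.parquet as pq

    table = A.from_pandas_dataframe(pd.DataFrame({'a': [1, 2, 3]}))

    # A string sink is opened as a FileOutputStream, as before
    pq.write_table(table, 'example.parquet')

    # A NativeFile sink backs the ParquetWriteSink directly
    pq.write_table(table, paio.InMemoryOutputStream())

Note that a sink of any other type falls through both branches and
leaves sink_ unset, so callers must pass one of these two types.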

http://git-wip-us.apache.org/repos/asf/arrow/blob/06be7aed/python/pyarrow/tests/test_parquet.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_parquet.py b/python/pyarrow/tests/test_parquet.py
index c1d44ce..841830f 100644
--- a/python/pyarrow/tests/test_parquet.py
+++ b/python/pyarrow/tests/test_parquet.py
@@ -18,6 +18,7 @@
 import pytest
 
 import pyarrow as A
+import pyarrow.io as paio
 
 import numpy as np
 import pandas as pd
@@ -132,6 +133,32 @@ def test_pandas_column_selection(tmpdir):
     pdt.assert_frame_equal(df[['uint8']], df_read)
 
 @parquet
+def test_pandas_parquet_native_file_roundtrip(tmpdir):
+    size = 10000
+    np.random.seed(0)
+    df = pd.DataFrame({
+        'uint8': np.arange(size, dtype=np.uint8),
+        'uint16': np.arange(size, dtype=np.uint16),
+        'uint32': np.arange(size, dtype=np.uint32),
+        'uint64': np.arange(size, dtype=np.uint64),
+        'int8': np.arange(size, dtype=np.int8),
+        'int16': np.arange(size, dtype=np.int16),
+        'int32': np.arange(size, dtype=np.int32),
+        'int64': np.arange(size, dtype=np.int64),
+        'float32': np.arange(size, dtype=np.float32),
+        'float64': np.arange(size, dtype=np.float64),
+        'bool': np.random.randn(size) > 0
+    })
+    arrow_table = A.from_pandas_dataframe(df)
+    imos = paio.InMemoryOutputStream()
+    pq.write_table(arrow_table, imos, version="2.0")
+    buf = imos.get_result()
+    reader = paio.BufferReader(buf)
+    df_read = pq.read_table(reader).to_pandas()
+    pdt.assert_frame_equal(df, df_read)
+
+
+@parquet
 def test_pandas_parquet_configuration_options(tmpdir):
     size = 10000
     np.random.seed(0)