You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2017/04/14 16:16:04 UTC

arrow git commit: ARROW-528: [Python] Utilize improved Parquet writer C++ API, add write_metadata function, test _metadata files

Repository: arrow
Updated Branches:
  refs/heads/master 874666a61 -> b4892fd9f


ARROW-528: [Python] Utilize improved Parquet writer C++ API, add write_metadata function, test _metadata files

Author: Wes McKinney <we...@twosigma.com>

Closes #539 from wesm/ARROW-528 and squashes the following commits:

848ff93 [Wes McKinney] Add test for _metadata file
8b8f333 [Wes McKinney] Refactor to use APIs introduced in PARQUET-953. Add write_metadata function


Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/b4892fd9
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/b4892fd9
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/b4892fd9

Branch: refs/heads/master
Commit: b4892fd9fb676a678a966da51407b3ce4ba3ec65
Parents: 874666a
Author: Wes McKinney <we...@twosigma.com>
Authored: Fri Apr 14 12:15:57 2017 -0400
Committer: Wes McKinney <we...@twosigma.com>
Committed: Fri Apr 14 12:15:57 2017 -0400

----------------------------------------------------------------------
 python/pyarrow/_parquet.pxd          | 16 +++++++---
 python/pyarrow/_parquet.pyx          | 52 ++++++++++++++++++-------------
 python/pyarrow/parquet.py            | 34 +++++++++++++++++---
 python/pyarrow/tests/test_parquet.py | 24 ++++++++++++++
 4 files changed, 94 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/arrow/blob/b4892fd9/python/pyarrow/_parquet.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/_parquet.pxd b/python/pyarrow/_parquet.pxd
index 1ac1f69..9f6edc0 100644
--- a/python/pyarrow/_parquet.pxd
+++ b/python/pyarrow/_parquet.pxd
@@ -235,8 +235,14 @@ cdef extern from "parquet/arrow/schema.h" namespace "parquet::arrow" nogil:
 
 
 cdef extern from "parquet/arrow/writer.h" namespace "parquet::arrow" nogil:
-    cdef CStatus WriteTable(
-        const CTable& table, CMemoryPool* pool,
-        const shared_ptr[OutputStream]& sink,
-        int64_t chunk_size,
-        const shared_ptr[WriterProperties]& properties)
+    cdef cppclass FileWriter:
+
+        @staticmethod
+        CStatus Open(const CSchema& schema, CMemoryPool* pool,
+                     const shared_ptr[OutputStream]& sink,
+                     const shared_ptr[WriterProperties]& properties,
+                     unique_ptr[FileWriter]* writer)
+
+        CStatus WriteTable(const CTable& table, int64_t chunk_size)
+        CStatus NewRowGroup(int64_t chunk_size)
+        CStatus Close()

http://git-wip-us.apache.org/repos/asf/arrow/blob/b4892fd9/python/pyarrow/_parquet.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/_parquet.pyx b/python/pyarrow/_parquet.pyx
index 5418e1d..b7358a6 100644
--- a/python/pyarrow/_parquet.pyx
+++ b/python/pyarrow/_parquet.pyx
@@ -23,7 +23,7 @@ from cython.operator cimport dereference as deref
 from pyarrow.includes.common cimport *
 from pyarrow.includes.libarrow cimport *
 cimport pyarrow.includes.pyarrow as pyarrow
-from pyarrow._array cimport Array
+from pyarrow._array cimport Array, Schema
 from pyarrow._error cimport check_status
 from pyarrow._memory cimport MemoryPool, maybe_unbox_memory_pool
 from pyarrow._table cimport Table, table_from_ctable
@@ -108,7 +108,7 @@ cdef class FileMetaData:
         if self._schema is not None:
             return self._schema
 
-        cdef Schema schema = Schema()
+        cdef ParquetSchema schema = ParquetSchema()
         schema.init_from_filemeta(self)
         self._schema = schema
         return schema
@@ -160,7 +160,7 @@ cdef class FileMetaData:
         return result
 
 
-cdef class Schema:
+cdef class ParquetSchema:
     cdef:
         object parent  # the FileMetaData owning the SchemaDescriptor
         const SchemaDescriptor* schema
@@ -194,7 +194,7 @@ cdef class Schema:
     def __getitem__(self, i):
         return self.column(i)
 
-    def equals(self, Schema other):
+    def equals(self, ParquetSchema other):
         """
         Returns True if the Parquet schemas are equal
         """
@@ -217,7 +217,7 @@ cdef class ColumnSchema:
     def __cinit__(self):
         self.descr = NULL
 
-    cdef init_from_schema(self, Schema schema, int i):
+    cdef init_from_schema(self, ParquetSchema schema, int i):
         self.parent = schema
         self.descr = schema.schema.Column(i)
 
@@ -373,7 +373,8 @@ cdef class ParquetReader:
         if self._metadata is not None:
             return self._metadata
 
-        metadata = self.reader.get().parquet_reader().metadata()
+        with nogil:
+            metadata = self.reader.get().parquet_reader().metadata()
 
         self._metadata = result = FileMetaData()
         result.init(metadata)
@@ -487,9 +488,7 @@ cdef ParquetCompression compression_from_name(object name):
 
 cdef class ParquetWriter:
     cdef:
-        shared_ptr[WriterProperties] properties
-        shared_ptr[OutputStream] sink
-        CMemoryPool* allocator
+        unique_ptr[FileWriter] writer
 
     cdef readonly:
         object use_dictionary
@@ -497,28 +496,34 @@ cdef class ParquetWriter:
         object version
         int row_group_size
 
-    def __cinit__(self, where, use_dictionary=None, compression=None,
-                  version=None, MemoryPool memory_pool=None):
-        cdef shared_ptr[FileOutputStream] filestream
+    def __cinit__(self, where, Schema schema, use_dictionary=None,
+                  compression=None, version=None,
+                  MemoryPool memory_pool=None):
+        cdef:
+            shared_ptr[FileOutputStream] filestream
+            shared_ptr[OutputStream] sink
+            shared_ptr[WriterProperties] properties
 
         if isinstance(where, six.string_types):
             check_status(FileOutputStream.Open(tobytes(where), &filestream))
-            self.sink = <shared_ptr[OutputStream]> filestream
+            sink = <shared_ptr[OutputStream]> filestream
         else:
-            get_writer(where, &self.sink)
-        self.allocator = maybe_unbox_memory_pool(memory_pool)
+            get_writer(where, &sink)
 
         self.use_dictionary = use_dictionary
         self.compression = compression
         self.version = version
-        self._setup_properties()
 
-    cdef _setup_properties(self):
         cdef WriterProperties.Builder properties_builder
         self._set_version(&properties_builder)
         self._set_compression_props(&properties_builder)
         self._set_dictionary_props(&properties_builder)
-        self.properties = properties_builder.build()
+        properties = properties_builder.build()
+
+        check_status(
+            FileWriter.Open(deref(schema.schema),
+                            maybe_unbox_memory_pool(memory_pool),
+                            sink, properties, &self.writer))
 
     cdef _set_version(self, WriterProperties.Builder* props):
         if self.version is not None:
@@ -546,12 +551,16 @@ cdef class ParquetWriter:
                 props.enable_dictionary()
             else:
                 props.disable_dictionary()
-        else:
+        elif self.use_dictionary is not None:
             # Deactivate dictionary encoding by default
             props.disable_dictionary()
             for column in self.use_dictionary:
                 props.enable_dictionary(column)
 
+    def close(self):
+        with nogil:
+            check_status(self.writer.get().Close())
+
     def write_table(self, Table table, row_group_size=None):
         cdef CTable* ctable = table.table
 
@@ -563,6 +572,5 @@ cdef class ParquetWriter:
         cdef int c_row_group_size = row_group_size
 
         with nogil:
-            check_status(WriteTable(deref(ctable), self.allocator,
-                                    self.sink, c_row_group_size,
-                                    self.properties))
+            check_status(self.writer.get()
+                         .WriteTable(deref(ctable), c_row_group_size))

http://git-wip-us.apache.org/repos/asf/arrow/blob/b4892fd9/python/pyarrow/parquet.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/parquet.py b/python/pyarrow/parquet.py
index aaec43a..4ff7e03 100644
--- a/python/pyarrow/parquet.py
+++ b/python/pyarrow/parquet.py
@@ -21,7 +21,8 @@ import numpy as np
 
 from pyarrow.filesystem import LocalFilesystem
 from pyarrow._parquet import (ParquetReader, FileMetaData,  # noqa
-                              RowGroupMetaData, Schema, ParquetWriter)
+                              RowGroupMetaData, ParquetSchema,
+                              ParquetWriter)
 import pyarrow._parquet as _parquet  # noqa
 import pyarrow._array as _array
 import pyarrow._table as _table
@@ -471,7 +472,8 @@ class ParquetDataset(object):
         else:
             self.fs = filesystem
 
-        self.pieces, self.partitions = _make_manifest(path_or_paths, self.fs)
+        (self.pieces, self.partitions,
+         self.metadata_path) = _make_manifest(path_or_paths, self.fs)
 
         self.metadata = metadata
         self.schema = schema
@@ -488,7 +490,10 @@ class ParquetDataset(object):
         open_file = self._get_open_file_func()
 
         if self.metadata is None and self.schema is None:
-            self.schema = self.pieces[0].get_metadata(open_file).schema
+            if self.metadata_path is not None:
+                self.schema = open_file(self.metadata_path).schema
+            else:
+                self.schema = self.pieces[0].get_metadata(open_file).schema
         elif self.schema is None:
             self.schema = self.metadata.schema
 
@@ -543,10 +548,12 @@ class ParquetDataset(object):
 
 def _make_manifest(path_or_paths, fs, pathsep='/'):
     partitions = None
+    metadata_path = None
 
     if is_string(path_or_paths) and fs.isdir(path_or_paths):
         manifest = ParquetManifest(path_or_paths, filesystem=fs,
                                    pathsep=pathsep)
+        metadata_path = manifest.metadata_path
         pieces = manifest.pieces
         partitions = manifest.partitions
     else:
@@ -565,7 +572,7 @@ def _make_manifest(path_or_paths, fs, pathsep='/'):
             piece = ParquetDatasetPiece(path)
             pieces.append(piece)
 
-    return pieces, partitions
+    return pieces, partitions, metadata_path
 
 
 def read_table(source, columns=None, nthreads=1, metadata=None):
@@ -622,7 +629,24 @@ def write_table(table, where, row_group_size=None, version='1.0',
         Specify the compression codec, either on a general basis or per-column.
     """
     row_group_size = kwargs.get('chunk_size', row_group_size)
-    writer = ParquetWriter(where, use_dictionary=use_dictionary,
+    writer = ParquetWriter(where, table.schema,
+                           use_dictionary=use_dictionary,
                            compression=compression,
                            version=version)
     writer.write_table(table, row_group_size=row_group_size)
+    writer.close()
+
+
+def write_metadata(schema, where, version='1.0'):
+    """
+    Write metadata-only Parquet file from schema
+
+    Parameters
+    ----------
+    schema : pyarrow.Schema
+    where: string or pyarrow.io.NativeFile
+    version : {"1.0", "2.0"}, default "1.0"
+        The Parquet format version, defaults to 1.0
+    """
+    writer = ParquetWriter(where, schema, version=version)
+    writer.close()

http://git-wip-us.apache.org/repos/asf/arrow/blob/b4892fd9/python/pyarrow/tests/test_parquet.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_parquet.py b/python/pyarrow/tests/test_parquet.py
index a5c70aa..ca6ae2d 100644
--- a/python/pyarrow/tests/test_parquet.py
+++ b/python/pyarrow/tests/test_parquet.py
@@ -529,6 +529,30 @@ def _generate_partition_directories(base_dir, partition_spec, df):
     _visit_level(base_dir, 0, [])
 
 
+@parquet
+def test_read_common_metadata_files(tmpdir):
+    N = 100
+    df = pd.DataFrame({
+        'index': np.arange(N),
+        'values': np.random.randn(N)
+    }, columns=['index', 'values'])
+
+    base_path = str(tmpdir)
+    data_path = pjoin(base_path, 'data.parquet')
+
+    table = pa.Table.from_pandas(df)
+    pq.write_table(table, data_path)
+
+    metadata_path = pjoin(base_path, '_metadata')
+    pq.write_metadata(table.schema, metadata_path)
+
+    dataset = pq.ParquetDataset(base_path)
+    assert dataset.metadata_path == metadata_path
+
+    pf = pq.ParquetFile(data_path)
+    assert dataset.schema.equals(pf.schema)
+
+
 def _filter_partition(df, part_keys):
     predicate = np.ones(len(df), dtype=bool)