Posted to commits@arrow.apache.org by we...@apache.org on 2017/04/14 16:16:04 UTC
arrow git commit: ARROW-528: [Python] Utilize improved Parquet writer C++ API, add write_metadata function, test _metadata files
Repository: arrow
Updated Branches:
refs/heads/master 874666a61 -> b4892fd9f
ARROW-528: [Python] Utilize improved Parquet writer C++ API, add write_metadata function, test _metadata files
Author: Wes McKinney <we...@twosigma.com>
Closes #539 from wesm/ARROW-528 and squashes the following commits:
848ff93 [Wes McKinney] Add test for _metadata file
8b8f333 [Wes McKinney] Refactor to use APIs introduced in PARQUET-953. Add write_metadata function
Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/b4892fd9
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/b4892fd9
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/b4892fd9
Branch: refs/heads/master
Commit: b4892fd9fb676a678a966da51407b3ce4ba3ec65
Parents: 874666a
Author: Wes McKinney <we...@twosigma.com>
Authored: Fri Apr 14 12:15:57 2017 -0400
Committer: Wes McKinney <we...@twosigma.com>
Committed: Fri Apr 14 12:15:57 2017 -0400
----------------------------------------------------------------------
python/pyarrow/_parquet.pxd | 16 +++++++---
python/pyarrow/_parquet.pyx | 52 ++++++++++++++++++-------------
python/pyarrow/parquet.py | 34 +++++++++++++++++---
python/pyarrow/tests/test_parquet.py | 24 ++++++++++++++
4 files changed, 94 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
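In short, the headline addition is a write_metadata function that writes a metadata-only Parquet file from a schema, intended for _metadata files in dataset directories. A minimal usage sketch based on the test added below (the DataFrame contents and paths are illustrative placeholders):

    import os
    import pandas as pd
    import pyarrow as pa
    import pyarrow.parquet as pq

    os.makedirs('dataset_dir', exist_ok=True)  # directory for the dataset
    df = pd.DataFrame({'values': [1.0, 2.0, 3.0]})  # placeholder data
    table = pa.Table.from_pandas(df)

    # Write the data file, then a schema-only _metadata file beside it
    pq.write_table(table, 'dataset_dir/data.parquet')
    pq.write_metadata(table.schema, 'dataset_dir/_metadata')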
http://git-wip-us.apache.org/repos/asf/arrow/blob/b4892fd9/python/pyarrow/_parquet.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/_parquet.pxd b/python/pyarrow/_parquet.pxd
index 1ac1f69..9f6edc0 100644
--- a/python/pyarrow/_parquet.pxd
+++ b/python/pyarrow/_parquet.pxd
@@ -235,8 +235,14 @@ cdef extern from "parquet/arrow/schema.h" namespace "parquet::arrow" nogil:
cdef extern from "parquet/arrow/writer.h" namespace "parquet::arrow" nogil:
- cdef CStatus WriteTable(
- const CTable& table, CMemoryPool* pool,
- const shared_ptr[OutputStream]& sink,
- int64_t chunk_size,
- const shared_ptr[WriterProperties]& properties)
+ cdef cppclass FileWriter:
+
+ @staticmethod
+ CStatus Open(const CSchema& schema, CMemoryPool* pool,
+ const shared_ptr[OutputStream]& sink,
+ const shared_ptr[WriterProperties]& properties,
+ unique_ptr[FileWriter]* writer)
+
+ CStatus WriteTable(const CTable& table, int64_t chunk_size)
+ CStatus NewRowGroup(int64_t chunk_size)
+ CStatus Close()
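These declarations replace the free WriteTable function with the class-based parquet::arrow::FileWriter API introduced in PARQUET-953: Open constructs a writer against a schema and sink, WriteTable and NewRowGroup append data, and Close finalizes the file footer. A rough sketch of how that lifecycle surfaces through the refactored Cython ParquetWriter below (file name and data are illustrative):

    import pandas as pd
    import pyarrow as pa
    from pyarrow.parquet import ParquetWriter

    table = pa.Table.from_pandas(pd.DataFrame({'values': [1.0, 2.0, 3.0]}))

    # __cinit__ calls FileWriter::Open with the schema and output sink;
    # write_table maps to FileWriter::WriteTable, and close maps to
    # FileWriter::Close, which finalizes the Parquet file footer.
    writer = ParquetWriter('example.parquet', table.schema)
    writer.write_table(table, row_group_size=100)
    writer.close()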
http://git-wip-us.apache.org/repos/asf/arrow/blob/b4892fd9/python/pyarrow/_parquet.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/_parquet.pyx b/python/pyarrow/_parquet.pyx
index 5418e1d..b7358a6 100644
--- a/python/pyarrow/_parquet.pyx
+++ b/python/pyarrow/_parquet.pyx
@@ -23,7 +23,7 @@ from cython.operator cimport dereference as deref
from pyarrow.includes.common cimport *
from pyarrow.includes.libarrow cimport *
cimport pyarrow.includes.pyarrow as pyarrow
-from pyarrow._array cimport Array
+from pyarrow._array cimport Array, Schema
from pyarrow._error cimport check_status
from pyarrow._memory cimport MemoryPool, maybe_unbox_memory_pool
from pyarrow._table cimport Table, table_from_ctable
@@ -108,7 +108,7 @@ cdef class FileMetaData:
if self._schema is not None:
return self._schema
- cdef Schema schema = Schema()
+ cdef ParquetSchema schema = ParquetSchema()
schema.init_from_filemeta(self)
self._schema = schema
return schema
@@ -160,7 +160,7 @@ cdef class FileMetaData:
return result
-cdef class Schema:
+cdef class ParquetSchema:
cdef:
object parent # the FileMetaData owning the SchemaDescriptor
const SchemaDescriptor* schema
@@ -194,7 +194,7 @@ cdef class Schema:
def __getitem__(self, i):
return self.column(i)
- def equals(self, Schema other):
+ def equals(self, ParquetSchema other):
"""
Returns True if the Parquet schemas are equal
"""
@@ -217,7 +217,7 @@ cdef class ColumnSchema:
def __cinit__(self):
self.descr = NULL
- cdef init_from_schema(self, Schema schema, int i):
+ cdef init_from_schema(self, ParquetSchema schema, int i):
self.parent = schema
self.descr = schema.schema.Column(i)
@@ -373,7 +373,8 @@ cdef class ParquetReader:
if self._metadata is not None:
return self._metadata
- metadata = self.reader.get().parquet_reader().metadata()
+ with nogil:
+ metadata = self.reader.get().parquet_reader().metadata()
self._metadata = result = FileMetaData()
result.init(metadata)
@@ -487,9 +488,7 @@ cdef ParquetCompression compression_from_name(object name):
cdef class ParquetWriter:
cdef:
- shared_ptr[WriterProperties] properties
- shared_ptr[OutputStream] sink
- CMemoryPool* allocator
+ unique_ptr[FileWriter] writer
cdef readonly:
object use_dictionary
@@ -497,28 +496,34 @@ cdef class ParquetWriter:
object version
int row_group_size
- def __cinit__(self, where, use_dictionary=None, compression=None,
- version=None, MemoryPool memory_pool=None):
- cdef shared_ptr[FileOutputStream] filestream
+ def __cinit__(self, where, Schema schema, use_dictionary=None,
+ compression=None, version=None,
+ MemoryPool memory_pool=None):
+ cdef:
+ shared_ptr[FileOutputStream] filestream
+ shared_ptr[OutputStream] sink
+ shared_ptr[WriterProperties] properties
if isinstance(where, six.string_types):
check_status(FileOutputStream.Open(tobytes(where), &filestream))
- self.sink = <shared_ptr[OutputStream]> filestream
+ sink = <shared_ptr[OutputStream]> filestream
else:
- get_writer(where, &self.sink)
- self.allocator = maybe_unbox_memory_pool(memory_pool)
+ get_writer(where, &sink)
self.use_dictionary = use_dictionary
self.compression = compression
self.version = version
- self._setup_properties()
- cdef _setup_properties(self):
cdef WriterProperties.Builder properties_builder
self._set_version(&properties_builder)
self._set_compression_props(&properties_builder)
self._set_dictionary_props(&properties_builder)
- self.properties = properties_builder.build()
+ properties = properties_builder.build()
+
+ check_status(
+ FileWriter.Open(deref(schema.schema),
+ maybe_unbox_memory_pool(memory_pool),
+ sink, properties, &self.writer))
cdef _set_version(self, WriterProperties.Builder* props):
if self.version is not None:
@@ -546,12 +551,16 @@ cdef class ParquetWriter:
props.enable_dictionary()
else:
props.disable_dictionary()
- else:
+ elif self.use_dictionary is not None:
# Deactivate dictionary encoding by default
props.disable_dictionary()
for column in self.use_dictionary:
props.enable_dictionary(column)
+ def close(self):
+ with nogil:
+ check_status(self.writer.get().Close())
+
def write_table(self, Table table, row_group_size=None):
cdef CTable* ctable = table.table
@@ -563,6 +572,5 @@ cdef class ParquetWriter:
cdef int c_row_group_size = row_group_size
with nogil:
- check_status(WriteTable(deref(ctable), self.allocator,
- self.sink, c_row_group_size,
- self.properties))
+ check_status(self.writer.get()
+ .WriteTable(deref(ctable), c_row_group_size))
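Two behavioral notes on the Cython changes above: the writer is now a long-lived FileWriter that must be explicitly closed, and _set_dictionary_props only takes the per-column branch when use_dictionary is not None. Per-column dictionary encoding is requested by passing a list of column names, roughly as follows (the column name is illustrative):

    import pandas as pd
    import pyarrow as pa
    import pyarrow.parquet as pq

    table = pa.Table.from_pandas(pd.DataFrame({'values': [1.0, 2.0, 3.0]}))

    # Dictionary encoding is disabled globally, then re-enabled only for
    # the listed columns
    pq.write_table(table, 'example.parquet', use_dictionary=['values'])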
http://git-wip-us.apache.org/repos/asf/arrow/blob/b4892fd9/python/pyarrow/parquet.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/parquet.py b/python/pyarrow/parquet.py
index aaec43a..4ff7e03 100644
--- a/python/pyarrow/parquet.py
+++ b/python/pyarrow/parquet.py
@@ -21,7 +21,8 @@ import numpy as np
from pyarrow.filesystem import LocalFilesystem
from pyarrow._parquet import (ParquetReader, FileMetaData, # noqa
- RowGroupMetaData, Schema, ParquetWriter)
+ RowGroupMetaData, ParquetSchema,
+ ParquetWriter)
import pyarrow._parquet as _parquet # noqa
import pyarrow._array as _array
import pyarrow._table as _table
@@ -471,7 +472,8 @@ class ParquetDataset(object):
else:
self.fs = filesystem
- self.pieces, self.partitions = _make_manifest(path_or_paths, self.fs)
+ (self.pieces, self.partitions,
+ self.metadata_path) = _make_manifest(path_or_paths, self.fs)
self.metadata = metadata
self.schema = schema
@@ -488,7 +490,10 @@ class ParquetDataset(object):
open_file = self._get_open_file_func()
if self.metadata is None and self.schema is None:
- self.schema = self.pieces[0].get_metadata(open_file).schema
+ if self.metadata_path is not None:
+ self.schema = open_file(self.metadata_path).schema
+ else:
+ self.schema = self.pieces[0].get_metadata(open_file).schema
elif self.schema is None:
self.schema = self.metadata.schema
@@ -543,10 +548,12 @@ class ParquetDataset(object):
def _make_manifest(path_or_paths, fs, pathsep='/'):
partitions = None
+ metadata_path = None
if is_string(path_or_paths) and fs.isdir(path_or_paths):
manifest = ParquetManifest(path_or_paths, filesystem=fs,
pathsep=pathsep)
+ metadata_path = manifest.metadata_path
pieces = manifest.pieces
partitions = manifest.partitions
else:
@@ -565,7 +572,7 @@ def _make_manifest(path_or_paths, fs, pathsep='/'):
piece = ParquetDatasetPiece(path)
pieces.append(piece)
- return pieces, partitions
+ return pieces, partitions, metadata_path
def read_table(source, columns=None, nthreads=1, metadata=None):
@@ -622,7 +629,24 @@ def write_table(table, where, row_group_size=None, version='1.0',
Specify the compression codec, either on a general basis or per-column.
"""
row_group_size = kwargs.get('chunk_size', row_group_size)
- writer = ParquetWriter(where, use_dictionary=use_dictionary,
+ writer = ParquetWriter(where, table.schema,
+ use_dictionary=use_dictionary,
compression=compression,
version=version)
writer.write_table(table, row_group_size=row_group_size)
+ writer.close()
+
+
+def write_metadata(schema, where, version='1.0'):
+ """
+ Write a metadata-only Parquet file from a schema
+
+ Parameters
+ ----------
+ schema : pyarrow.Schema
+ where : string or pyarrow.io.NativeFile
+ version : {"1.0", "2.0"}, default "1.0"
+ The Parquet format version
+ """
+ writer = ParquetWriter(where, schema, version=version)
+ writer.close()
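On the read side, _make_manifest now also returns a metadata_path, and ParquetDataset prefers a _metadata file, when present, as its schema source instead of opening the first data piece. Roughly (directory name illustrative, matching the earlier sketch):

    import pyarrow.parquet as pq

    dataset = pq.ParquetDataset('dataset_dir')
    # When dataset_dir/_metadata exists, the dataset schema is read from it
    # rather than from the first data piece.
    assert dataset.metadata_path is not None
    print(dataset.schema)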
http://git-wip-us.apache.org/repos/asf/arrow/blob/b4892fd9/python/pyarrow/tests/test_parquet.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_parquet.py b/python/pyarrow/tests/test_parquet.py
index a5c70aa..ca6ae2d 100644
--- a/python/pyarrow/tests/test_parquet.py
+++ b/python/pyarrow/tests/test_parquet.py
@@ -529,6 +529,30 @@ def _generate_partition_directories(base_dir, partition_spec, df):
_visit_level(base_dir, 0, [])
+@parquet
+def test_read_common_metadata_files(tmpdir):
+ N = 100
+ df = pd.DataFrame({
+ 'index': np.arange(N),
+ 'values': np.random.randn(N)
+ }, columns=['index', 'values'])
+
+ base_path = str(tmpdir)
+ data_path = pjoin(base_path, 'data.parquet')
+
+ table = pa.Table.from_pandas(df)
+ pq.write_table(table, data_path)
+
+ metadata_path = pjoin(base_path, '_metadata')
+ pq.write_metadata(table.schema, metadata_path)
+
+ dataset = pq.ParquetDataset(base_path)
+ assert dataset.metadata_path == metadata_path
+
+ pf = pq.ParquetFile(data_path)
+ assert dataset.schema.equals(pf.schema)
+
+
def _filter_partition(df, part_keys):
predicate = np.ones(len(df), dtype=bool)