Posted to commits@arrow.apache.org by ks...@apache.org on 2019/04/19 11:22:52 UTC
[arrow] branch master updated: ARROW-5144: [Python] ParquetDataset and ParquetPiece not serializable
This is an automated email from the ASF dual-hosted git repository.
kszucs pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 2b5add5 ARROW-5144: [Python] ParquetDataset and ParquetPiece not serializable
2b5add5 is described below
commit 2b5add59608a6b042f5f9e81e41cf8729a1e3a22
Author: Krisztián Szűcs <sz...@gmail.com>
AuthorDate: Fri Apr 19 13:22:33 2019 +0200
ARROW-5144: [Python] ParquetDataset and ParquetPiece not serializable
Author: Krisztián Szűcs <sz...@gmail.com>
Closes #4156 from kszucs/ARROW-5144 and squashes the following commits:
2bdedd069 <Krisztián Szűcs> implement not equal
7cecfaae4 <Krisztián Szűcs> remove debug line
d08c17ce0 <Krisztián Szűcs> more thorough testing; explicit integer cast
1f738e302 <Krisztián Szűcs> don't mark as pandas
994e329ab <Krisztián Szűcs> use partial
ea40dcc2d <Krisztián Szűcs> fix review comments
739f183b6 <Krisztián Szűcs> skip depending on protocol
080170ff4 <Krisztián Szűcs> support standard pickle
69362cc28 <Krisztián Szűcs> picklable ParquetDataset
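
In practice this change lets a ParquetDataset cross process boundaries via
the standard pickle module. A minimal sketch of the round trip (the dataset
path is hypothetical; any directory containing Parquet files works):

    import pickle
    import pyarrow.parquet as pq

    dataset = pq.ParquetDataset('/tmp/example_dataset')  # hypothetical path

    # Before this patch the dump failed because the dataset held an
    # unpicklable closure; the __eq__ used here is also added by this patch.
    restored = pickle.loads(pickle.dumps(dataset))
    assert restored == dataset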
---
python/pyarrow/_parquet.pxd | 13 ++--
python/pyarrow/_parquet.pyx | 111 +++++++++++++++++++++++++++++++----
python/pyarrow/parquet.py | 69 +++++++++++++++++-----
python/pyarrow/tests/test_parquet.py | 55 +++++++++++++++++
4 files changed, 218 insertions(+), 30 deletions(-)
diff --git a/python/pyarrow/_parquet.pxd b/python/pyarrow/_parquet.pxd
index b63e72c..eedddff 100644
--- a/python/pyarrow/_parquet.pxd
+++ b/python/pyarrow/_parquet.pxd
@@ -20,7 +20,7 @@
from pyarrow.includes.common cimport *
from pyarrow.includes.libarrow cimport (CChunkedArray, CSchema, CStatus,
- CTable, CMemoryPool,
+ CTable, CMemoryPool, CBuffer,
CKeyValueMetadata,
RandomAccessFile, OutputStream,
TimeUnit)
@@ -225,6 +225,11 @@ cdef extern from "parquet/api/reader.h" namespace "parquet" nogil:
unique_ptr[CRowGroupMetaData] RowGroup(int i)
const SchemaDescriptor* schema()
shared_ptr[const CKeyValueMetadata] key_value_metadata() const
+ void WriteTo(ParquetOutputStream* dst) const
+
+ cdef shared_ptr[CFileMetaData] CFileMetaData_Make \
+ " parquet::FileMetaData::Make"(const void* serialized_metadata,
+ uint32_t* metadata_len)
cdef cppclass ReaderProperties:
pass
@@ -247,9 +252,9 @@ cdef extern from "parquet/api/writer.h" namespace "parquet" nogil:
cdef cppclass ParquetOutputStream" parquet::OutputStream":
pass
- cdef cppclass LocalFileOutputStream(ParquetOutputStream):
- LocalFileOutputStream(const c_string& path)
- void Close()
+ cdef cppclass ParquetInMemoryOutputStream \
+ " parquet::InMemoryOutputStream"(ParquetOutputStream):
+ shared_ptr[CBuffer] GetBuffer()
cdef cppclass WriterProperties:
cppclass Builder:
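
The declarations above expose the two C++ entry points that make FileMetaData
serializable: WriteTo dumps the Thrift-encoded metadata into an in-memory
output stream, and parquet::FileMetaData::Make reconstructs it from a raw
buffer. From Python the round trip looks roughly like this sketch (the file
path is hypothetical):

    import pickle
    import pyarrow.parquet as pq

    meta = pq.read_metadata('/tmp/example.parquet')  # returns a FileMetaData

    # __reduce__ (added in _parquet.pyx below) serializes through
    # WriteTo/Make, so a plain pickle round trip now works.
    meta2 = pickle.loads(pickle.dumps(meta))
    assert meta.num_row_groups == meta2.num_row_groups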
diff --git a/python/pyarrow/_parquet.pyx b/python/pyarrow/_parquet.pyx
index 87fe3d3..3ee174e 100644
--- a/python/pyarrow/_parquet.pyx
+++ b/python/pyarrow/_parquet.pyx
@@ -19,25 +19,27 @@
# distutils: language = c++
# cython: embedsignature = True
+import io
+import six
+import warnings
+
from cython.operator cimport dereference as deref
from pyarrow.includes.common cimport *
from pyarrow.includes.libarrow cimport *
-from pyarrow.lib cimport (Array, Schema,
+from pyarrow.lib cimport (Buffer, Array, Schema,
check_status,
MemoryPool, maybe_unbox_memory_pool,
Table,
pyarrow_wrap_chunked_array,
pyarrow_wrap_schema,
pyarrow_wrap_table,
+ pyarrow_wrap_buffer,
NativeFile, get_reader, get_writer)
from pyarrow.compat import tobytes, frombytes
from pyarrow.lib import ArrowException, NativeFile, _stringify_path
from pyarrow.util import indent
-import six
-import warnings
-
cdef class RowGroupStatistics:
cdef:
@@ -66,6 +68,22 @@ cdef class RowGroupStatistics:
self.num_values,
self.physical_type)
+ def __eq__(self, other):
+ try:
+ return self.equals(other)
+ except TypeError:
+ return NotImplemented
+
+ def equals(self, RowGroupStatistics other):
+ # TODO(kszucs): implement native Equals method for RowGroupStatistics
+ return (self.has_min_max == other.has_min_max and
+ self.min == other.min and
+ self.max == other.max and
+ self.null_count == other.null_count and
+ self.distinct_count == other.distinct_count and
+ self.num_values == other.num_values and
+ self.physical_type == other.physical_type)
+
cdef inline _cast_statistic(self, object value):
# Input value is bytes
cdef ParquetType physical_type = self.statistics.get().physical_type()
@@ -174,6 +192,29 @@ cdef class ColumnChunkMetaData:
self.total_compressed_size,
self.total_uncompressed_size)
+ def __eq__(self, other):
+ try:
+ return self.equals(other)
+ except TypeError:
+ return NotImplemented
+
+ def equals(self, ColumnChunkMetaData other):
+ # TODO(kszucs): implement native Equals method for CColumnChunkMetaData
+ return (self.file_offset == other.file_offset and
+ self.file_path == other.file_path and
+ self.physical_type == other.physical_type and
+ self.num_values == other.num_values and
+ self.path_in_schema == other.path_in_schema and
+ self.is_stats_set == other.is_stats_set and
+ self.statistics == other.statistics and
+ self.compression == other.compression and
+ self.encodings == other.encodings and
+ self.has_dictionary_page == other.has_dictionary_page and
+ self.dictionary_page_offset == other.dictionary_page_offset and
+ self.data_page_offset == other.data_page_offset and
+ self.total_compressed_size == other.total_compressed_size and
+ self.total_uncompressed_size == other.total_uncompressed_size)
+
@property
def file_offset(self):
return self.metadata.file_offset()
@@ -249,16 +290,39 @@ cdef class ColumnChunkMetaData:
cdef class RowGroupMetaData:
cdef:
+ int index # for pickling support
unique_ptr[CRowGroupMetaData] up_metadata
CRowGroupMetaData* metadata
FileMetaData parent
- def __cinit__(self, FileMetaData parent, int i):
- if i < 0 or i >= parent.num_row_groups:
- raise IndexError('{0} out of bounds'.format(i))
- self.up_metadata = parent._metadata.RowGroup(i)
+ def __cinit__(self, FileMetaData parent, int index):
+ if index < 0 or index >= parent.num_row_groups:
+ raise IndexError('{0} out of bounds'.format(index))
+ self.up_metadata = parent._metadata.RowGroup(index)
self.metadata = self.up_metadata.get()
self.parent = parent
+ self.index = index
+
+ def __reduce__(self):
+ return RowGroupMetaData, (self.parent, self.index)
+
+ def __eq__(self, other):
+ try:
+ return self.equals(other)
+ except TypeError:
+ return NotImplemented
+
+ def equals(self, RowGroupMetaData other):
+ if not (self.num_columns == other.num_columns and
+ self.num_rows == other.num_rows and
+ self.total_byte_size == other.total_byte_size):
+ return False
+
+ for i in range(self.num_columns):
+ if self.column(i) != other.column(i):
+ return False
+
+ return True
def column(self, int i):
chunk = ColumnChunkMetaData()
@@ -287,6 +351,17 @@ cdef class RowGroupMetaData:
return self.metadata.total_byte_size()
+def _reconstruct_filemetadata(Buffer serialized):
+ cdef:
+ FileMetaData metadata = FileMetaData.__new__(FileMetaData)
+ CBuffer *buffer = serialized.buffer.get()
+ uint32_t metadata_len = <uint32_t>buffer.size()
+
+ metadata.init(CFileMetaData_Make(buffer.data(), &metadata_len))
+
+ return metadata
+
+
cdef class FileMetaData:
cdef:
shared_ptr[CFileMetaData] sp_metadata
@@ -300,6 +375,14 @@ cdef class FileMetaData:
self.sp_metadata = metadata
self._metadata = metadata.get()
+ def __reduce__(self):
+ cdef ParquetInMemoryOutputStream sink
+ with nogil:
+ self._metadata.WriteTo(&sink)
+
+ cdef Buffer buffer = pyarrow_wrap_buffer(sink.GetBuffer())
+ return _reconstruct_filemetadata, (buffer,)
+
def __repr__(self):
return """{0}
created_by: {1}
@@ -406,6 +489,9 @@ cdef class ParquetSchema:
{1}
""".format(object.__repr__(self), '\n'.join(elements))
+ def __reduce__(self):
+ return ParquetSchema, (self.parent,)
+
def __len__(self):
return self.schema.num_columns()
@@ -454,12 +540,14 @@ cdef class ParquetSchema:
cdef class ColumnSchema:
cdef:
+ int index
ParquetSchema parent
const ColumnDescriptor* descr
- def __cinit__(self, ParquetSchema schema, int i):
+ def __cinit__(self, ParquetSchema schema, int index):
self.parent = schema
- self.descr = schema.schema.Column(i)
+ self.index = index # for pickling support
+ self.descr = schema.schema.Column(index)
def __eq__(self, other):
try:
@@ -467,6 +555,9 @@ cdef class ColumnSchema:
except TypeError:
return NotImplemented
+ def __reduce__(self):
+ return ColumnSchema, (self.parent, self.index)
+
def equals(self, ColumnSchema other):
"""
Returns True if the column schemas are equal
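
The Cython classes above all use the same pickling recipe: an object that is
a view into a parent (row group, schema, column) remembers the parent plus
its index, and __reduce__ returns the constructor with those two arguments so
that unpickling simply re-derives the view. A pure-Python sketch of the
pattern (the names are illustrative, not from the codebase):

    import pickle

    class View:
        """A child object reconstructed from its parent and position."""

        def __init__(self, parent, index):
            self.parent = parent
            self.index = index

        def __reduce__(self):
            # pickle stores (callable, args) and calls View(parent, index)
            # again on load, provided the parent itself is picklable.
            return View, (self.parent, self.index)

    v = View(parent=['a', 'b', 'c'], index=1)
    v2 = pickle.loads(pickle.dumps(v))
    assert (v2.parent, v2.index) == (v.parent, v.index)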
diff --git a/python/pyarrow/parquet.py b/python/pyarrow/parquet.py
index 7ed7818..69187bc 100644
--- a/python/pyarrow/parquet.py
+++ b/python/pyarrow/parquet.py
@@ -663,6 +663,23 @@ class ParquetPartitions(object):
def __getitem__(self, i):
return self.levels[i]
+ def equals(self, other):
+ if not isinstance(other, ParquetPartitions):
+ raise TypeError('`other` must be an instance of ParquetPartitions')
+
+ return (self.levels == other.levels and
+ self.partition_names == other.partition_names)
+
+ def __eq__(self, other):
+ try:
+ return self.equals(other)
+ except TypeError:
+ return NotImplemented
+
+ def __ne__(self, other):
+ # required for python 2, cython implements it by default
+ return not (self == other)
+
def get_index(self, level, name, key):
"""
Record a partition value at a particular level, returning the distinct
@@ -867,6 +884,16 @@ def _path_split(path, sep):
EXCLUDED_PARQUET_PATHS = {'_SUCCESS'}
+def _open_dataset_file(dataset, path, meta=None):
+ if dataset.fs is None or isinstance(dataset.fs, LocalFileSystem):
+ return ParquetFile(path, metadata=meta, memory_map=dataset.memory_map,
+ common_metadata=dataset.common_metadata)
+ else:
+ return ParquetFile(dataset.fs.open(path, mode='rb'), metadata=meta,
+ memory_map=dataset.memory_map,
+ common_metadata=dataset.common_metadata)
+
+
class ParquetDataset(object):
"""
Encapsulates details of reading a complete Parquet dataset possibly
@@ -925,14 +952,13 @@ class ParquetDataset(object):
self.paths = _parse_uri(path_or_paths)
self.memory_map = memory_map
- self._open_file_func = self._get_open_file_func()
(self.pieces,
self.partitions,
self.common_metadata_path,
self.metadata_path) = _make_manifest(
path_or_paths, self.fs, metadata_nthreads=metadata_nthreads,
- open_file_func=self._open_file_func)
+ open_file_func=partial(_open_dataset_file, self))
if self.common_metadata_path is not None:
with self.fs.open(self.common_metadata_path) as f:
@@ -961,6 +987,31 @@ class ParquetDataset(object):
filters = _check_filters(filters)
self._filter(filters)
+ def equals(self, other):
+ if not isinstance(other, ParquetDataset):
+ raise TypeError('`other` must be an instance of ParquetDataset')
+
+ if self.fs.__class__ != other.fs.__class__:
+ return False
+ for prop in ('paths', 'memory_map', 'pieces', 'partitions',
+ 'common_metadata_path', 'metadata_path',
+ 'common_metadata', 'metadata', 'schema',
+ 'split_row_groups'):
+ if getattr(self, prop) != getattr(other, prop):
+ return False
+
+ return True
+
+ def __eq__(self, other):
+ try:
+ return self.equals(other)
+ except TypeError:
+ return NotImplemented
+
+ def __ne__(self, other):
+ # required for python 2, cython implements it by default
+ return not (self == other)
+
def validate_schemas(self):
if self.metadata is None and self.schema is None:
if self.common_metadata is not None:
@@ -1048,20 +1099,6 @@ class ParquetDataset(object):
keyvalues = self.common_metadata.metadata
return keyvalues.get(b'pandas', None)
- def _get_open_file_func(self):
- if self.fs is None or isinstance(self.fs, LocalFileSystem):
- def open_file(path, meta=None):
- return ParquetFile(path, metadata=meta,
- memory_map=self.memory_map,
- common_metadata=self.common_metadata)
- else:
- def open_file(path, meta=None):
- return ParquetFile(self.fs.open(path, mode='rb'),
- memory_map=self.memory_map,
- metadata=meta,
- common_metadata=self.common_metadata)
- return open_file
-
def _filter(self, filters):
accepts_filter = self.partitions.filter_accepts_partition
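
The key fix in parquet.py is visible in the removed _get_open_file_func: it
returned a closure over self, and the standard pickle module refuses to
serialize local functions, so the whole dataset was unpicklable. Binding a
module-level function with functools.partial is equivalent but picklable, as
this toy sketch (names are illustrative, not the real API) shows:

    import pickle
    from functools import partial

    def open_file(dataset, path):       # module-level: pickled by reference
        return (dataset, path)

    class Dataset(object):
        def make_opener(self):
            def opener(path):           # local function: not picklable
                return (self, path)
            return opener

    ds = Dataset()
    pickle.dumps(partial(open_file, ds))   # works
    try:
        pickle.dumps(ds.make_opener())     # fails: can't pickle local object
    except (pickle.PicklingError, AttributeError):
        pass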
diff --git a/python/pyarrow/tests/test_parquet.py b/python/pyarrow/tests/test_parquet.py
index 3190b0a..f480215 100644
--- a/python/pyarrow/tests/test_parquet.py
+++ b/python/pyarrow/tests/test_parquet.py
@@ -22,6 +22,7 @@ import io
import json
import os
import six
+import pickle
import pytest
import numpy as np
@@ -2306,6 +2307,60 @@ def test_backwards_compatible_column_metadata_handling(datadir):
tm.assert_frame_equal(result, expected[['a']].reset_index(drop=True))
+def _make_dataset_for_pickling(tempdir, N=100):
+ path = tempdir / 'data.parquet'
+ fs = LocalFileSystem.get_instance()
+
+ df = pd.DataFrame({
+ 'index': np.arange(N),
+ 'values': np.random.randn(N)
+ }, columns=['index', 'values'])
+ table = pa.Table.from_pandas(df)
+
+ num_groups = 3
+ with pq.ParquetWriter(path, table.schema) as writer:
+ for i in range(num_groups):
+ writer.write_table(table)
+
+ reader = pq.ParquetFile(path)
+ assert reader.metadata.num_row_groups == num_groups
+
+ metadata_path = tempdir / '_metadata'
+ with fs.open(metadata_path, 'wb') as f:
+ pq.write_metadata(table.schema, f)
+
+ dataset = pq.ParquetDataset(tempdir, filesystem=fs)
+ assert dataset.metadata_path == str(metadata_path)
+
+ return dataset
+
+
+@pytest.mark.pandas
+@pytest.mark.parametrize('pickler', [
+ pytest.param(pickle, id='builtin'),
+ pytest.param(pytest.importorskip('cloudpickle'), id='cloudpickle')
+])
+def test_pickle_dataset(tempdir, datadir, pickler):
+ def is_pickleable(obj):
+ return obj == pickler.loads(pickler.dumps(obj))
+
+ dataset = _make_dataset_for_pickling(tempdir)
+
+ assert is_pickleable(dataset)
+ assert is_pickleable(dataset.metadata)
+ assert is_pickleable(dataset.metadata.schema)
+ assert len(dataset.metadata.schema)
+ for column in dataset.metadata.schema:
+ assert is_pickleable(column)
+
+ for piece in dataset.pieces:
+ assert is_pickleable(piece)
+ metadata = piece.get_metadata()
+ assert metadata.num_row_groups
+ for i in range(metadata.num_row_groups):
+ assert is_pickleable(metadata.row_group(i))
+
+
@pytest.mark.pandas
def test_decimal_roundtrip(tempdir):
num_values = 10
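
Following the pattern of the new test above, dataset pieces round-trip the
same way, which is what lets distributed schedulers (Dask, for example) ship
per-file work units to remote workers. A short sketch with a hypothetical
path, relying on the piece equality the tests exercise:

    import pickle
    import pyarrow.parquet as pq

    dataset = pq.ParquetDataset('/tmp/example_dataset')  # hypothetical
    payload = [pickle.dumps(piece) for piece in dataset.pieces]
    assert [pickle.loads(p) for p in payload] == dataset.pieces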