Posted to commits@arrow.apache.org by ks...@apache.org on 2019/04/19 11:22:52 UTC

[arrow] branch master updated: ARROW-5144: [Python] ParquetDataset and ParquetPiece not serializable

This is an automated email from the ASF dual-hosted git repository.

kszucs pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 2b5add5  ARROW-5144: [Python] ParquetDataset and ParquetPiece not serializable
2b5add5 is described below

commit 2b5add59608a6b042f5f9e81e41cf8729a1e3a22
Author: Krisztián Szűcs <sz...@gmail.com>
AuthorDate: Fri Apr 19 13:22:33 2019 +0200

    ARROW-5144: [Python] ParquetDataset and ParquetPiece not serializable
    
    Author: Krisztián Szűcs <sz...@gmail.com>
    
    Closes #4156 from kszucs/ARROW-5144 and squashes the following commits:
    
    2bdedd069 <Krisztián Szűcs> implement not equal
    7cecfaae4 <Krisztián Szűcs> remove debug line
    d08c17ce0 <Krisztián Szűcs> more thorough testing; explicit integer cast
    1f738e302 <Krisztián Szűcs> don't mark as pandas
    994e329ab <Krisztián Szűcs> use partial
    ea40dcc2d <Krisztián Szűcs> fix review comments
    739f183b6 <Krisztián Szűcs> skip depending on protocol
    080170ff4 <Krisztián Szűcs> support standard pickle
    69362cc28 <Krisztián Szűcs> picklable ParquetDataset
---
 python/pyarrow/_parquet.pxd          |  13 ++--
 python/pyarrow/_parquet.pyx          | 111 +++++++++++++++++++++++++++++++----
 python/pyarrow/parquet.py            |  69 +++++++++++++++++-----
 python/pyarrow/tests/test_parquet.py |  55 +++++++++++++++++
 4 files changed, 218 insertions(+), 30 deletions(-)

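For context on what the patch enables: once these classes implement the pickle
protocol, a dataset opened on one process can be serialized and rebuilt on
another, which is what distributed schedulers need. A minimal sketch of the
intended round-trip, mirroring the test added at the bottom of this patch
(the path '/tmp/my_dataset' is a placeholder, not part of the change):

    import pickle
    import pyarrow.parquet as pq

    # '/tmp/my_dataset' is a placeholder for any directory of Parquet files.
    dataset = pq.ParquetDataset('/tmp/my_dataset')

    # With this change the dataset, its metadata and its pieces all survive
    # a pickle round-trip.
    restored = pickle.loads(pickle.dumps(dataset))
    assert restored == dataset
    assert restored.metadata == dataset.metadata

    # Individual pieces are picklable too, so they can be shipped to workers.
    for piece in dataset.pieces:
        assert pickle.loads(pickle.dumps(piece)) == piece
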
diff --git a/python/pyarrow/_parquet.pxd b/python/pyarrow/_parquet.pxd
index b63e72c..eedddff 100644
--- a/python/pyarrow/_parquet.pxd
+++ b/python/pyarrow/_parquet.pxd
@@ -20,7 +20,7 @@
 
 from pyarrow.includes.common cimport *
 from pyarrow.includes.libarrow cimport (CChunkedArray, CSchema, CStatus,
-                                        CTable, CMemoryPool,
+                                        CTable, CMemoryPool, CBuffer,
                                         CKeyValueMetadata,
                                         RandomAccessFile, OutputStream,
                                         TimeUnit)
@@ -225,6 +225,11 @@ cdef extern from "parquet/api/reader.h" namespace "parquet" nogil:
         unique_ptr[CRowGroupMetaData] RowGroup(int i)
         const SchemaDescriptor* schema()
         shared_ptr[const CKeyValueMetadata] key_value_metadata() const
+        void WriteTo(ParquetOutputStream* dst) const
+
+    cdef shared_ptr[CFileMetaData] CFileMetaData_Make \
+        " parquet::FileMetaData::Make"(const void* serialized_metadata,
+                                       uint32_t* metadata_len)
 
     cdef cppclass ReaderProperties:
         pass
@@ -247,9 +252,9 @@ cdef extern from "parquet/api/writer.h" namespace "parquet" nogil:
     cdef cppclass ParquetOutputStream" parquet::OutputStream":
         pass
 
-    cdef cppclass LocalFileOutputStream(ParquetOutputStream):
-        LocalFileOutputStream(const c_string& path)
-        void Close()
+    cdef cppclass ParquetInMemoryOutputStream \
+            " parquet::InMemoryOutputStream"(ParquetOutputStream):
+        shared_ptr[CBuffer] GetBuffer()
 
     cdef cppclass WriterProperties:
         cppclass Builder:
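The declarations above expose parquet::FileMetaData::WriteTo together with an
in-memory output stream and parquet::FileMetaData::Make, i.e. everything
needed to serialize file metadata into a buffer and rebuild it later. At the
Python level that surfaces as metadata objects that survive pickling; a short
sketch of the round-trip, along the lines of the assertions in the new test
('data.parquet' is a placeholder path, not part of the patch):

    import pickle
    import pyarrow.parquet as pq

    # 'data.parquet' is a placeholder for any existing Parquet file.
    meta = pq.read_metadata('data.parquet')
    clone = pickle.loads(pickle.dumps(meta))

    assert clone.num_row_groups == meta.num_row_groups
    assert clone.schema == meta.schema
    for i in range(meta.num_row_groups):
        assert clone.row_group(i) == meta.row_group(i)
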
diff --git a/python/pyarrow/_parquet.pyx b/python/pyarrow/_parquet.pyx
index 87fe3d3..3ee174e 100644
--- a/python/pyarrow/_parquet.pyx
+++ b/python/pyarrow/_parquet.pyx
@@ -19,25 +19,27 @@
 # distutils: language = c++
 # cython: embedsignature = True
 
+import io
+import six
+import warnings
+
 from cython.operator cimport dereference as deref
 from pyarrow.includes.common cimport *
 from pyarrow.includes.libarrow cimport *
-from pyarrow.lib cimport (Array, Schema,
+from pyarrow.lib cimport (Buffer, Array, Schema,
                           check_status,
                           MemoryPool, maybe_unbox_memory_pool,
                           Table,
                           pyarrow_wrap_chunked_array,
                           pyarrow_wrap_schema,
                           pyarrow_wrap_table,
+                          pyarrow_wrap_buffer,
                           NativeFile, get_reader, get_writer)
 
 from pyarrow.compat import tobytes, frombytes
 from pyarrow.lib import ArrowException, NativeFile, _stringify_path
 from pyarrow.util import indent
 
-import six
-import warnings
-
 
 cdef class RowGroupStatistics:
     cdef:
@@ -66,6 +68,22 @@ cdef class RowGroupStatistics:
                                self.num_values,
                                self.physical_type)
 
+    def __eq__(self, other):
+        try:
+            return self.equals(other)
+        except TypeError:
+            return NotImplemented
+
+    def equals(self, RowGroupStatistics other):
+        # TODO(kszucs): implement native Equals method for RowGroupStatistics
+        return (self.has_min_max == other.has_min_max and
+                self.min == other.min and
+                self.max == other.max and
+                self.null_count == other.null_count and
+                self.distinct_count == other.distinct_count and
+                self.num_values == other.num_values and
+                self.physical_type == other.physical_type)
+
     cdef inline _cast_statistic(self, object value):
         # Input value is bytes
         cdef ParquetType physical_type = self.statistics.get().physical_type()
@@ -174,6 +192,29 @@ cdef class ColumnChunkMetaData:
                                           self.total_compressed_size,
                                           self.total_uncompressed_size)
 
+    def __eq__(self, other):
+        try:
+            return self.equals(other)
+        except TypeError:
+            return NotImplemented
+
+    def equals(self, ColumnChunkMetaData other):
+        # TODO(kszucs): implement native Equals method for CColumnChunkMetaData
+        return (self.file_offset == other.file_offset and
+                self.file_path == other.file_path and
+                self.physical_type == other.physical_type and
+                self.num_values == other.num_values and
+                self.path_in_schema == other.path_in_schema and
+                self.is_stats_set == other.is_stats_set and
+                self.statistics == other.statistics and
+                self.compression == other.compression and
+                self.encodings == other.encodings and
+                self.has_dictionary_page == other.has_dictionary_page and
+                self.dictionary_page_offset == other.dictionary_page_offset and
+                self.data_page_offset == other.data_page_offset and
+                self.total_compressed_size == other.total_compressed_size and
+                self.total_uncompressed_size == other.total_uncompressed_size)
+
     @property
     def file_offset(self):
         return self.metadata.file_offset()
@@ -249,16 +290,39 @@ cdef class ColumnChunkMetaData:
 
 cdef class RowGroupMetaData:
     cdef:
+        int index  # for pickling support
         unique_ptr[CRowGroupMetaData] up_metadata
         CRowGroupMetaData* metadata
         FileMetaData parent
 
-    def __cinit__(self, FileMetaData parent, int i):
-        if i < 0 or i >= parent.num_row_groups:
-            raise IndexError('{0} out of bounds'.format(i))
-        self.up_metadata = parent._metadata.RowGroup(i)
+    def __cinit__(self, FileMetaData parent, int index):
+        if index < 0 or index >= parent.num_row_groups:
+            raise IndexError('{0} out of bounds'.format(index))
+        self.up_metadata = parent._metadata.RowGroup(index)
         self.metadata = self.up_metadata.get()
         self.parent = parent
+        self.index = index
+
+    def __reduce__(self):
+        return RowGroupMetaData, (self.parent, self.index)
+
+    def __eq__(self, other):
+        try:
+            return self.equals(other)
+        except TypeError:
+            return NotImplemented
+
+    def equals(self, RowGroupMetaData other):
+        if not (self.num_columns == other.num_columns and
+                self.num_rows == other.num_rows and
+                self.total_byte_size == other.total_byte_size):
+            return False
+
+        for i in range(self.num_columns):
+            if self.column(i) != other.column(i):
+                return False
+
+        return True
 
     def column(self, int i):
         chunk = ColumnChunkMetaData()
@@ -287,6 +351,17 @@ cdef class RowGroupMetaData:
         return self.metadata.total_byte_size()
 
 
+def _reconstruct_filemetadata(Buffer serialized):
+    cdef:
+        FileMetaData metadata = FileMetaData.__new__(FileMetaData)
+        CBuffer *buffer = serialized.buffer.get()
+        uint32_t metadata_len = <uint32_t>buffer.size()
+
+    metadata.init(CFileMetaData_Make(buffer.data(), &metadata_len))
+
+    return metadata
+
+
 cdef class FileMetaData:
     cdef:
         shared_ptr[CFileMetaData] sp_metadata
@@ -300,6 +375,14 @@ cdef class FileMetaData:
         self.sp_metadata = metadata
         self._metadata = metadata.get()
 
+    def __reduce__(self):
+        cdef ParquetInMemoryOutputStream sink
+        with nogil:
+            self._metadata.WriteTo(&sink)
+
+        cdef Buffer buffer = pyarrow_wrap_buffer(sink.GetBuffer())
+        return _reconstruct_filemetadata, (buffer,)
+
     def __repr__(self):
         return """{0}
   created_by: {1}
@@ -406,6 +489,9 @@ cdef class ParquetSchema:
 {1}
  """.format(object.__repr__(self), '\n'.join(elements))
 
+    def __reduce__(self):
+        return ParquetSchema, (self.parent,)
+
     def __len__(self):
         return self.schema.num_columns()
 
@@ -454,12 +540,14 @@ cdef class ParquetSchema:
 
 cdef class ColumnSchema:
     cdef:
+        int index
         ParquetSchema parent
         const ColumnDescriptor* descr
 
-    def __cinit__(self, ParquetSchema schema, int i):
+    def __cinit__(self, ParquetSchema schema, int index):
         self.parent = schema
-        self.descr = schema.schema.Column(i)
+        self.index = index  # for pickling support
+        self.descr = schema.schema.Column(index)
 
     def __eq__(self, other):
         try:
@@ -467,6 +555,9 @@ cdef class ColumnSchema:
         except TypeError:
             return NotImplemented
 
+    def __reduce__(self):
+        return ColumnSchema, (self.parent, self.index)
+
     def equals(self, ColumnSchema other):
         """
         Returns True if the column schemas are equal
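The Cython changes above all hang off the standard __reduce__ protocol: each
class returns a module-level callable plus the arguments needed to rebuild an
equivalent object, so unpickling never has to reach into non-picklable C++
state directly. For readers less familiar with the mechanism, here is a
minimal pure-Python sketch of the same pattern; the names Meta and
_reconstruct_meta are hypothetical stand-ins for FileMetaData and
_reconstruct_filemetadata:

    import pickle


    def _reconstruct_meta(serialized):
        # Module-level reconstructor: pickle can import it by name, which is
        # what makes the (callable, args) pair below picklable.
        return Meta.from_bytes(serialized)


    class Meta(object):
        # Hypothetical stand-in for a wrapper around non-picklable native state.

        def __init__(self, payload):
            self.payload = payload

        def to_bytes(self):
            # stands in for FileMetaData.WriteTo(InMemoryOutputStream)
            return self.payload.encode('utf8')

        @classmethod
        def from_bytes(cls, data):
            # stands in for parquet::FileMetaData::Make
            return cls(data.decode('utf8'))

        def __reduce__(self):
            # pickle calls _reconstruct_meta(self.to_bytes()) when loading
            return _reconstruct_meta, (self.to_bytes(),)


    original = Meta('row group statistics')
    clone = pickle.loads(pickle.dumps(original))
    assert clone.payload == original.payload
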
diff --git a/python/pyarrow/parquet.py b/python/pyarrow/parquet.py
index 7ed7818..69187bc 100644
--- a/python/pyarrow/parquet.py
+++ b/python/pyarrow/parquet.py
@@ -663,6 +663,23 @@ class ParquetPartitions(object):
     def __getitem__(self, i):
         return self.levels[i]
 
+    def equals(self, other):
+        if not isinstance(other, ParquetPartitions):
+            raise TypeError('`other` must be an instance of ParquetPartitions')
+
+        return (self.levels == other.levels and
+                self.partition_names == other.partition_names)
+
+    def __eq__(self, other):
+        try:
+            return self.equals(other)
+        except TypeError:
+            return NotImplemented
+
+    def __ne__(self, other):
+        # required for python 2, cython implements it by default
+        return not (self == other)
+
     def get_index(self, level, name, key):
         """
         Record a partition value at a particular level, returning the distinct
@@ -867,6 +884,16 @@ def _path_split(path, sep):
 EXCLUDED_PARQUET_PATHS = {'_SUCCESS'}
 
 
+def _open_dataset_file(dataset, path, meta=None):
+    if dataset.fs is None or isinstance(dataset.fs, LocalFileSystem):
+        return ParquetFile(path, metadata=meta, memory_map=dataset.memory_map,
+                           common_metadata=dataset.common_metadata)
+    else:
+        return ParquetFile(dataset.fs.open(path, mode='rb'), metadata=meta,
+                           memory_map=dataset.memory_map,
+                           common_metadata=dataset.common_metadata)
+
+
 class ParquetDataset(object):
     """
     Encapsulates details of reading a complete Parquet dataset possibly
@@ -925,14 +952,13 @@ class ParquetDataset(object):
             self.paths = _parse_uri(path_or_paths)
 
         self.memory_map = memory_map
-        self._open_file_func = self._get_open_file_func()
 
         (self.pieces,
          self.partitions,
          self.common_metadata_path,
          self.metadata_path) = _make_manifest(
              path_or_paths, self.fs, metadata_nthreads=metadata_nthreads,
-             open_file_func=self._open_file_func)
+             open_file_func=partial(_open_dataset_file, self))
 
         if self.common_metadata_path is not None:
             with self.fs.open(self.common_metadata_path) as f:
@@ -961,6 +987,31 @@ class ParquetDataset(object):
             filters = _check_filters(filters)
             self._filter(filters)
 
+    def equals(self, other):
+        if not isinstance(other, ParquetDataset):
+            raise TypeError('`other` must be an instance of ParquetDataset')
+
+        if self.fs.__class__ != other.fs.__class__:
+            return False
+        for prop in ('paths', 'memory_map', 'pieces', 'partitions',
+                     'common_metadata_path', 'metadata_path',
+                     'common_metadata', 'metadata', 'schema',
+                     'split_row_groups'):
+            if getattr(self, prop) != getattr(other, prop):
+                return False
+
+        return True
+
+    def __eq__(self, other):
+        try:
+            return self.equals(other)
+        except TypeError:
+            return NotImplemented
+
+    def __ne__(self, other):
+        # required for python 2, cython implements it by default
+        return not (self == other)
+
     def validate_schemas(self):
         if self.metadata is None and self.schema is None:
             if self.common_metadata is not None:
@@ -1048,20 +1099,6 @@ class ParquetDataset(object):
         keyvalues = self.common_metadata.metadata
         return keyvalues.get(b'pandas', None)
 
-    def _get_open_file_func(self):
-        if self.fs is None or isinstance(self.fs, LocalFileSystem):
-            def open_file(path, meta=None):
-                return ParquetFile(path, metadata=meta,
-                                   memory_map=self.memory_map,
-                                   common_metadata=self.common_metadata)
-        else:
-            def open_file(path, meta=None):
-                return ParquetFile(self.fs.open(path, mode='rb'),
-                                   memory_map=self.memory_map,
-                                   metadata=meta,
-                                   common_metadata=self.common_metadata)
-        return open_file
-
     def _filter(self, filters):
         accepts_filter = self.partitions.filter_accepts_partition
 
diff --git a/python/pyarrow/tests/test_parquet.py b/python/pyarrow/tests/test_parquet.py
index 3190b0a..f480215 100644
--- a/python/pyarrow/tests/test_parquet.py
+++ b/python/pyarrow/tests/test_parquet.py
@@ -22,6 +22,7 @@ import io
 import json
 import os
 import six
+import pickle
 import pytest
 
 import numpy as np
@@ -2306,6 +2307,60 @@ def test_backwards_compatible_column_metadata_handling(datadir):
     tm.assert_frame_equal(result, expected[['a']].reset_index(drop=True))
 
 
+def _make_dataset_for_pickling(tempdir, N=100):
+    path = tempdir / 'data.parquet'
+    fs = LocalFileSystem.get_instance()
+
+    df = pd.DataFrame({
+        'index': np.arange(N),
+        'values': np.random.randn(N)
+    }, columns=['index', 'values'])
+    table = pa.Table.from_pandas(df)
+
+    num_groups = 3
+    with pq.ParquetWriter(path, table.schema) as writer:
+        for i in range(num_groups):
+            writer.write_table(table)
+
+    reader = pq.ParquetFile(path)
+    assert reader.metadata.num_row_groups == num_groups
+
+    metadata_path = tempdir / '_metadata'
+    with fs.open(metadata_path, 'wb') as f:
+        pq.write_metadata(table.schema, f)
+
+    dataset = pq.ParquetDataset(tempdir, filesystem=fs)
+    assert dataset.metadata_path == str(metadata_path)
+
+    return dataset
+
+
+@pytest.mark.pandas
+@pytest.mark.parametrize('pickler', [
+    pytest.param(pickle, id='builtin'),
+    pytest.param(pytest.importorskip('cloudpickle'), id='cloudpickle')
+])
+def test_pickle_dataset(tempdir, datadir, pickler):
+    def is_pickleable(obj):
+        return obj == pickler.loads(pickler.dumps(obj))
+
+    dataset = _make_dataset_for_pickling(tempdir)
+
+    assert is_pickleable(dataset)
+    assert is_pickleable(dataset.metadata)
+    assert is_pickleable(dataset.metadata.schema)
+    assert len(dataset.metadata.schema)
+    for column in dataset.metadata.schema:
+        assert is_pickleable(column)
+
+    for piece in dataset.pieces:
+        assert is_pickleable(piece)
+        metadata = piece.get_metadata()
+        assert metadata.num_row_groups
+        for i in range(metadata.num_row_groups):
+            assert is_pickleable(metadata.row_group(i))
+
+
 @pytest.mark.pandas
 def test_decimal_roundtrip(tempdir):
     num_values = 10