Posted to commits@arrow.apache.org by ks...@apache.org on 2020/03/30 12:18:02 UTC

[arrow] branch master updated: ARROW-8220: [Python] Make dataset FileFormat objects serializable

This is an automated email from the ASF dual-hosted git repository.

kszucs pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 6be085f  ARROW-8220: [Python] Make dataset FileFormat objects serializable
6be085f is described below

commit 6be085f29afd2059d151b10249e7e698dbd47438
Author: Krisztián Szűcs <sz...@gmail.com>
AuthorDate: Mon Mar 30 14:01:00 2020 +0200

    ARROW-8220: [Python] Make dataset FileFormat objects serializable
    
    Also refactors the reader options into a standalone ParquetReadOptions class for a more pleasant user API.
    
    Closes #6720 from kszucs/ARROW-8220
    
    Authored-by: Krisztián Szűcs <sz...@gmail.com>
    Signed-off-by: Krisztián Szűcs <sz...@gmail.com>
---
 cpp/src/arrow/dataset/file_parquet.h |   3 +
 python/pyarrow/_dataset.pyx          | 127 ++++++++++++++++++++++++-----------
 python/pyarrow/dataset.py            |   1 +
 python/pyarrow/tests/test_dataset.py |  46 ++++++++++++-
 4 files changed, 135 insertions(+), 42 deletions(-)
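
For orientation, a minimal usage sketch of the API this commit introduces (class and parameter names are taken from the diff below; the behavior shown is what test_dataset.py exercises):

    import pickle
    import pyarrow.dataset as ds

    # Read options can be given as a ParquetReadOptions instance ...
    opts = ds.ParquetReadOptions(use_buffered_stream=True,
                                 buffer_size=4096,
                                 dictionary_columns={'a', 'b'})
    fmt = ds.ParquetFileFormat(read_options=opts)

    # ... or as a plain dict with the same keys.
    fmt2 = ds.ParquetFileFormat(read_options={'use_buffered_stream': True,
                                              'buffer_size': 4096,
                                              'dictionary_columns': {'a', 'b'}})

    # FileFormat objects are now picklable and compare equal after a round trip.
    assert pickle.loads(pickle.dumps(fmt)) == fmt
    assert fmt == fmt2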

diff --git a/cpp/src/arrow/dataset/file_parquet.h b/cpp/src/arrow/dataset/file_parquet.h
index 3192d0d..a1ab047 100644
--- a/cpp/src/arrow/dataset/file_parquet.h
+++ b/cpp/src/arrow/dataset/file_parquet.h
@@ -52,6 +52,9 @@ class ARROW_DS_EXPORT ParquetFileFormat : public FileFormat {
 
   bool splittable() const override { return true; }
 
+  // Note: the default values are exposed in the Python bindings and documented
+  //       in the docstrings; if any of these defaults is changed, please
+  //       update the Python side as well.
   struct ReaderOptions {
     /// \defgroup parquet-file-format-reader-properties properties which correspond to
     /// members of parquet::ReaderProperties.
diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx
index de55002..3970fea 100644
--- a/python/pyarrow/_dataset.pyx
+++ b/python/pyarrow/_dataset.pyx
@@ -399,6 +399,12 @@ cdef class FileFormat:
                                      partition_expression.unwrap()))
         return Fragment.wrap(<shared_ptr[CFragment]> move(c_fragment))
 
+    def __eq__(self, other):
+        try:
+            return self.equals(other)
+        except TypeError:
+            return False
+
 
 cdef class Fragment:
     """Fragment of data from a Dataset."""
@@ -591,43 +597,46 @@ cdef class ParquetFileFragment(FileFragment):
                 yield Fragment.wrap(c_fragment)
 
 
-cdef class ParquetFileFormatReaderOptions:
-    cdef:
-        CParquetFileFormatReaderOptions* options
-
-    def __init__(self, ParquetFileFormat fmt):
-        self.options = &fmt.parquet_format.reader_options
-
-    @property
-    def use_buffered_stream(self):
-        """Read files through buffered input streams rather than
-        loading entire row groups at once. This may be enabled to
-        reduce memory overhead. Disabled by default."""
-        return self.options.use_buffered_stream
-
-    @use_buffered_stream.setter
-    def use_buffered_stream(self, bint value):
-        self.options.use_buffered_stream = value
-
-    @property
-    def buffer_size(self):
-        """Size of buffered stream, if enabled. Default is 8KB."""
-        return self.options.buffer_size
+cdef class ParquetReadOptions:
+    """
+    Parquet format specific options for reading.
 
-    @buffer_size.setter
-    def buffer_size(self, int value):
-        self.options.buffer_size = value
+    Parameters
+    ----------
+    use_buffered_stream : bool, default False
+        Read files through buffered input streams rather than loading entire
+        row groups at once. This may be enabled to reduce memory overhead.
+        Disabled by default.
+    buffer_size : int, default 8192
+        Size of buffered stream, if enabled. Default is 8KB.
+    dictionary_columns : list of string, default None
+        Names of columns which should be read as dictionaries.
+    """
 
-    @property
-    def dict_columns(self):
-        """Names of columns which should be read as dictionaries."""
-        return self.options.dict_columns
+    cdef public:
+        bint use_buffered_stream
+        uint32_t buffer_size
+        set dictionary_columns
+
+    def __init__(self, bint use_buffered_stream=False,
+                 uint32_t buffer_size=8192,
+                 dictionary_columns=None):
+        self.use_buffered_stream = use_buffered_stream
+        self.buffer_size = buffer_size
+        self.dictionary_columns = set(dictionary_columns or set())
+
+    def equals(self, ParquetReadOptions other):
+        return (
+            self.use_buffered_stream == other.use_buffered_stream and
+            self.buffer_size == other.buffer_size and
+            self.dictionary_columns == other.dictionary_columns
+        )
 
-    @dict_columns.setter
-    def dict_columns(self, values):
-        self.options.dict_columns.clear()
-        for value in set(values):
-            self.options.dict_columns.insert(tobytes(value))
+    def __eq__(self, other):
+        try:
+            return self.equals(other)
+        except TypeError:
+            return False
 
 
 cdef class ParquetFileFormat(FileFormat):
@@ -635,18 +644,48 @@ cdef class ParquetFileFormat(FileFormat):
     cdef:
         CParquetFileFormat* parquet_format
 
-    def __init__(self, dict reader_options=dict()):
-        self.init(<shared_ptr[CFileFormat]> make_shared[CParquetFileFormat]())
-        for name, value in reader_options.items():
-            setattr(self.reader_options, name, value)
+    def __init__(self, read_options=None):
+        cdef:
+            shared_ptr[CParquetFileFormat] wrapped
+            CParquetFileFormatReaderOptions* options
+
+        if read_options is None:
+            read_options = ParquetReadOptions()
+        elif isinstance(read_options, dict):
+            read_options = ParquetReadOptions(**read_options)
+        elif not isinstance(read_options, ParquetReadOptions):
+            raise TypeError('`read_options` must be either a dictionary or an '
+                            'instance of ParquetReadOptions')
+
+        wrapped = make_shared[CParquetFileFormat]()
+        options = &(wrapped.get().reader_options)
+        options.use_buffered_stream = read_options.use_buffered_stream
+        options.buffer_size = read_options.buffer_size
+        if read_options.dictionary_columns is not None:
+            for column in read_options.dictionary_columns:
+                options.dict_columns.insert(tobytes(column))
+
+        self.init(<shared_ptr[CFileFormat]> wrapped)
 
     cdef void init(self, const shared_ptr[CFileFormat]& sp):
         FileFormat.init(self, sp)
-        self.parquet_format = <CParquetFileFormat*> self.wrapped.get()
+        self.parquet_format = <CParquetFileFormat*> sp.get()
 
     @property
-    def reader_options(self):
-        return ParquetFileFormatReaderOptions(self)
+    def read_options(self):
+        cdef CParquetFileFormatReaderOptions* options
+        options = &self.parquet_format.reader_options
+        return ParquetReadOptions(
+            use_buffered_stream=options.use_buffered_stream,
+            buffer_size=options.buffer_size,
+            dictionary_columns={frombytes(col) for col in options.dict_columns}
+        )
+
+    def equals(self, ParquetFileFormat other):
+        return self.read_options.equals(other.read_options)
+
+    def __reduce__(self):
+        return ParquetFileFormat, (self.read_options,)
 
     def make_fragment(self, str path not None, FileSystem filesystem not None,
                       Schema schema=None, columns=None, filter=None,
@@ -685,6 +724,12 @@ cdef class IpcFileFormat(FileFormat):
     def __init__(self):
         self.init(shared_ptr[CFileFormat](new CIpcFileFormat()))
 
+    def equals(self, IpcFileFormat other):
+        return True
+
+    def __reduce__(self):
+        return IpcFileFormat, tuple()
+
 
 cdef class Partitioning:
 
diff --git a/python/pyarrow/dataset.py b/python/pyarrow/dataset.py
index 21f46e3..bc3ca22 100644
--- a/python/pyarrow/dataset.py
+++ b/python/pyarrow/dataset.py
@@ -44,6 +44,7 @@ from pyarrow._dataset import (  # noqa
     OrExpression,
     ParquetFileFormat,
     ParquetFileFragment,
+    ParquetReadOptions,
     Partitioning,
     PartitioningFactory,
     ScalarExpression,
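
As a side note on the pickling support added in _dataset.pyx above: __reduce__ returns a (callable, args) pair, and pickle rebuilds the object by calling that callable with those arguments. A rough sketch of the round trip (illustrative only, not part of the patch):

    import pyarrow.dataset as ds

    fmt = ds.ParquetFileFormat(read_options={'buffer_size': 4096})

    # __reduce__ returns the class and a one-element tuple holding the
    # ParquetReadOptions, so unpickling amounts to calling
    # ParquetFileFormat(read_options) again.
    constructor, args = fmt.__reduce__()
    assert constructor is ds.ParquetFileFormat
    assert constructor(*args) == fmt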
diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py
index 8117bfd..23172a2 100644
--- a/python/pyarrow/tests/test_dataset.py
+++ b/python/pyarrow/tests/test_dataset.py
@@ -486,6 +486,48 @@ def test_expression_ergonomics():
         field | [1]
 
 
+def test_parquet_read_options():
+    opts1 = ds.ParquetReadOptions()
+    opts2 = ds.ParquetReadOptions(buffer_size=4096,
+                                  dictionary_columns=['a', 'b'])
+    opts3 = ds.ParquetReadOptions(buffer_size=2**13, use_buffered_stream=True,
+                                  dictionary_columns={'a', 'b'})
+
+    assert opts1.use_buffered_stream is False
+    assert opts1.buffer_size == 2**13
+    assert opts1.dictionary_columns == set()
+
+    assert opts2.use_buffered_stream is False
+    assert opts2.buffer_size == 2**12
+    assert opts2.dictionary_columns == {'a', 'b'}
+
+    assert opts3.use_buffered_stream is True
+    assert opts3.buffer_size == 2**13
+    assert opts3.dictionary_columns == {'a', 'b'}
+
+    assert opts1 == opts1
+    assert opts1 != opts2
+    assert opts2 != opts3
+
+
+def test_file_format_pickling():
+    formats = [
+        ds.IpcFileFormat(),
+        ds.ParquetFileFormat(),
+        ds.ParquetFileFormat(
+            read_options=ds.ParquetReadOptions(use_buffered_stream=True)
+        ),
+        ds.ParquetFileFormat(
+            read_options={
+                'use_buffered_stream': True,
+                'buffer_size': 4096,
+            }
+        )
+    ]
+    for file_format in formats:
+        assert pickle.loads(pickle.dumps(file_format)) == file_format
+
+
 @pytest.mark.parametrize('paths_or_selector', [
     fs.FileSelector('subdir', recursive=True),
     [
@@ -499,7 +541,9 @@ def test_expression_ergonomics():
     ]
 ])
 def test_filesystem_factory(mockfs, paths_or_selector):
-    format = ds.ParquetFileFormat(reader_options=dict(dict_columns={"str"}))
+    format = ds.ParquetFileFormat(
+        read_options=ds.ParquetReadOptions(dictionary_columns={"str"})
+    )
 
     options = ds.FileSystemFactoryOptions('subdir')
     options.partitioning = ds.DirectoryPartitioning(