Posted to commits@arrow.apache.org by ks...@apache.org on 2020/03/30 12:18:02 UTC
[arrow] branch master updated: ARROW-8220: [Python] Make dataset FileFormat objects serializable
This is an automated email from the ASF dual-hosted git repository.
kszucs pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 6be085f ARROW-8220: [Python] Make dataset FileFormat objects serializable
6be085f is described below
commit 6be085f29afd2059d151b10249e7e698dbd47438
Author: Krisztián Szűcs <sz...@gmail.com>
AuthorDate: Mon Mar 30 14:01:00 2020 +0200
ARROW-8220: [Python] Make dataset FileFormat objects serializable
Also did some refactoring for a more pleasant user API; a brief usage sketch follows the diffstat below.
Closes #6720 from kszucs/ARROW-8220
Authored-by: Krisztián Szűcs <sz...@gmail.com>
Signed-off-by: Krisztián Szűcs <sz...@gmail.com>
---
cpp/src/arrow/dataset/file_parquet.h | 3 +
python/pyarrow/_dataset.pyx | 127 ++++++++++++++++++++++++-----------
python/pyarrow/dataset.py | 1 +
python/pyarrow/tests/test_dataset.py | 46 ++++++++++++-
4 files changed, 135 insertions(+), 42 deletions(-)
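For orientation, here is a minimal sketch of the user-facing API this patch introduces, using only names and defaults that appear in the diff below; the behavior shown mirrors the added tests and is not an exhaustive reference:

    import pickle
    import pyarrow.dataset as ds

    # Read options are now a first-class, comparable object; a plain dict
    # with the same keys is also accepted by ParquetFileFormat.
    opts = ds.ParquetReadOptions(use_buffered_stream=True,
                                 buffer_size=4096,
                                 dictionary_columns={'str'})
    fmt = ds.ParquetFileFormat(read_options=opts)

    # FileFormat objects are now serializable and round-trip through
    # pickle, comparing equal afterwards.
    assert pickle.loads(pickle.dumps(fmt)) == fmt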
diff --git a/cpp/src/arrow/dataset/file_parquet.h b/cpp/src/arrow/dataset/file_parquet.h
index 3192d0d..a1ab047 100644
--- a/cpp/src/arrow/dataset/file_parquet.h
+++ b/cpp/src/arrow/dataset/file_parquet.h
@@ -52,6 +52,9 @@ class ARROW_DS_EXPORT ParquetFileFormat : public FileFormat {
bool splittable() const override { return true; }
+ // Note: the default values are exposed in the Python bindings and
+ // documented in the docstrings; if any of the default values change,
+ // please update the docstrings there as well.
struct ReaderOptions {
/// \defgroup parquet-file-format-reader-properties properties which correspond to
/// members of parquet::ReaderProperties.
diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx
index de55002..3970fea 100644
--- a/python/pyarrow/_dataset.pyx
+++ b/python/pyarrow/_dataset.pyx
@@ -399,6 +399,12 @@ cdef class FileFormat:
partition_expression.unwrap()))
return Fragment.wrap(<shared_ptr[CFragment]> move(c_fragment))
+ def __eq__(self, other):
+ try:
+ return self.equals(other)
+ except TypeError:
+ return False
+
cdef class Fragment:
"""Fragment of data from a Dataset."""
@@ -591,43 +597,46 @@ cdef class ParquetFileFragment(FileFragment):
yield Fragment.wrap(c_fragment)
-cdef class ParquetFileFormatReaderOptions:
- cdef:
- CParquetFileFormatReaderOptions* options
-
- def __init__(self, ParquetFileFormat fmt):
- self.options = &fmt.parquet_format.reader_options
-
- @property
- def use_buffered_stream(self):
- """Read files through buffered input streams rather than
- loading entire row groups at once. This may be enabled to
- reduce memory overhead. Disabled by default."""
- return self.options.use_buffered_stream
-
- @use_buffered_stream.setter
- def use_buffered_stream(self, bint value):
- self.options.use_buffered_stream = value
-
- @property
- def buffer_size(self):
- """Size of buffered stream, if enabled. Default is 8KB."""
- return self.options.buffer_size
+cdef class ParquetReadOptions:
+ """
+ Parquet format specific options for reading.
- @buffer_size.setter
- def buffer_size(self, int value):
- self.options.buffer_size = value
+ Parameters
+ ----------
+ use_buffered_stream : bool, default False
+ Read files through buffered input streams rather than loading entire
+ row groups at once. This may be enabled to reduce memory overhead.
+ Disabled by default.
+ buffer_size : int, default 8192
+ Size of buffered stream, if enabled. Default is 8KB.
+ dictionary_columns : list of string, default None
+ Names of columns which should be read as dictionaries.
+ """
- @property
- def dict_columns(self):
- """Names of columns which should be read as dictionaries."""
- return self.options.dict_columns
+ cdef public:
+ bint use_buffered_stream
+ uint32_t buffer_size
+ set dictionary_columns
+
+ def __init__(self, bint use_buffered_stream=False,
+ uint32_t buffer_size=8192,
+ dictionary_columns=None):
+ self.use_buffered_stream = use_buffered_stream
+ self.buffer_size = buffer_size
+ self.dictionary_columns = set(dictionary_columns or set())
+
+ def equals(self, ParquetReadOptions other):
+ return (
+ self.use_buffered_stream == other.use_buffered_stream and
+ self.buffer_size == other.buffer_size and
+ self.dictionary_columns == other.dictionary_columns
+ )
- @dict_columns.setter
- def dict_columns(self, values):
- self.options.dict_columns.clear()
- for value in set(values):
- self.options.dict_columns.insert(tobytes(value))
+ def __eq__(self, other):
+ try:
+ return self.equals(other)
+ except TypeError:
+ return False
cdef class ParquetFileFormat(FileFormat):
@@ -635,18 +644,48 @@ cdef class ParquetFileFormat(FileFormat):
cdef:
CParquetFileFormat* parquet_format
- def __init__(self, dict reader_options=dict()):
- self.init(<shared_ptr[CFileFormat]> make_shared[CParquetFileFormat]())
- for name, value in reader_options.items():
- setattr(self.reader_options, name, value)
+ def __init__(self, read_options=None):
+ cdef:
+ shared_ptr[CParquetFileFormat] wrapped
+ CParquetFileFormatReaderOptions* options
+
+ if read_options is None:
+ read_options = ParquetReadOptions()
+ elif isinstance(read_options, dict):
+ read_options = ParquetReadOptions(**read_options)
+ elif not isinstance(read_options, ParquetReadOptions):
+ raise TypeError('`read_options` must be either a dictionary or an '
+ 'instance of ParquetReadOptions')
+
+ wrapped = make_shared[CParquetFileFormat]()
+ options = &(wrapped.get().reader_options)
+ options.use_buffered_stream = read_options.use_buffered_stream
+ options.buffer_size = read_options.buffer_size
+ if read_options.dictionary_columns is not None:
+ for column in read_options.dictionary_columns:
+ options.dict_columns.insert(tobytes(column))
+
+ self.init(<shared_ptr[CFileFormat]> wrapped)
cdef void init(self, const shared_ptr[CFileFormat]& sp):
FileFormat.init(self, sp)
- self.parquet_format = <CParquetFileFormat*> self.wrapped.get()
+ self.parquet_format = <CParquetFileFormat*> sp.get()
@property
- def reader_options(self):
- return ParquetFileFormatReaderOptions(self)
+ def read_options(self):
+ cdef CParquetFileFormatReaderOptions* options
+ options = &self.parquet_format.reader_options
+ return ParquetReadOptions(
+ use_buffered_stream=options.use_buffered_stream,
+ buffer_size=options.buffer_size,
+ dictionary_columns={frombytes(col) for col in options.dict_columns}
+ )
+
+ def equals(self, ParquetFileFormat other):
+ return self.read_options.equals(other.read_options)
+
+ def __reduce__(self):
+ return ParquetFileFormat, (self.read_options,)
def make_fragment(self, str path not None, FileSystem filesystem not None,
Schema schema=None, columns=None, filter=None,
@@ -685,6 +724,12 @@ cdef class IpcFileFormat(FileFormat):
def __init__(self):
self.init(shared_ptr[CFileFormat](new CIpcFileFormat()))
+ def equals(self, IpcFileFormat other):
+ return True
+
+ def __reduce__(self):
+ return IpcFileFormat, tuple()
+
cdef class Partitioning:
diff --git a/python/pyarrow/dataset.py b/python/pyarrow/dataset.py
index 21f46e3..bc3ca22 100644
--- a/python/pyarrow/dataset.py
+++ b/python/pyarrow/dataset.py
@@ -44,6 +44,7 @@ from pyarrow._dataset import ( # noqa
OrExpression,
ParquetFileFormat,
ParquetFileFragment,
+ ParquetReadOptions,
Partitioning,
PartitioningFactory,
ScalarExpression,
diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py
index 8117bfd..23172a2 100644
--- a/python/pyarrow/tests/test_dataset.py
+++ b/python/pyarrow/tests/test_dataset.py
@@ -486,6 +486,48 @@ def test_expression_ergonomics():
field | [1]
+def test_parquet_read_options():
+ opts1 = ds.ParquetReadOptions()
+ opts2 = ds.ParquetReadOptions(buffer_size=4096,
+ dictionary_columns=['a', 'b'])
+ opts3 = ds.ParquetReadOptions(buffer_size=2**13, use_buffered_stream=True,
+ dictionary_columns={'a', 'b'})
+
+ assert opts1.use_buffered_stream is False
+ assert opts1.buffer_size == 2**13
+ assert opts1.dictionary_columns == set()
+
+ assert opts2.use_buffered_stream is False
+ assert opts2.buffer_size == 2**12
+ assert opts2.dictionary_columns == {'a', 'b'}
+
+ assert opts3.use_buffered_stream is True
+ assert opts3.buffer_size == 2**13
+ assert opts3.dictionary_columns == {'a', 'b'}
+
+ assert opts1 == opts1
+ assert opts1 != opts2
+ assert opts2 != opts3
+
+
+def test_file_format_pickling():
+ formats = [
+ ds.IpcFileFormat(),
+ ds.ParquetFileFormat(),
+ ds.ParquetFileFormat(
+ read_options=ds.ParquetReadOptions(use_buffered_stream=True)
+ ),
+ ds.ParquetFileFormat(
+ read_options={
+ 'use_buffered_stream': True,
+ 'buffer_size': 4096,
+ }
+ )
+ ]
+ for file_format in formats:
+ assert pickle.loads(pickle.dumps(file_format)) == file_format
+
+
@pytest.mark.parametrize('paths_or_selector', [
fs.FileSelector('subdir', recursive=True),
[
@@ -499,7 +541,9 @@ def test_expression_ergonomics():
]
])
def test_filesystem_factory(mockfs, paths_or_selector):
- format = ds.ParquetFileFormat(reader_options=dict(dict_columns={"str"}))
+ format = ds.ParquetFileFormat(
+ read_options=ds.ParquetReadOptions(dictionary_columns={"str"})
+ )
options = ds.FileSystemFactoryOptions('subdir')
options.partitioning = ds.DirectoryPartitioning(