You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by jo...@apache.org on 2021/05/05 07:34:42 UTC
[arrow] branch master updated: ARROW-12428: [Python] Expose pre_buffer in pyarrow.parquet
This is an automated email from the ASF dual-hosted git repository.
jorisvandenbossche pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 3a9aea3 ARROW-12428: [Python] Expose pre_buffer in pyarrow.parquet
3a9aea3 is described below
commit 3a9aea3dd0e2a2b1c68bd9cfc141d83adb9fbc35
Author: David Li <li...@gmail.com>
AuthorDate: Wed May 5 09:32:41 2021 +0200
ARROW-12428: [Python] Expose pre_buffer in pyarrow.parquet
This allows using the new option without going through the Dataset API, e.g. when reading a single file. A simple benchmark is included in the JIRA issue. This helps close the gap between PyArrow and fsspec read performance, since fsspec performs readahead by default.
Closes #10074 from lidavidm/arrow-12428
Lead-authored-by: David Li <li...@gmail.com>
Co-authored-by: Joris Van den Bossche <jo...@gmail.com>
Signed-off-by: Joris Van den Bossche <jo...@gmail.com>
---
python/pyarrow/_parquet.pyx | 4 ++-
python/pyarrow/parquet.py | 39 ++++++++++++++++++-----
python/pyarrow/tests/parquet/test_dataset.py | 21 ++++++++++++
python/pyarrow/tests/parquet/test_parquet_file.py | 16 ++++++++++
python/setup.py | 1 +
5 files changed, 72 insertions(+), 9 deletions(-)
diff --git a/python/pyarrow/_parquet.pyx b/python/pyarrow/_parquet.pyx
index 4b435ba..0b66ea0 100644
--- a/python/pyarrow/_parquet.pyx
+++ b/python/pyarrow/_parquet.pyx
@@ -929,7 +929,7 @@ cdef class ParquetReader(_Weakrefable):
def open(self, object source not None, bint use_memory_map=True,
read_dictionary=None, FileMetaData metadata=None,
- int buffer_size=0):
+ int buffer_size=0, bint pre_buffer=False):
cdef:
shared_ptr[CRandomAccessFile] rd_handle
shared_ptr[CFileMetaData] c_metadata
@@ -950,6 +950,8 @@ cdef class ParquetReader(_Weakrefable):
else:
raise ValueError('Buffer size must be larger than zero')
+ arrow_props.set_pre_buffer(pre_buffer)
+
self.source = source
get_reader(source, use_memory_map, &rd_handle)
diff --git a/python/pyarrow/parquet.py b/python/pyarrow/parquet.py
index 88683d9..97e431c 100644
--- a/python/pyarrow/parquet.py
+++ b/python/pyarrow/parquet.py
@@ -209,13 +209,18 @@ class ParquetFile:
buffer_size : int, default 0
If positive, perform read buffering when deserializing individual
column chunks. Otherwise IO calls are unbuffered.
+ pre_buffer : bool, default False
+ Coalesce and issue file reads in parallel to improve performance on
+ high-latency filesystems (e.g. S3). If True, Arrow will use a
+ background I/O thread pool.
"""
def __init__(self, source, metadata=None, common_metadata=None,
- read_dictionary=None, memory_map=False, buffer_size=0):
+ read_dictionary=None, memory_map=False, buffer_size=0,
+ pre_buffer=False):
self.reader = ParquetReader()
self.reader.open(source, use_memory_map=memory_map,
- buffer_size=buffer_size,
+ buffer_size=buffer_size, pre_buffer=pre_buffer,
read_dictionary=read_dictionary, metadata=metadata)
self.common_metadata = common_metadata
self._nested_paths_by_prefix = self._build_nested_paths()
@@ -1212,13 +1217,20 @@ use_legacy_dataset : bool, default True
new Arrow Dataset API). Among other things, this allows to pass
`filters` for all columns and not only the partition keys, enables
different partitioning schemes, etc.
+pre_buffer : bool, default True
+ Coalesce and issue file reads in parallel to improve performance on
+ high-latency filesystems (e.g. S3). If True, Arrow will use a
+ background I/O thread pool. This option is only supported for
+ use_legacy_dataset=False. If using a filesystem layer that itself
+ performs readahead (e.g. fsspec's S3FS), disable readahead for best
+ results.
""".format(_read_docstring_common, _DNF_filter_doc)
def __new__(cls, path_or_paths=None, filesystem=None, schema=None,
metadata=None, split_row_groups=False, validate_schema=True,
filters=None, metadata_nthreads=1, read_dictionary=None,
memory_map=False, buffer_size=0, partitioning="hive",
- use_legacy_dataset=None):
+ use_legacy_dataset=None, pre_buffer=True):
if use_legacy_dataset is None:
# if a new filesystem is passed -> default to new implementation
if isinstance(filesystem, FileSystem):
@@ -1234,6 +1246,7 @@ use_legacy_dataset : bool, default True
read_dictionary=read_dictionary,
memory_map=memory_map,
buffer_size=buffer_size,
+ pre_buffer=pre_buffer,
# unsupported keywords
schema=schema, metadata=metadata,
split_row_groups=split_row_groups,
@@ -1246,7 +1259,7 @@ use_legacy_dataset : bool, default True
metadata=None, split_row_groups=False, validate_schema=True,
filters=None, metadata_nthreads=1, read_dictionary=None,
memory_map=False, buffer_size=0, partitioning="hive",
- use_legacy_dataset=True):
+ use_legacy_dataset=True, pre_buffer=True):
if partitioning != "hive":
raise ValueError(
'Only "hive" for hive-like partitioning is supported when '
@@ -1480,7 +1493,8 @@ class _ParquetDatasetV2:
def __init__(self, path_or_paths, filesystem=None, filters=None,
partitioning="hive", read_dictionary=None, buffer_size=None,
- memory_map=False, ignore_prefixes=None, **kwargs):
+ memory_map=False, ignore_prefixes=None, pre_buffer=True,
+ **kwargs):
import pyarrow.dataset as ds
# Raise error for not supported keywords
@@ -1494,7 +1508,7 @@ class _ParquetDatasetV2:
"Dataset API".format(keyword))
# map format arguments
- read_options = {}
+ read_options = {"pre_buffer": pre_buffer}
if buffer_size:
read_options.update(use_buffered_stream=True,
buffer_size=buffer_size)
@@ -1676,6 +1690,13 @@ filters : List[Tuple] or List[List[Tuple]] or None (default)
keys and only a hive-style directory structure is supported. When
setting `use_legacy_dataset` to False, also within-file level filtering
and different partitioning schemes are supported.
+pre_buffer : bool, default True
+ Coalesce and issue file reads in parallel to improve performance on
+ high-latency filesystems (e.g. S3). If True, Arrow will use a
+ background I/O thread pool. This option is only supported for
+ use_legacy_dataset=False. If using a filesystem layer that itself
+ performs readahead (e.g. fsspec's S3FS), disable readahead for best
+ results.
{3}
@@ -1689,7 +1710,7 @@ def read_table(source, columns=None, use_threads=True, metadata=None,
use_pandas_metadata=False, memory_map=False,
read_dictionary=None, filesystem=None, filters=None,
buffer_size=0, partitioning="hive", use_legacy_dataset=False,
- ignore_prefixes=None):
+ ignore_prefixes=None, pre_buffer=True):
if not use_legacy_dataset:
if metadata is not None:
raise ValueError(
@@ -1708,6 +1729,7 @@ def read_table(source, columns=None, use_threads=True, metadata=None,
buffer_size=buffer_size,
filters=filters,
ignore_prefixes=ignore_prefixes,
+ pre_buffer=pre_buffer,
)
except ImportError:
# fall back on ParquetFile for simple cases when pyarrow.dataset
@@ -1728,7 +1750,8 @@ def read_table(source, columns=None, use_threads=True, metadata=None,
# TODO test that source is not a directory or a list
dataset = ParquetFile(
source, metadata=metadata, read_dictionary=read_dictionary,
- memory_map=memory_map, buffer_size=buffer_size)
+ memory_map=memory_map, buffer_size=buffer_size,
+ pre_buffer=pre_buffer)
return dataset.read(columns=columns, use_threads=use_threads,
use_pandas_metadata=use_pandas_metadata)
diff --git a/python/pyarrow/tests/parquet/test_dataset.py b/python/pyarrow/tests/parquet/test_dataset.py
index fce6ae5..70ea37b 100644
--- a/python/pyarrow/tests/parquet/test_dataset.py
+++ b/python/pyarrow/tests/parquet/test_dataset.py
@@ -1021,6 +1021,27 @@ def test_dataset_enable_buffered_stream(tempdir, use_legacy_dataset):
assert dataset.read().equals(table)
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_dataset_enable_pre_buffer(tempdir, use_legacy_dataset):
+ dirpath = tempdir / guid()
+ dirpath.mkdir()
+
+ df = _test_dataframe(10, seed=0)
+ path = dirpath / '{}.parquet'.format(0)
+ table = pa.Table.from_pandas(df)
+ _write_table(table, path, version='2.0')
+
+ for pre_buffer in (True, False):
+ dataset = pq.ParquetDataset(
+ dirpath, pre_buffer=pre_buffer,
+ use_legacy_dataset=use_legacy_dataset)
+ assert dataset.read().equals(table)
+ actual = pq.read_table(dirpath, pre_buffer=pre_buffer,
+ use_legacy_dataset=use_legacy_dataset)
+ assert actual.equals(table)
+
+
def _make_example_multifile_dataset(base_path, nfiles=10, file_nrows=5):
test_data = []
paths = []
diff --git a/python/pyarrow/tests/parquet/test_parquet_file.py b/python/pyarrow/tests/parquet/test_parquet_file.py
index 85f81a3..dc9a3bb 100644
--- a/python/pyarrow/tests/parquet/test_parquet_file.py
+++ b/python/pyarrow/tests/parquet/test_parquet_file.py
@@ -256,3 +256,19 @@ def test_iter_batches_reader(tempdir, chunk_size):
)
batch_no += 1
+
+
+@pytest.mark.pandas
+@pytest.mark.parametrize('pre_buffer', [False, True])
+def test_pre_buffer(pre_buffer):
+ N, K = 10000, 4
+ df = alltypes_sample(size=N)
+ a_table = pa.Table.from_pandas(df)
+
+ buf = io.BytesIO()
+ _write_table(a_table, buf, row_group_size=N / K,
+ compression='snappy', version='2.0')
+
+ buf.seek(0)
+ pf = pq.ParquetFile(buf, pre_buffer=pre_buffer)
+ assert pf.read().num_rows == N
diff --git a/python/setup.py b/python/setup.py
index 24d5480..fac8e0b 100755
--- a/python/setup.py
+++ b/python/setup.py
@@ -110,6 +110,7 @@ class build_ext(_build_ext):
('with-flight', None, 'build the Flight extension'),
('with-dataset', None, 'build the Dataset extension'),
('with-parquet', None, 'build the Parquet extension'),
+ ('with-s3', None, 'build the Amazon S3 extension'),
('with-static-parquet', None, 'link parquet statically'),
('with-static-boost', None, 'link boost statically'),
('with-plasma', None, 'build the Plasma extension'),