Posted to commits@arrow.apache.org by jo...@apache.org on 2021/05/05 07:34:42 UTC

[arrow] branch master updated: ARROW-12428: [Python] Expose pre_buffer in pyarrow.parquet

This is an automated email from the ASF dual-hosted git repository.

jorisvandenbossche pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 3a9aea3  ARROW-12428: [Python] Expose pre_buffer in pyarrow.parquet
3a9aea3 is described below

commit 3a9aea3dd0e2a2b1c68bd9cfc141d83adb9fbc35
Author: David Li <li...@gmail.com>
AuthorDate: Wed May 5 09:32:41 2021 +0200

    ARROW-12428: [Python] Expose pre_buffer in pyarrow.parquet
    
    This allows using the new option without going through the datasets API, such as to read a single file. A simple benchmark is in the JIRA. This helps close the gap between PyArrow and fsspec read performance, as fsspec performs readahead by default.
    
    Closes #10074 from lidavidm/arrow-12428
    
    Lead-authored-by: David Li <li...@gmail.com>
    Co-authored-by: Joris Van den Bossche <jo...@gmail.com>
    Signed-off-by: Joris Van den Bossche <jo...@gmail.com>
---
 python/pyarrow/_parquet.pyx                       |  4 ++-
 python/pyarrow/parquet.py                         | 39 ++++++++++++++++++-----
 python/pyarrow/tests/parquet/test_dataset.py      | 21 ++++++++++++
 python/pyarrow/tests/parquet/test_parquet_file.py | 16 ++++++++++
 python/setup.py                                   |  1 +
 5 files changed, 72 insertions(+), 9 deletions(-)
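
As a quick illustration of what the patch enables, the new keyword can now be
passed directly to pyarrow.parquet when reading a single file, without going
through pyarrow.dataset. A minimal sketch, assuming an S3 bucket, key and
region that are placeholders rather than part of this commit:

    import pyarrow.parquet as pq
    from pyarrow import fs

    # pre_buffer coalesces the column-chunk reads and issues them in
    # parallel on a background I/O thread pool, which mainly pays off on
    # high-latency filesystems such as S3.
    s3 = fs.S3FileSystem(region="us-east-1")
    with s3.open_input_file("my-bucket/data/file.parquet") as f:
        table = pq.ParquetFile(f, pre_buffer=True).read()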

diff --git a/python/pyarrow/_parquet.pyx b/python/pyarrow/_parquet.pyx
index 4b435ba..0b66ea0 100644
--- a/python/pyarrow/_parquet.pyx
+++ b/python/pyarrow/_parquet.pyx
@@ -929,7 +929,7 @@ cdef class ParquetReader(_Weakrefable):
 
     def open(self, object source not None, bint use_memory_map=True,
              read_dictionary=None, FileMetaData metadata=None,
-             int buffer_size=0):
+             int buffer_size=0, bint pre_buffer=False):
         cdef:
             shared_ptr[CRandomAccessFile] rd_handle
             shared_ptr[CFileMetaData] c_metadata
@@ -950,6 +950,8 @@ cdef class ParquetReader(_Weakrefable):
         else:
             raise ValueError('Buffer size must be larger than zero')
 
+        arrow_props.set_pre_buffer(pre_buffer)
+
         self.source = source
 
         get_reader(source, use_memory_map, &rd_handle)
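
For context, ParquetReader is the internal Cython binding that ParquetFile
wraps; the hunk above simply forwards the new keyword to the C++ reader via
arrow_props.set_pre_buffer. A sketch of that call path, using the internal
API purely for illustration (the file name is a placeholder):

    from pyarrow._parquet import ParquetReader

    reader = ParquetReader()
    # pre_buffer is forwarded to ArrowReaderProperties (see hunk above)
    reader.open("example.parquet", pre_buffer=True)
    table = reader.read_all()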
diff --git a/python/pyarrow/parquet.py b/python/pyarrow/parquet.py
index 88683d9..97e431c 100644
--- a/python/pyarrow/parquet.py
+++ b/python/pyarrow/parquet.py
@@ -209,13 +209,18 @@ class ParquetFile:
     buffer_size : int, default 0
         If positive, perform read buffering when deserializing individual
         column chunks. Otherwise IO calls are unbuffered.
+    pre_buffer : bool, default False
+        Coalesce and issue file reads in parallel to improve performance on
+        high-latency filesystems (e.g. S3). If True, Arrow will use a
+        background I/O thread pool.
     """
 
     def __init__(self, source, metadata=None, common_metadata=None,
-                 read_dictionary=None, memory_map=False, buffer_size=0):
+                 read_dictionary=None, memory_map=False, buffer_size=0,
+                 pre_buffer=False):
         self.reader = ParquetReader()
         self.reader.open(source, use_memory_map=memory_map,
-                         buffer_size=buffer_size,
+                         buffer_size=buffer_size, pre_buffer=pre_buffer,
                          read_dictionary=read_dictionary, metadata=metadata)
         self.common_metadata = common_metadata
         self._nested_paths_by_prefix = self._build_nested_paths()
@@ -1212,13 +1217,20 @@ use_legacy_dataset : bool, default True
     new Arrow Dataset API). Among other things, this allows to pass
     `filters` for all columns and not only the partition keys, enables
     different partitioning schemes, etc.
+pre_buffer : bool, default True
+    Coalesce and issue file reads in parallel to improve performance on
+    high-latency filesystems (e.g. S3). If True, Arrow will use a
+    background I/O thread pool. This option is only supported for
+    use_legacy_dataset=False. If using a filesystem layer that itself
+    performs readahead (e.g. fsspec's S3FS), disable readahead for best
+    results.
 """.format(_read_docstring_common, _DNF_filter_doc)
 
     def __new__(cls, path_or_paths=None, filesystem=None, schema=None,
                 metadata=None, split_row_groups=False, validate_schema=True,
                 filters=None, metadata_nthreads=1, read_dictionary=None,
                 memory_map=False, buffer_size=0, partitioning="hive",
-                use_legacy_dataset=None):
+                use_legacy_dataset=None, pre_buffer=True):
         if use_legacy_dataset is None:
             # if a new filesystem is passed -> default to new implementation
             if isinstance(filesystem, FileSystem):
@@ -1234,6 +1246,7 @@ use_legacy_dataset : bool, default True
                                      read_dictionary=read_dictionary,
                                      memory_map=memory_map,
                                      buffer_size=buffer_size,
+                                     pre_buffer=pre_buffer,
                                      # unsupported keywords
                                      schema=schema, metadata=metadata,
                                      split_row_groups=split_row_groups,
@@ -1246,7 +1259,7 @@ use_legacy_dataset : bool, default True
                  metadata=None, split_row_groups=False, validate_schema=True,
                  filters=None, metadata_nthreads=1, read_dictionary=None,
                  memory_map=False, buffer_size=0, partitioning="hive",
-                 use_legacy_dataset=True):
+                 use_legacy_dataset=True, pre_buffer=True):
         if partitioning != "hive":
             raise ValueError(
                 'Only "hive" for hive-like partitioning is supported when '
@@ -1480,7 +1493,8 @@ class _ParquetDatasetV2:
 
     def __init__(self, path_or_paths, filesystem=None, filters=None,
                  partitioning="hive", read_dictionary=None, buffer_size=None,
-                 memory_map=False, ignore_prefixes=None, **kwargs):
+                 memory_map=False, ignore_prefixes=None, pre_buffer=True,
+                 **kwargs):
         import pyarrow.dataset as ds
 
         # Raise error for not supported keywords
@@ -1494,7 +1508,7 @@ class _ParquetDatasetV2:
                     "Dataset API".format(keyword))
 
         # map format arguments
-        read_options = {}
+        read_options = {"pre_buffer": pre_buffer}
         if buffer_size:
             read_options.update(use_buffered_stream=True,
                                 buffer_size=buffer_size)
@@ -1676,6 +1690,13 @@ filters : List[Tuple] or List[List[Tuple]] or None (default)
     keys and only a hive-style directory structure is supported. When
     setting `use_legacy_dataset` to False, also within-file level filtering
     and different partitioning schemes are supported.
+pre_buffer : bool, default True
+    Coalesce and issue file reads in parallel to improve performance on
+    high-latency filesystems (e.g. S3). If True, Arrow will use a
+    background I/O thread pool. This option is only supported for
+    use_legacy_dataset=False. If using a filesystem layer that itself
+    performs readahead (e.g. fsspec's S3FS), disable readahead for best
+    results.
 
     {3}
 
@@ -1689,7 +1710,7 @@ def read_table(source, columns=None, use_threads=True, metadata=None,
                use_pandas_metadata=False, memory_map=False,
                read_dictionary=None, filesystem=None, filters=None,
                buffer_size=0, partitioning="hive", use_legacy_dataset=False,
-               ignore_prefixes=None):
+               ignore_prefixes=None, pre_buffer=True):
     if not use_legacy_dataset:
         if metadata is not None:
             raise ValueError(
@@ -1708,6 +1729,7 @@ def read_table(source, columns=None, use_threads=True, metadata=None,
                 buffer_size=buffer_size,
                 filters=filters,
                 ignore_prefixes=ignore_prefixes,
+                pre_buffer=pre_buffer,
             )
         except ImportError:
             # fall back on ParquetFile for simple cases when pyarrow.dataset
@@ -1728,7 +1750,8 @@ def read_table(source, columns=None, use_threads=True, metadata=None,
             # TODO test that source is not a directory or a list
             dataset = ParquetFile(
                 source, metadata=metadata, read_dictionary=read_dictionary,
-                memory_map=memory_map, buffer_size=buffer_size)
+                memory_map=memory_map, buffer_size=buffer_size,
+                pre_buffer=pre_buffer)
 
         return dataset.read(columns=columns, use_threads=use_threads,
                             use_pandas_metadata=use_pandas_metadata)
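
Taken together, the parquet.py changes expose pre_buffer on all three public
entry points. A hedged usage sketch for the dataset-level readers (the S3 URI
is a placeholder; note that pre_buffer defaults to True here and, per the
docstring added above, is only supported with use_legacy_dataset=False):

    import pyarrow.parquet as pq

    # read_table forwards pre_buffer into the new dataset API's
    # Parquet read options.
    table = pq.read_table("s3://my-bucket/dataset/",
                          use_legacy_dataset=False,
                          pre_buffer=True)

    # Equivalent via ParquetDataset; turn pre_buffer off if the filesystem
    # layer (e.g. fsspec's S3FS) already performs its own readahead.
    dataset = pq.ParquetDataset("s3://my-bucket/dataset/",
                                use_legacy_dataset=False,
                                pre_buffer=False)
    table = dataset.read()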
diff --git a/python/pyarrow/tests/parquet/test_dataset.py b/python/pyarrow/tests/parquet/test_dataset.py
index fce6ae5..70ea37b 100644
--- a/python/pyarrow/tests/parquet/test_dataset.py
+++ b/python/pyarrow/tests/parquet/test_dataset.py
@@ -1021,6 +1021,27 @@ def test_dataset_enable_buffered_stream(tempdir, use_legacy_dataset):
         assert dataset.read().equals(table)
 
 
+@pytest.mark.pandas
+@parametrize_legacy_dataset
+def test_dataset_enable_pre_buffer(tempdir, use_legacy_dataset):
+    dirpath = tempdir / guid()
+    dirpath.mkdir()
+
+    df = _test_dataframe(10, seed=0)
+    path = dirpath / '{}.parquet'.format(0)
+    table = pa.Table.from_pandas(df)
+    _write_table(table, path, version='2.0')
+
+    for pre_buffer in (True, False):
+        dataset = pq.ParquetDataset(
+            dirpath, pre_buffer=pre_buffer,
+            use_legacy_dataset=use_legacy_dataset)
+        assert dataset.read().equals(table)
+        actual = pq.read_table(dirpath, pre_buffer=pre_buffer,
+                               use_legacy_dataset=use_legacy_dataset)
+        assert actual.equals(table)
+
+
 def _make_example_multifile_dataset(base_path, nfiles=10, file_nrows=5):
     test_data = []
     paths = []
diff --git a/python/pyarrow/tests/parquet/test_parquet_file.py b/python/pyarrow/tests/parquet/test_parquet_file.py
index 85f81a3..dc9a3bb 100644
--- a/python/pyarrow/tests/parquet/test_parquet_file.py
+++ b/python/pyarrow/tests/parquet/test_parquet_file.py
@@ -256,3 +256,19 @@ def test_iter_batches_reader(tempdir, chunk_size):
         )
 
         batch_no += 1
+
+
+@pytest.mark.pandas
+@pytest.mark.parametrize('pre_buffer', [False, True])
+def test_pre_buffer(pre_buffer):
+    N, K = 10000, 4
+    df = alltypes_sample(size=N)
+    a_table = pa.Table.from_pandas(df)
+
+    buf = io.BytesIO()
+    _write_table(a_table, buf, row_group_size=N / K,
+                 compression='snappy', version='2.0')
+
+    buf.seek(0)
+    pf = pq.ParquetFile(buf, pre_buffer=pre_buffer)
+    assert pf.read().num_rows == N
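
The new tests only exercise correctness; the performance claim in the commit
message is easiest to check with a rough timing comparison against a
high-latency filesystem. A sketch under assumed placeholders (bucket, key and
region are hypothetical, and absolute numbers depend entirely on network
latency):

    import time
    import pyarrow.parquet as pq
    from pyarrow import fs

    s3 = fs.S3FileSystem(region="us-east-1")
    path = "my-bucket/data/file.parquet"

    for pre_buffer in (False, True):
        start = time.perf_counter()
        with s3.open_input_file(path) as f:
            pq.ParquetFile(f, pre_buffer=pre_buffer).read()
        print("pre_buffer=%s: %.2fs" % (pre_buffer, time.perf_counter() - start))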
diff --git a/python/setup.py b/python/setup.py
index 24d5480..fac8e0b 100755
--- a/python/setup.py
+++ b/python/setup.py
@@ -110,6 +110,7 @@ class build_ext(_build_ext):
                      ('with-flight', None, 'build the Flight extension'),
                      ('with-dataset', None, 'build the Dataset extension'),
                      ('with-parquet', None, 'build the Parquet extension'),
+                     ('with-s3', None, 'build the Amazon S3 extension'),
                      ('with-static-parquet', None, 'link parquet statically'),
                      ('with-static-boost', None, 'link boost statically'),
                      ('with-plasma', None, 'build the Plasma extension'),