You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ap...@apache.org on 2022/05/31 13:30:39 UTC

[arrow] branch master updated: ARROW-16272: [Python] Fix NativeFile.read1()

This is an automated email from the ASF dual-hosted git repository.

apitrou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 31494860c2 ARROW-16272: [Python] Fix NativeFile.read1()
31494860c2 is described below

commit 31494860c23ff7fe38a748a09c58822378605477
Author: Antoine Pitrou <an...@python.org>
AuthorDate: Tue May 31 15:30:29 2022 +0200

    ARROW-16272: [Python] Fix NativeFile.read1()
    
    `read1()` should not read the entire input stream but instead return a reasonable amount of bytes, suitable for building up an internal buffer.
    
    Should fix the performance issue when using `TextIOWrapper` or `pandas.read_csv` on a S3 input file.
    
    Closes #13264 from pitrou/ARROW-16272-native-file-read1
    
    Authored-by: Antoine Pitrou <an...@python.org>
    Signed-off-by: Antoine Pitrou <an...@python.org>
---
 python/pyarrow/io.pxi           | 31 +++++++++++++++------
 python/pyarrow/tests/test_io.py | 60 +++++++++++++++++++++++++++++++++++++++++
 2 files changed, 83 insertions(+), 8 deletions(-)

diff --git a/python/pyarrow/io.pxi b/python/pyarrow/io.pxi
index cb101e27f7..d2e4f7062e 100644
--- a/python/pyarrow/io.pxi
+++ b/python/pyarrow/io.pxi
@@ -101,6 +101,10 @@ cdef class NativeFile(_Weakrefable):
     will not flush any pending data.
     """
 
+    # Default chunk size for chunked reads.
+    # Use a large enough value for networked filesystems.
+    _default_chunk_size = 256 * 1024
+
     def __cinit__(self):
         self.own_file = False
         self.is_readable = False
@@ -326,8 +330,7 @@ cdef class NativeFile(_Weakrefable):
 
     def write(self, data):
         """
-        Write byte from any object implementing buffer protocol (bytes,
-        bytearray, ndarray, pyarrow.Buffer)
+        Write data to the file.
 
         Parameters
         ----------
@@ -349,8 +352,9 @@ cdef class NativeFile(_Weakrefable):
 
     def read(self, nbytes=None):
         """
-        Read indicated number of bytes from file, or read all remaining bytes
-        if no argument passed
+        Read and return up to n bytes.
+
+        If *nbytes* is None, then the entire remaining file contents are read.
 
         Parameters
         ----------
@@ -368,7 +372,7 @@ cdef class NativeFile(_Weakrefable):
         if nbytes is None:
             if not self.is_seekable:
                 # Cannot get file size => read chunkwise
-                bs = 16384
+                bs = self._default_chunk_size
                 chunks = []
                 while True:
                     chunk = self.read(bs)
@@ -436,14 +440,25 @@ cdef class NativeFile(_Weakrefable):
     def read1(self, nbytes=None):
         """Read and return up to n bytes.
 
-        Alias for read, needed to match the BufferedIOBase interface.
+        Unlike read(), if *nbytes* is None then a chunk is read, not the
+        entire file.
 
         Parameters
         ----------
-        nbytes : int
+        nbytes : int, default None
             The maximum number of bytes to read.
+
+        Returns
+        -------
+        data : bytes
         """
-        return self.read(nbytes=None)
+        if nbytes is None:
+            # The expectation when passing `nbytes=None` is not to read the
+            # entire file but to issue a single underlying read call up to
+            # a reasonable size (the use case being to read a bufferable
+            # amount of bytes, such as with io.TextIOWrapper).
+            nbytes = self._default_chunk_size
+        return self.read(nbytes)
 
     def readall(self):
         return self.read()
diff --git a/python/pyarrow/tests/test_io.py b/python/pyarrow/tests/test_io.py
index f63e6f6a0f..f04ae23ce8 100644
--- a/python/pyarrow/tests/test_io.py
+++ b/python/pyarrow/tests/test_io.py
@@ -21,6 +21,7 @@ from io import (BytesIO, StringIO, TextIOWrapper, BufferedIOBase, IOBase)
 import itertools
 import gc
 import gzip
+import math
 import os
 import pathlib
 import pickle
@@ -1201,6 +1202,65 @@ def test_native_file_TextIOWrapper(tmpdir):
         assert res == data
 
 
+def test_native_file_TextIOWrapper_perf(tmpdir):
+    # ARROW-16272: TextIOWrapper.readline() shouldn't exhaust a large
+    # Arrow input stream.
+    data = b'foo\nquux\n'
+    path = str(tmpdir / 'largefile.txt')
+    with open(path, 'wb') as f:
+        f.write(data * 100_000)
+
+    binary_file = pa.OSFile(path, mode='rb')
+    with TextIOWrapper(binary_file) as f:
+        assert binary_file.tell() == 0
+        nbytes = 20_000
+        lines = f.readlines(nbytes)
+        assert len(lines) == math.ceil(2 * nbytes / len(data))
+        assert nbytes <= binary_file.tell() <= nbytes * 2
+
+
+def test_native_file_read1(tmpdir):
+    # ARROW-16272: read1() should not exhaust the input stream if there
+    # is a large amount of data remaining.
+    data = b'123\n' * 1_000_000
+    path = str(tmpdir / 'largefile.txt')
+    with open(path, 'wb') as f:
+        f.write(data)
+
+    chunks = []
+    with pa.OSFile(path, mode='rb') as f:
+        while True:
+            b = f.read1()
+            assert len(b) < len(data)
+            chunks.append(b)
+            b = f.read1(30_000)
+            assert len(b) <= 30_000
+            chunks.append(b)
+            if not b:
+                break
+
+    assert b"".join(chunks) == data
+
+
+@pytest.mark.pandas
+def test_native_file_pandas_text_reader(tmpdir):
+    # ARROW-16272: Pandas' read_csv() should not exhaust an Arrow
+    # input stream when a small nrows is passed.
+    import pandas as pd
+    import pandas.testing as tm
+    data = b'a,b\n' * 10_000_000
+    path = str(tmpdir / 'largefile.txt')
+    with open(path, 'wb') as f:
+        f.write(data)
+
+    with pa.OSFile(path, mode='rb') as f:
+        df = pd.read_csv(f, nrows=10)
+        expected = pd.DataFrame({'a': ['a'] * 10, 'b': ['b'] * 10})
+        tm.assert_frame_equal(df, expected)
+        # Some readahead occurred, but not up to the end of file
+        assert f.tell() <= 256 * 1024
+
+
 def test_native_file_open_error():
     with assert_file_not_found():
         pa.OSFile('non_existent_file', 'rb')