You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ap...@apache.org on 2022/05/31 13:30:39 UTC
[arrow] branch master updated: ARROW-16272: [Python] Fix NativeFile.read1()
This is an automated email from the ASF dual-hosted git repository.
apitrou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 31494860c2 ARROW-16272: [Python] Fix NativeFile.read1()
31494860c2 is described below
commit 31494860c23ff7fe38a748a09c58822378605477
Author: Antoine Pitrou <an...@python.org>
AuthorDate: Tue May 31 15:30:29 2022 +0200
ARROW-16272: [Python] Fix NativeFile.read1()
`read1()` should not read the entire input stream but instead return a reasonable number of bytes, suitable for building up an internal buffer.
Should fix the performance issue when using `TextIOWrapper` or `pandas.read_csv` on a S3 input file.
Closes #13264 from pitrou/ARROW-16272-native-file-read1
Authored-by: Antoine Pitrou <an...@python.org>
Signed-off-by: Antoine Pitrou <an...@python.org>
---
python/pyarrow/io.pxi | 31 +++++++++++++++------
python/pyarrow/tests/test_io.py | 60 +++++++++++++++++++++++++++++++++++++++++
2 files changed, 83 insertions(+), 8 deletions(-)
diff --git a/python/pyarrow/io.pxi b/python/pyarrow/io.pxi
index cb101e27f7..d2e4f7062e 100644
--- a/python/pyarrow/io.pxi
+++ b/python/pyarrow/io.pxi
@@ -101,6 +101,10 @@ cdef class NativeFile(_Weakrefable):
will not flush any pending data.
"""
+ # Default chunk size for chunked reads.
+ # Use a large enough value for networked filesystems.
+ _default_chunk_size = 256 * 1024
+
def __cinit__(self):
self.own_file = False
self.is_readable = False
@@ -326,8 +330,7 @@ cdef class NativeFile(_Weakrefable):
def write(self, data):
"""
- Write byte from any object implementing buffer protocol (bytes,
- bytearray, ndarray, pyarrow.Buffer)
+ Write data to the file.
Parameters
----------
@@ -349,8 +352,9 @@ cdef class NativeFile(_Weakrefable):
def read(self, nbytes=None):
"""
- Read indicated number of bytes from file, or read all remaining bytes
- if no argument passed
+ Read and return up to n bytes.
+
+ If *nbytes* is None, then the entire remaining file contents are read.
Parameters
----------
@@ -368,7 +372,7 @@ cdef class NativeFile(_Weakrefable):
if nbytes is None:
if not self.is_seekable:
# Cannot get file size => read chunkwise
- bs = 16384
+ bs = self._default_chunk_size
chunks = []
while True:
chunk = self.read(bs)
@@ -436,14 +440,25 @@ cdef class NativeFile(_Weakrefable):
def read1(self, nbytes=None):
"""Read and return up to n bytes.
- Alias for read, needed to match the BufferedIOBase interface.
+ Unlike read(), if *nbytes* is None then a chunk is read, not the
+ entire file.
Parameters
----------
- nbytes : int
+ nbytes : int, default None
The maximum number of bytes to read.
+
+ Returns
+ -------
+ data : bytes
"""
- return self.read(nbytes=None)
+ if nbytes is None:
+ # The expectation when passing `nbytes=None` is not to read the
+ # entire file but to issue a single underlying read call up to
+ # a reasonable size (the use case being to read a bufferable
+ # amount of bytes, such as with io.TextIOWrapper).
+ nbytes = self._default_chunk_size
+ return self.read(nbytes)
def readall(self):
return self.read()
diff --git a/python/pyarrow/tests/test_io.py b/python/pyarrow/tests/test_io.py
index f63e6f6a0f..f04ae23ce8 100644
--- a/python/pyarrow/tests/test_io.py
+++ b/python/pyarrow/tests/test_io.py
@@ -21,6 +21,7 @@ from io import (BytesIO, StringIO, TextIOWrapper, BufferedIOBase, IOBase)
import itertools
import gc
import gzip
+import math
import os
import pathlib
import pickle
@@ -1201,6 +1202,65 @@ def test_native_file_TextIOWrapper(tmpdir):
assert res == data
+def test_native_file_TextIOWrapper_perf(tmpdir):
+ # ARROW-16272: TextIOWrapper.readline() shouldn't exhaust a large
+ # Arrow input stream.
+ data = b'foo\nquux\n'
+ path = str(tmpdir / 'largefile.txt')
+ with open(path, 'wb') as f:
+ f.write(data * 100_000)
+
+ binary_file = pa.OSFile(path, mode='rb')
+ with TextIOWrapper(binary_file) as f:
+ assert binary_file.tell() == 0
+ nbytes = 20_000
+ lines = f.readlines(nbytes)
+ assert len(lines) == math.ceil(2 * nbytes / len(data))
+ assert nbytes <= binary_file.tell() <= nbytes * 2
+
+
+def test_native_file_read1(tmpdir):
+ # ARROW-16272: read1() should not exhaust the input stream if there
+ # is a large amount of data remaining.
+ data = b'123\n' * 1_000_000
+ path = str(tmpdir / 'largefile.txt')
+ with open(path, 'wb') as f:
+ f.write(data)
+
+ chunks = []
+ with pa.OSFile(path, mode='rb') as f:
+ while True:
+ b = f.read1()
+ assert len(b) < len(data)
+ chunks.append(b)
+ b = f.read1(30_000)
+ assert len(b) <= 30_000
+ chunks.append(b)
+ if not b:
+ break
+
+ assert b"".join(chunks) == data
+
+
+@pytest.mark.pandas
+def test_native_file_pandas_text_reader(tmpdir):
+ # ARROW-16272: Pandas' read_csv() should not exhaust an Arrow
+ # input stream when a small nrows is passed.
+ import pandas as pd
+ import pandas.testing as tm
+ data = b'a,b\n' * 10_000_000
+ path = str(tmpdir / 'largefile.txt')
+ with open(path, 'wb') as f:
+ f.write(data)
+
+ with pa.OSFile(path, mode='rb') as f:
+ df = pd.read_csv(f, nrows=10)
+ expected = pd.DataFrame({'a': ['a'] * 10, 'b': ['b'] * 10})
+ tm.assert_frame_equal(df, expected)
+ # Some readahead occurred, but not up to the end of file
+ assert f.tell() <= 256 * 1024
+
+
def test_native_file_open_error():
with assert_file_not_found():
pa.OSFile('non_existent_file', 'rb')