Posted to commits@arrow.apache.org by ap...@apache.org on 2018/11/07 17:11:55 UTC
[arrow] branch master updated: ARROW-3646: [Python] High-level IO API
This is an automated email from the ASF dual-hosted git repository.
apitrou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new e75cbf9 ARROW-3646: [Python] High-level IO API
e75cbf9 is described below
commit e75cbf9a2c4e88c818a3612af26094a35a6b2256
Author: Antoine Pitrou <an...@python.org>
AuthorDate: Wed Nov 7 18:11:47 2018 +0100
ARROW-3646: [Python] High-level IO API
Add the ``pa.input_stream`` and ``pa.output_stream`` factory functions.
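Example usage (an illustrative sketch; the file name is made up):

    import pyarrow as pa

    # Write to a file; gzip compression is inferred from the ".gz" extension
    with pa.output_stream('example.gz') as f:
        f.write(b'some data')

    # Read it back, decompressing on the fly
    with pa.input_stream('example.gz') as f:
        f.read()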
Author: Antoine Pitrou <an...@python.org>
Closes #2873 from pitrou/ARROW-3646-py-io-high-level-api and squashes the following commits:
fbb1780c <Antoine Pitrou> Address review comments
402f9d2a <Antoine Pitrou> ARROW-3646: High-level IO API
---
python/doc/source/api.rst | 17 ++-
python/doc/source/memory.rst | 127 +++++++++++++++-----
python/pyarrow/__init__.py | 4 +-
python/pyarrow/compat.py | 2 +
python/pyarrow/io.pxi | 253 +++++++++++++++++++++++++++++++++-------
python/pyarrow/tests/test_io.py | 220 +++++++++++++++++++++++++++++++++-
6 files changed, 541 insertions(+), 82 deletions(-)
diff --git a/python/doc/source/api.rst b/python/doc/source/api.rst
index 69534e6..caa2d65 100644
--- a/python/doc/source/api.rst
+++ b/python/doc/source/api.rst
@@ -204,8 +204,8 @@ Tensor type and Functions
.. _api.io:
-Input / Output and Shared Memory
---------------------------------
+In-Memory Buffers
+-----------------
.. autosummary::
:toctree: generated/
@@ -217,10 +217,23 @@ Input / Output and Shared Memory
foreign_buffer
Buffer
ResizableBuffer
+
+Input / Output and Shared Memory
+--------------------------------
+
+.. autosummary::
+ :toctree: generated/
+
+ input_stream
+ output_stream
BufferReader
BufferOutputStream
+ FixedSizeBufferWriter
NativeFile
+ OSFile
MemoryMappedFile
+ CompressedInputStream
+ CompressedOutputStream
memory_map
create_memory_map
PythonFile
diff --git a/python/doc/source/memory.rst b/python/doc/source/memory.rst
index 8fcf5f5..1ee81e7 100644
--- a/python/doc/source/memory.rst
+++ b/python/doc/source/memory.rst
@@ -18,6 +18,7 @@
.. currentmodule:: pyarrow
.. _io:
+========================
Memory and IO Interfaces
========================
@@ -25,8 +26,11 @@ This section will introduce you to the major concepts in PyArrow's memory
management and IO systems:
* Buffers
-* File-like and stream-like objects
* Memory pools
+* File-like and stream-like objects
+
+Referencing and Allocating Memory
+=================================
pyarrow.Buffer
--------------
@@ -70,11 +74,42 @@ required, and such conversions are also zero-copy:
memoryview(buf)
-.. _io.native_file:
-
-Native Files
+Memory Pools
------------
+All memory allocations and deallocations (like ``malloc`` and ``free`` in C)
+are tracked in an instance of ``arrow::MemoryPool``. This means that we can
+then precisely track the amount of memory that has been allocated:
+
+.. ipython:: python
+
+ pa.total_allocated_bytes()
+
+PyArrow uses a default built-in memory pool, but in the future there may be
+additional memory pools (and subpools) to choose from. Let's allocate
+a resizable ``Buffer`` from the default pool:
+
+.. ipython:: python
+
+ buf = pa.allocate_buffer(1024, resizable=True)
+ pa.total_allocated_bytes()
+ buf.resize(2048)
+ pa.total_allocated_bytes()
+
+The default allocator requests memory in a minimum increment of 64 bytes. If
+the buffer is garbage-collected, all of the memory is freed:
+
+.. ipython:: python
+
+ buf = None
+ pa.total_allocated_bytes()
+
+
+Input and Output
+================
+
+.. _io.native_file:
+
The Arrow C++ libraries have several abstract interfaces for different kinds of
IO objects:
@@ -86,8 +121,7 @@ IO objects:
In the interest of making these objects behave more like Python's built-in
``file`` objects, we have defined a :class:`~pyarrow.NativeFile` base class
-which is intended to mimic Python files and able to be used in functions where
-a Python file (such as ``file`` or ``BytesIO``) is expected.
+which implements the same API as regular Python file objects.
:class:`~pyarrow.NativeFile` has some important features which make it
preferable to using Python files with PyArrow where possible:
@@ -106,41 +140,70 @@ There are several kinds of :class:`~pyarrow.NativeFile` options available:
as a file
* :class:`~pyarrow.BufferOutputStream`, for writing data in-memory, producing a
Buffer at the end
+* :class:`~pyarrow.FixedSizeBufferWriter`, for writing data into an already
+ allocated Buffer
* :class:`~pyarrow.HdfsFile`, for reading and writing data to the Hadoop Filesystem
* :class:`~pyarrow.PythonFile`, for interfacing with Python file objects in C++
+* :class:`~pyarrow.CompressedInputStream` and
+  :class:`~pyarrow.CompressedOutputStream`, for on-the-fly compression or
+  decompression to/from another stream (see the example below)
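+
+For example, a :class:`~pyarrow.CompressedInputStream` can decompress gzip
+data on the fly (a minimal sketch; any readable stream can be wrapped):
+
+.. ipython:: python
+
+   import gzip
+   raw = pa.BufferReader(gzip.compress(b'some data'))
+   with pa.CompressedInputStream(raw, 'gzip') as stream:
+       stream.read()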
-We will discuss these in the following sections after explaining memory pools.
+There are also high-level APIs to make instantiating common kinds of streams
+easier.
-Memory Pools
-------------
+High-Level API
+--------------
-All memory allocations and deallocations (like ``malloc`` and ``free`` in C)
-are tracked in an instance of ``arrow::MemoryPool``. This means that we can
-then precisely track amount of memory that has been allocated:
+Input Streams
+~~~~~~~~~~~~~
-.. ipython:: python
+The :func:`~pyarrow.input_stream` function allows creating a readable
+:class:`~pyarrow.NativeFile` from various kinds of sources.
- pa.total_allocated_bytes()
+* If passed a :class:`~pyarrow.Buffer` or a ``memoryview`` object, a
+ :class:`~pyarrow.BufferReader` will be returned:
-PyArrow uses a default built-in memory pool, but in the future there may be
-additional memory pools (and subpools) to choose from. Let's consider an
-``BufferOutputStream``, which is like a ``BytesIO``:
+ .. ipython:: python
-.. ipython:: python
+ buf = memoryview(b"some data")
+ stream = pa.input_stream(buf)
+ stream.read(4)
- stream = pa.BufferOutputStream()
- stream.write(b'foo')
- pa.total_allocated_bytes()
- for i in range(1024): stream.write(b'foo')
- pa.total_allocated_bytes()
+* If passed a string or file path, it will open the given file on disk
+  for reading, creating an :class:`~pyarrow.OSFile`. Optionally, the file
+ can be compressed: if its filename ends with a recognized extension
+ such as ``.gz``, its contents will automatically be decompressed on
+ reading.
-The default allocator requests memory in a minimum increment of 64 bytes. If
-the stream is garbaged-collected, all of the memory is freed:
+ .. ipython:: python
+
+ import gzip
+ with gzip.open('example.gz', 'wb') as f:
+ f.write(b'some data\n' * 3)
+
+ stream = pa.input_stream('example.gz')
+ stream.read()
+
+* If passed a Python file object, it will be wrapped in a
+  :class:`~pyarrow.PythonFile` such that the Arrow C++ libraries can read
+  data from it (at the expense of a slight overhead).
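+
+  For example (a minimal sketch wrapping an ``io.BytesIO`` object):
+
+  .. ipython:: python
+
+     import io
+     stream = pa.input_stream(io.BytesIO(b'some data'))
+     stream.read(4)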
+
+Output Streams
+~~~~~~~~~~~~~~
+
+:func:`~pyarrow.output_stream` is the equivalent function for output streams
+and allows creating a writable :class:`~pyarrow.NativeFile`. It has the same
+features as explained above for :func:`~pyarrow.input_stream`, such as being
+able to write to buffers or perform on-the-fly compression.
.. ipython:: python
- stream = None
- pa.total_allocated_bytes()
+ with pa.output_stream('example1.dat') as stream:
+ stream.write(b'some data')
+
+ f = open('example1.dat', 'rb')
+ f.read()
+
On-Disk and Memory Mapped Files
-------------------------------
@@ -151,17 +214,17 @@ write:
.. ipython:: python
- with open('example.dat', 'wb') as f:
+ with open('example2.dat', 'wb') as f:
f.write(b'some example data')
Using pyarrow's :class:`~pyarrow.OSFile` class, you can write:
.. ipython:: python
- with pa.OSFile('example2.dat', 'wb') as f:
+ with pa.OSFile('example3.dat', 'wb') as f:
f.write(b'some example data')
-For reading files, you can use ``OSFile`` or
+For reading files, you can use :class:`~pyarrow.OSFile` or
:class:`~pyarrow.MemoryMappedFile`. The difference between these is that
:class:`~pyarrow.OSFile` allocates new memory on each read, like Python file
objects. In reads from memory maps, the library constructs a buffer referencing
@@ -169,8 +232,8 @@ the mapped memory without any memory allocation or copying:
.. ipython:: python
- file_obj = pa.OSFile('example.dat')
- mmap = pa.memory_map('example.dat')
+ file_obj = pa.OSFile('example2.dat')
+ mmap = pa.memory_map('example3.dat')
file_obj.read(4)
mmap.read(4)
diff --git a/python/pyarrow/__init__.py b/python/pyarrow/__init__.py
index 558cf46..12c2285 100644
--- a/python/pyarrow/__init__.py
+++ b/python/pyarrow/__init__.py
@@ -101,17 +101,19 @@ from pyarrow.lib import (MemoryPool, LoggingMemoryPool, ProxyMemoryPool,
default_memory_pool, logging_memory_pool,
proxy_memory_pool, log_memory_allocations)
+# I/O
from pyarrow.lib import (HdfsFile, NativeFile, PythonFile,
CompressedInputStream, CompressedOutputStream,
FixedSizeBufferWriter,
BufferReader, BufferOutputStream,
OSFile, MemoryMappedFile, memory_map,
create_memory_map, have_libhdfs, have_libhdfs3,
- MockOutputStream)
+ MockOutputStream, input_stream, output_stream)
from pyarrow.lib import (ChunkedArray, Column, RecordBatch, Table,
concat_tables)
+# Exceptions
from pyarrow.lib import (ArrowException,
ArrowKeyError,
ArrowInvalid,
diff --git a/python/pyarrow/compat.py b/python/pyarrow/compat.py
index 16e8ce6..a481db0 100644
--- a/python/pyarrow/compat.py
+++ b/python/pyarrow/compat.py
@@ -78,6 +78,7 @@ if PY2:
from decimal import Decimal
unicode_type = unicode
+ file_type = file
lzip = zip
zip = itertools.izip
zip_longest = itertools.izip_longest
@@ -113,6 +114,7 @@ else:
import pickle as builtin_pickle
unicode_type = str
+ file_type = None
def lzip(*x):
return list(zip(*x))
long = int
diff --git a/python/pyarrow/io.pxi b/python/pyarrow/io.pxi
index 1c74caf..9f7dc7b 100644
--- a/python/pyarrow/io.pxi
+++ b/python/pyarrow/io.pxi
@@ -26,10 +26,11 @@ import sys
import threading
import time
import warnings
-from io import BufferedIOBase, UnsupportedOperation
+from io import BufferedIOBase, IOBase, UnsupportedOperation
from pyarrow.util import _stringify_path
-from pyarrow.compat import builtin_pickle, frombytes, tobytes, encode_file_path
+from pyarrow.compat import (
+ builtin_pickle, frombytes, tobytes, encode_file_path, file_type)
# 64K
@@ -566,26 +567,54 @@ cdef class PythonFile(NativeFile):
if mode is None:
try:
- mode = handle.mode
+ inferred_mode = handle.mode
except AttributeError:
# Not all file-like objects have a mode attribute
# (e.g. BytesIO)
try:
- mode = 'w' if handle.writable() else 'r'
+ inferred_mode = 'w' if handle.writable() else 'r'
except AttributeError:
raise ValueError("could not infer open mode for file-like "
"object %r, please pass it explicitly"
% (handle,))
- if mode.startswith('w'):
- self.set_output_stream(
- shared_ptr[OutputStream](new PyOutputStream(handle)))
- self.is_writable = True
- elif mode.startswith('r'):
+ else:
+ inferred_mode = mode
+
+ if inferred_mode.startswith('w'):
+ kind = 'w'
+ elif inferred_mode.startswith('r'):
+ kind = 'r'
+ else:
+ raise ValueError('Invalid file mode: {0}'.format(mode))
+
+ # If mode was given, check it matches the given file
+ if mode is not None:
+ if isinstance(handle, IOBase):
+ # Python 3 IO object
+ if kind == 'r':
+ if not handle.readable():
+ raise TypeError("readable file expected")
+ else:
+ if not handle.writable():
+ raise TypeError("writable file expected")
+ elif file_type is not None and isinstance(handle, file_type):
+ # Python 2 file type
+ if kind == 'r':
+ if 'r' not in handle.mode and '+' not in handle.mode:
+ raise TypeError("readable file expected")
+ else:
+ if 'w' not in handle.mode and '+' not in handle.mode:
+ raise TypeError("writable file expected")
+ # (other duck-typed file-like objects are possible)
+
+ if kind == 'r':
self.set_random_access_file(
shared_ptr[RandomAccessFile](new PyReadableFile(handle)))
self.is_readable = True
else:
- raise ValueError('Invalid file mode: {0}'.format(mode))
+ self.set_output_stream(
+ shared_ptr[OutputStream](new PyOutputStream(handle)))
+ self.is_writable = True
def truncate(self, pos=None):
self.handle.truncate(pos)
@@ -1029,17 +1058,38 @@ cdef class BufferReader(NativeFile):
Buffer buffer
def __cinit__(self, object obj):
-
- if isinstance(obj, Buffer):
- self.buffer = obj
- else:
- self.buffer = py_buffer(obj)
-
+ self.buffer = as_buffer(obj)
self.set_random_access_file(shared_ptr[RandomAccessFile](
new CBufferReader(self.buffer.buffer)))
self.is_readable = True
+cdef shared_ptr[InputStream] _make_compressed_input_stream(
+ shared_ptr[InputStream] stream,
+ CompressionType compression_type) except *:
+ cdef:
+ shared_ptr[CCompressedInputStream] compressed_stream
+ unique_ptr[CCodec] codec
+
+ check_status(CCodec.Create(compression_type, &codec))
+ check_status(CCompressedInputStream.Make(codec.get(), stream,
+ &compressed_stream))
+ return <shared_ptr[InputStream]> compressed_stream
+
+
+cdef shared_ptr[OutputStream] _make_compressed_output_stream(
+ shared_ptr[OutputStream] stream,
+ CompressionType compression_type) except *:
+ cdef:
+ shared_ptr[CCompressedOutputStream] compressed_stream
+ unique_ptr[CCodec] codec
+
+ check_status(CCodec.Create(compression_type, &codec))
+ check_status(CCompressedOutputStream.Make(codec.get(), stream,
+ &compressed_stream))
+ return <shared_ptr[OutputStream]> compressed_stream
+
+
cdef class CompressedInputStream(NativeFile):
"""
An input stream wrapper which decompresses data on the fly.
@@ -1051,22 +1101,29 @@ cdef class CompressedInputStream(NativeFile):
The compression type ("bz2", "brotli", "gzip", "lz4", "snappy"
or "zstd")
"""
- def __cinit__(self, NativeFile stream, compression):
+ def __init__(self, NativeFile stream, compression):
cdef:
CompressionType compression_type
- shared_ptr[CCompressedInputStream] compressed_stream
- unique_ptr[CCodec] codec
compression_type = _get_compression_type(compression)
if compression_type == CompressionType_UNCOMPRESSED:
raise ValueError("Invalid value for compression: %r"
% (compression,))
+ self._init(stream, compression_type)
+
+ @staticmethod
+ cdef create(NativeFile stream, CompressionType compression_type):
+ cdef:
+ CompressedInputStream self
- check_status(CCodec.Create(compression_type, &codec))
- check_status(CCompressedInputStream.Make(codec.get(),
- stream.get_input_stream(),
- &compressed_stream))
- self.set_input_stream(<shared_ptr[InputStream]> compressed_stream)
+ self = CompressedInputStream.__new__(CompressedInputStream)
+ self._init(stream, compression_type)
+ return self
+
+ cdef _init(self, NativeFile stream, CompressionType compression_type):
+ self.set_input_stream(
+ _make_compressed_input_stream(stream.get_input_stream(),
+ compression_type))
self.is_readable = True
@@ -1081,28 +1138,35 @@ cdef class CompressedOutputStream(NativeFile):
The compression type ("bz2", "brotli", "gzip", "lz4", "snappy"
or "zstd")
"""
- def __cinit__(self, NativeFile stream, compression):
+ def __init__(self, NativeFile stream, compression):
cdef:
CompressionType compression_type
- shared_ptr[CCompressedOutputStream] compressed_stream
- unique_ptr[CCodec] codec
compression_type = _get_compression_type(compression)
if compression_type == CompressionType_UNCOMPRESSED:
raise ValueError("Invalid value for compression: %r"
% (compression,))
+ self._init(stream, compression_type)
+
+ @staticmethod
+ cdef create(NativeFile stream, CompressionType compression_type):
+ cdef:
+ CompressedOutputStream self
- check_status(CCodec.Create(compression_type, &codec))
- check_status(CCompressedOutputStream.Make(codec.get(),
- stream.get_output_stream(),
- &compressed_stream))
- self.set_output_stream(<shared_ptr[OutputStream]> compressed_stream)
+ self = CompressedOutputStream.__new__(CompressedOutputStream)
+ self._init(stream, compression_type)
+ return self
+
+ cdef _init(self, NativeFile stream, CompressionType compression_type):
+ self.set_output_stream(
+ _make_compressed_output_stream(stream.get_output_stream(),
+ compression_type))
self.is_writable = True
def py_buffer(object obj):
"""
- Construct an Arrow buffer from a Python bytes object
+ Construct an Arrow buffer from a Python bytes-like or buffer-like object
"""
cdef shared_ptr[CBuffer] buf
check_status(PyBuffer.FromPyObject(obj, &buf))
@@ -1180,10 +1244,8 @@ cdef get_input_stream(object source, c_bool use_memory_map,
input_stream = nf.get_input_stream()
if compression_type != CompressionType_UNCOMPRESSED:
- check_status(CCodec.Create(compression_type, &codec))
- check_status(CCompressedInputStream.Make(codec.get(), input_stream,
- &compressed_stream))
- input_stream = <shared_ptr[InputStream]> compressed_stream
+ input_stream = _make_compressed_input_stream(input_stream,
+ compression_type)
out[0] = input_stream
@@ -1210,7 +1272,7 @@ cdef get_writer(object source, shared_ptr[OutputStream]* writer):
# ---------------------------------------------------------------------
-cdef CompressionType _get_compression_type(object name):
+cdef CompressionType _get_compression_type(object name) except *:
if name is None or name == 'uncompressed':
return CompressionType_UNCOMPRESSED
elif name == 'bz2':
@@ -1230,7 +1292,7 @@ cdef CompressionType _get_compression_type(object name):
.format(str(name)))
-cdef CompressionType _get_compression_type_by_filename(filename):
+cdef CompressionType _get_compression_type_by_filename(filename) except *:
if filename.endswith('.bz2'):
return CompressionType_BZ2
elif filename.endswith('.gz'):
@@ -1273,9 +1335,7 @@ def compress(object buf, codec='lz4', asbytes=False, memory_pool=None):
with nogil:
check_status(CCodec.Create(c_codec, &compressor))
- if not isinstance(buf, Buffer):
- buf = py_buffer(buf)
-
+ buf = as_buffer(buf)
c_buf = (<Buffer> buf).buffer.get()
cdef int64_t max_output_size = (compressor.get()
@@ -1338,9 +1398,7 @@ def decompress(object buf, decompressed_size=None, codec='lz4',
with nogil:
check_status(CCodec.Create(c_codec, &compressor))
- if not isinstance(buf, Buffer):
- buf = py_buffer(buf)
-
+ buf = as_buffer(buf)
c_buf = (<Buffer> buf).buffer.get()
if decompressed_size is None:
@@ -1363,3 +1421,108 @@ def decompress(object buf, decompressed_size=None, codec='lz4',
output_size, output_buffer))
return pybuf if asbytes else out_buf
+
+
+cdef CompressionType _stream_compression_argument(
+ compression, source_path) except *:
+ if compression == 'detect':
+ if source_path is not None:
+ return _get_compression_type_by_filename(source_path)
+ else:
+ return CompressionType_UNCOMPRESSED
+ else:
+ return _get_compression_type(compression)
+
+
+def input_stream(source, compression='detect'):
+ """
+ Create an Arrow input stream.
+
+ Parameters
+ ----------
+ source: str, Path, buffer, file-like object, ...
+ The source to open for reading
+ compression: str or None
+ The compression algorithm to use for on-the-fly decompression.
+ If "detect" and source is a file path, then compression will be
+ chosen based on the file extension.
+ If None, no compression will be applied.
+ Otherwise, a well-known algorithm name must be supplied (e.g. "gzip")
+ """
+ cdef:
+ CompressionType compression_type
+ NativeFile stream
+
+ try:
+ source_path = _stringify_path(source)
+ except TypeError:
+ source_path = None
+
+ compression_type = _stream_compression_argument(compression, source_path)
+
+ if isinstance(source, NativeFile):
+ stream = source
+ elif source_path is not None:
+ stream = OSFile(source_path, 'r')
+ elif isinstance(source, (Buffer, memoryview)):
+ stream = BufferReader(as_buffer(source))
+ elif isinstance(source, BufferedIOBase):
+ stream = PythonFile(source, 'r')
+ elif file_type is not None and isinstance(source, file_type):
+ # Python 2 file type
+ stream = PythonFile(source, 'r')
+ else:
+ raise TypeError("pa.input_stream() called with instance of '{}'"
+ .format(source.__class__))
+
+ if compression_type != CompressionType_UNCOMPRESSED:
+ stream = CompressedInputStream.create(stream, compression_type)
+
+ return stream
+
+
+def output_stream(source, compression='detect'):
+ """
+ Create an Arrow output stream.
+
+ Parameters
+ ----------
+ source: str, Path, buffer, file-like object, ...
+ The source to open for writing
+ compression: str or None
+ The compression algorithm to use for on-the-fly compression.
+ If "detect" and source is a file path, then compression will be
+ chosen based on the file extension.
+ If None, no compression will be applied.
+ Otherwise, a well-known algorithm name must be supplied (e.g. "gzip")
+ """
+ cdef:
+ CompressionType compression_type
+ NativeFile stream
+
+ try:
+ source_path = _stringify_path(source)
+ except TypeError:
+ source_path = None
+
+ compression_type = _stream_compression_argument(compression, source_path)
+
+ if isinstance(source, NativeFile):
+ stream = source
+ elif source_path is not None:
+ stream = OSFile(source_path, 'w')
+ elif isinstance(source, (Buffer, memoryview)):
+ stream = FixedSizeBufferWriter(as_buffer(source))
+ elif isinstance(source, BufferedIOBase):
+ stream = PythonFile(source, 'w')
+ elif file_type is not None and isinstance(source, file_type):
+ # Python 2 file type
+ stream = PythonFile(source, 'w')
+ else:
+ raise TypeError("pa.output_stream() called with instance of '{}'"
+ .format(source.__class__))
+
+ if compression_type != CompressionType_UNCOMPRESSED:
+ stream = CompressedOutputStream.create(stream, compression_type)
+
+ return stream
diff --git a/python/pyarrow/tests/test_io.py b/python/pyarrow/tests/test_io.py
index a648c7d..f54f03a 100644
--- a/python/pyarrow/tests/test_io.py
+++ b/python/pyarrow/tests/test_io.py
@@ -16,7 +16,7 @@
# under the License.
import bz2
-from io import (BytesIO, TextIOWrapper, BufferedIOBase, IOBase)
+from io import (BytesIO, StringIO, TextIOWrapper, BufferedIOBase, IOBase)
import gc
import gzip
import os
@@ -26,11 +26,16 @@ import sys
import tempfile
import weakref
+try:
+ import pathlib
+except ImportError:
+ import pathlib2 as pathlib
+
import numpy as np
import pandas as pd
-from pyarrow.compat import u, guid
+from pyarrow.compat import u, guid, PY2
import pyarrow as pa
@@ -1050,3 +1055,214 @@ def test_compressed_roundtrip(compression):
with pa.CompressedInputStream(raw, compression) as compressed:
got = compressed.read()
assert got == data
+
+
+# ----------------------------------------------------------------------
+# High-level API
+
+if PY2:
+ def gzip_compress(data):
+ fd, fn = tempfile.mkstemp(suffix='.gz')
+ try:
+ os.close(fd)
+ with gzip.GzipFile(fn, 'wb') as f:
+ f.write(data)
+ with open(fn, 'rb') as f:
+ return f.read()
+ finally:
+ os.unlink(fn)
+
+ def gzip_decompress(data):
+ fd, fn = tempfile.mkstemp(suffix='.gz')
+ try:
+ os.close(fd)
+ with open(fn, 'wb') as f:
+ f.write(data)
+ with gzip.GzipFile(fn, 'rb') as f:
+ return f.read()
+ finally:
+ os.unlink(fn)
+else:
+ gzip_compress = gzip.compress
+ gzip_decompress = gzip.decompress
+
+
+def test_input_stream_buffer():
+ data = b"some test data\n" * 10 + b"eof\n"
+ for arg in [pa.py_buffer(data), memoryview(data)]:
+ stream = pa.input_stream(arg)
+ assert stream.read() == data
+
+ gz_data = gzip_compress(data)
+ stream = pa.input_stream(memoryview(gz_data))
+ assert stream.read() == gz_data
+ stream = pa.input_stream(memoryview(gz_data), compression='gzip')
+ assert stream.read() == data
+
+
+def test_input_stream_file_path(tmpdir):
+ data = b"some test data\n" * 10 + b"eof\n"
+ file_path = tmpdir / 'input_stream'
+ with open(str(file_path), 'wb') as f:
+ f.write(data)
+
+ stream = pa.input_stream(file_path)
+ assert stream.read() == data
+ stream = pa.input_stream(str(file_path))
+ assert stream.read() == data
+ stream = pa.input_stream(pathlib.Path(str(file_path)))
+ assert stream.read() == data
+
+
+def test_input_stream_file_path_compressed(tmpdir):
+ data = b"some test data\n" * 10 + b"eof\n"
+ gz_data = gzip_compress(data)
+ file_path = tmpdir / 'input_stream.gz'
+ with open(str(file_path), 'wb') as f:
+ f.write(gz_data)
+
+ stream = pa.input_stream(file_path)
+ assert stream.read() == data
+ stream = pa.input_stream(str(file_path))
+ assert stream.read() == data
+ stream = pa.input_stream(pathlib.Path(str(file_path)))
+ assert stream.read() == data
+
+ stream = pa.input_stream(file_path, compression='gzip')
+ assert stream.read() == data
+ stream = pa.input_stream(file_path, compression=None)
+ assert stream.read() == gz_data
+
+
+def test_input_stream_python_file(tmpdir):
+ data = b"some test data\n" * 10 + b"eof\n"
+ bio = BytesIO(data)
+
+ stream = pa.input_stream(bio)
+ assert stream.read() == data
+
+ gz_data = gzip_compress(data)
+ bio = BytesIO(gz_data)
+ stream = pa.input_stream(bio)
+ assert stream.read() == gz_data
+ bio.seek(0)
+ stream = pa.input_stream(bio, compression='gzip')
+ assert stream.read() == data
+
+ file_path = tmpdir / 'input_stream'
+ with open(str(file_path), 'wb') as f:
+ f.write(data)
+ with open(str(file_path), 'rb') as f:
+ stream = pa.input_stream(f)
+ assert stream.read() == data
+
+
+def test_input_stream_native_file():
+ data = b"some test data\n" * 10 + b"eof\n"
+ gz_data = gzip_compress(data)
+ reader = pa.BufferReader(gz_data)
+ stream = pa.input_stream(reader)
+ assert stream is reader
+ reader = pa.BufferReader(gz_data)
+ stream = pa.input_stream(reader, compression='gzip')
+ assert stream.read() == data
+
+
+def test_input_stream_errors(tmpdir):
+ buf = memoryview(b"")
+ with pytest.raises(ValueError):
+ pa.input_stream(buf, compression="foo")
+
+ for arg in [bytearray(), StringIO()]:
+ with pytest.raises(TypeError):
+ pa.input_stream(arg)
+
+ with pytest.raises(IOError):
+ pa.input_stream("non_existent_file")
+
+ with open(str(tmpdir / 'new_file'), 'wb') as f:
+ with pytest.raises(TypeError, match="readable file expected"):
+ pa.input_stream(f)
+
+
+def test_output_stream_buffer():
+ data = b"some test data\n" * 10 + b"eof\n"
+ buf = bytearray(len(data))
+ stream = pa.output_stream(pa.py_buffer(buf))
+ stream.write(data)
+ assert buf == data
+
+ buf = bytearray(len(data))
+ stream = pa.output_stream(memoryview(buf))
+ stream.write(data)
+ assert buf == data
+
+
+def test_output_stream_file_path(tmpdir):
+ data = b"some test data\n" * 10 + b"eof\n"
+ file_path = tmpdir / 'output_stream'
+
+ def check_data(file_path, data):
+ with pa.output_stream(file_path) as stream:
+ stream.write(data)
+ with open(str(file_path), 'rb') as f:
+ assert f.read() == data
+
+ check_data(file_path, data)
+ check_data(str(file_path), data)
+ check_data(pathlib.Path(str(file_path)), data)
+
+
+def test_output_stream_file_path_compressed(tmpdir):
+ data = b"some test data\n" * 10 + b"eof\n"
+ file_path = tmpdir / 'output_stream.gz'
+
+ def check_data(file_path, data, **kwargs):
+ with pa.output_stream(file_path, **kwargs) as stream:
+ stream.write(data)
+ with open(str(file_path), 'rb') as f:
+ return f.read()
+
+ assert gzip_decompress(check_data(file_path, data)) == data
+ assert gzip_decompress(check_data(str(file_path), data)) == data
+ assert gzip_decompress(
+ check_data(pathlib.Path(str(file_path)), data)) == data
+
+ assert gzip_decompress(
+ check_data(file_path, data, compression='gzip')) == data
+ assert check_data(file_path, data, compression=None) == data
+
+
+def test_output_stream_python_file(tmpdir):
+ data = b"some test data\n" * 10 + b"eof\n"
+
+ def check_data(data, **kwargs):
+ # XXX cannot use BytesIO because stream.close() is necessary
+ # to finish writing compressed data, but it will also close the
+ # underlying BytesIO
+ fn = str(tmpdir / 'output_stream_file')
+ with open(fn, 'wb') as f:
+ with pa.output_stream(f, **kwargs) as stream:
+ stream.write(data)
+ with open(fn, 'rb') as f:
+ return f.read()
+
+ assert check_data(data) == data
+ assert gzip_decompress(check_data(data, compression='gzip')) == data
+
+
+def test_output_stream_errors(tmpdir):
+ buf = memoryview(bytearray())
+ with pytest.raises(ValueError):
+ pa.output_stream(buf, compression="foo")
+
+ for arg in [bytearray(), StringIO()]:
+ with pytest.raises(TypeError):
+ pa.output_stream(arg)
+
+ fn = str(tmpdir / 'new_file')
+ with open(fn, 'wb') as f:
+ pass
+ with open(fn, 'rb') as f:
+ with pytest.raises(TypeError, match="writable file expected"):
+ pa.output_stream(f)