Posted to commits@arrow.apache.org by ap...@apache.org on 2018/11/07 17:11:55 UTC

[arrow] branch master updated: ARROW-3646: [Python] High-level IO API

This is an automated email from the ASF dual-hosted git repository.

apitrou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new e75cbf9  ARROW-3646: [Python] High-level IO API
e75cbf9 is described below

commit e75cbf9a2c4e88c818a3612af26094a35a6b2256
Author: Antoine Pitrou <an...@python.org>
AuthorDate: Wed Nov 7 18:11:47 2018 +0100

    ARROW-3646: [Python] High-level IO API
    
    Add the ``pa.input_stream`` and ``pa.output_stream`` factory functions.
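
    A minimal usage sketch (``example.gz`` is just an illustrative path;
    the compression codec is inferred from the file extension):

        import pyarrow as pa

        # Writing: data is gzip-compressed on the fly because of ".gz"
        with pa.output_stream('example.gz') as out:
            out.write(b'some data')

        # Reading: decompression is likewise inferred from the extension
        with pa.input_stream('example.gz') as f:
            assert f.read() == b'some data'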
    
    Author: Antoine Pitrou <an...@python.org>
    
    Closes #2873 from pitrou/ARROW-3646-py-io-high-level-api and squashes the following commits:
    
    fbb1780c <Antoine Pitrou> Address review comments
    402f9d2a <Antoine Pitrou> ARROW-3646:  High-level IO API
---
 python/doc/source/api.rst       |  17 ++-
 python/doc/source/memory.rst    | 127 +++++++++++++++-----
 python/pyarrow/__init__.py      |   4 +-
 python/pyarrow/compat.py        |   2 +
 python/pyarrow/io.pxi           | 253 +++++++++++++++++++++++++++++++++-------
 python/pyarrow/tests/test_io.py | 220 +++++++++++++++++++++++++++++++++-
 6 files changed, 541 insertions(+), 82 deletions(-)

diff --git a/python/doc/source/api.rst b/python/doc/source/api.rst
index 69534e6..caa2d65 100644
--- a/python/doc/source/api.rst
+++ b/python/doc/source/api.rst
@@ -204,8 +204,8 @@ Tensor type and Functions
 
 .. _api.io:
 
-Input / Output and Shared Memory
---------------------------------
+In-Memory Buffers
+-----------------
 
 .. autosummary::
    :toctree: generated/
@@ -217,10 +217,23 @@ Input / Output and Shared Memory
    foreign_buffer
    Buffer
    ResizableBuffer
+
+Input / Output and Shared Memory
+--------------------------------
+
+.. autosummary::
+   :toctree: generated/
+
+   input_stream
+   output_stream
    BufferReader
    BufferOutputStream
+   FixedSizeBufferWriter
    NativeFile
+   OSFile
    MemoryMappedFile
+   CompressedInputStream
+   CompressedOutputStream
    memory_map
    create_memory_map
    PythonFile
diff --git a/python/doc/source/memory.rst b/python/doc/source/memory.rst
index 8fcf5f5..1ee81e7 100644
--- a/python/doc/source/memory.rst
+++ b/python/doc/source/memory.rst
@@ -18,6 +18,7 @@
 .. currentmodule:: pyarrow
 .. _io:
 
+========================
 Memory and IO Interfaces
 ========================
 
@@ -25,8 +26,11 @@ This section will introduce you to the major concepts in PyArrow's memory
 management and IO systems:
 
 * Buffers
-* File-like and stream-like objects
 * Memory pools
+* File-like and stream-like objects
+
+Referencing and Allocating Memory
+=================================
 
 pyarrow.Buffer
 --------------
@@ -70,11 +74,42 @@ required, and such conversions are also zero-copy:
 
    memoryview(buf)
 
-.. _io.native_file:
-
-Native Files
+Memory Pools
 ------------
 
+All memory allocations and deallocations (like ``malloc`` and ``free`` in C)
+are tracked in an instance of ``arrow::MemoryPool``. This means that we can
+precisely track the amount of memory that has been allocated:
+
+.. ipython:: python
+
+   pa.total_allocated_bytes()
+
+PyArrow uses a default built-in memory pool, but in the future there may be
+additional memory pools (and subpools) to choose from. Let's allocate
+a resizable ``Buffer`` from the default pool:
+
+.. ipython:: python
+
+   buf = pa.allocate_buffer(1024, resizable=True)
+   pa.total_allocated_bytes()
+   buf.resize(2048)
+   pa.total_allocated_bytes()
+
+The default allocator requests memory in a minimum increment of 64 bytes. If
+the buffer is garbage-collected, all of the memory is freed:
+
+.. ipython:: python
+
+   buf = None
+   pa.total_allocated_bytes()
+
+
+Input and Output
+================
+
+.. _io.native_file:
+
 The Arrow C++ libraries have several abstract interfaces for different kinds of
 IO objects:
 
@@ -86,8 +121,7 @@ IO objects:
 
 In the interest of making these objects behave more like Python's built-in
 ``file`` objects, we have defined a :class:`~pyarrow.NativeFile` base class
-which is intended to mimic Python files and able to be used in functions where
-a Python file (such as ``file`` or ``BytesIO``) is expected.
+which implements the same API as regular Python file objects.
 
 :class:`~pyarrow.NativeFile` has some important features which make it
 preferable to using Python files with PyArrow where possible:
@@ -106,41 +140,70 @@ There are several kinds of :class:`~pyarrow.NativeFile` options available:
   as a file
 * :class:`~pyarrow.BufferOutputStream`, for writing data in-memory, producing a
   Buffer at the end
+* :class:`~pyarrow.FixedSizeBufferWriter`, for writing data into an already
+  allocated Buffer
 * :class:`~pyarrow.HdfsFile`, for reading and writing data to the Hadoop Filesystem
 * :class:`~pyarrow.PythonFile`, for interfacing with Python file objects in C++
+* :class:`~pyarrow.CompressedInputStream` and
+  :class:`~pyarrow.CompressedOutputStream`, for on-the-fly compression or
+  decompression to/from another stream
 
-We will discuss these in the following sections after explaining memory pools.
+There are also high-level APIs to make instantiating common kinds of streams
+easier.
 
-Memory Pools
-------------
+High-Level API
+--------------
 
-All memory allocations and deallocations (like ``malloc`` and ``free`` in C)
-are tracked in an instance of ``arrow::MemoryPool``. This means that we can
-then precisely track amount of memory that has been allocated:
+Input Streams
+~~~~~~~~~~~~~
 
-.. ipython:: python
+The :func:`~pyarrow.input_stream` function allows creating a readable
+:class:`~pyarrow.NativeFile` from various kinds of sources.
 
-   pa.total_allocated_bytes()
+* If passed a :class:`~pyarrow.Buffer` or a ``memoryview`` object, a
+  :class:`~pyarrow.BufferReader` will be returned:
 
-PyArrow uses a default built-in memory pool, but in the future there may be
-additional memory pools (and subpools) to choose from. Let's consider an
-``BufferOutputStream``, which is like a ``BytesIO``:
+   .. ipython:: python
 
-.. ipython:: python
+      buf = memoryview(b"some data")
+      stream = pa.input_stream(buf)
+      stream.read(4)
 
-   stream = pa.BufferOutputStream()
-   stream.write(b'foo')
-   pa.total_allocated_bytes()
-   for i in range(1024): stream.write(b'foo')
-   pa.total_allocated_bytes()
+* If passed a string or file path, it will open the given file on disk
+  for reading, creating an :class:`~pyarrow.OSFile`.  Optionally, the file
+  can be compressed: if its filename ends with a recognized extension
+  such as ``.gz``, its contents will automatically be decompressed on
+  reading.
 
-The default allocator requests memory in a minimum increment of 64 bytes. If
-the stream is garbaged-collected, all of the memory is freed:
+  .. ipython:: python
+
+     import gzip
+     with gzip.open('example.gz', 'wb') as f:
+         f.write(b'some data\n' * 3)
+
+     stream = pa.input_stream('example.gz')
+     stream.read()
+
+* If passed a Python file object, it will be wrapped in a
+  :class:`~pyarrow.PythonFile` such that the Arrow C++ libraries can read
+  data from it (at the expense of a slight overhead).
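+
+  For example, wrapping an in-memory ``BytesIO`` object (a minimal sketch
+  of the behavior just described):
+
+  .. ipython:: python
+
+     import io
+     f = io.BytesIO(b'some data')
+     stream = pa.input_stream(f)
+     stream.read(4)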
+
+Output Streams
+~~~~~~~~~~~~~~
+
+:func:`~pyarrow.output_stream` is the equivalent function for output streams,
+allowing creation of a writable :class:`~pyarrow.NativeFile`.  It supports
+the same features as :func:`~pyarrow.input_stream` above, such as writing to
+buffers or compressing on the fly.
 
 .. ipython:: python
 
-   stream = None
-   pa.total_allocated_bytes()
+   with pa.output_stream('example1.dat') as stream:
+       stream.write(b'some data')
+
+   f = open('example1.dat', 'rb')
+   f.read()
+
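+Compression is inferred from the file extension, just as for
+:func:`~pyarrow.input_stream` (a minimal sketch; ``compressed.gz`` is an
+illustrative file name):
+
+.. ipython:: python
+
+   with pa.output_stream('compressed.gz') as stream:
+       stream.write(b'some data')
+
+   pa.input_stream('compressed.gz').read()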
 
 On-Disk and Memory Mapped Files
 -------------------------------
@@ -151,17 +214,17 @@ write:
 
 .. ipython:: python
 
-   with open('example.dat', 'wb') as f:
+   with open('example2.dat', 'wb') as f:
        f.write(b'some example data')
 
 Using pyarrow's :class:`~pyarrow.OSFile` class, you can write:
 
 .. ipython:: python
 
-   with pa.OSFile('example2.dat', 'wb') as f:
+   with pa.OSFile('example3.dat', 'wb') as f:
        f.write(b'some example data')
 
-For reading files, you can use ``OSFile`` or
+For reading files, you can use :class:`~pyarrow.OSFile` or
 :class:`~pyarrow.MemoryMappedFile`. The difference between these is that
 :class:`~pyarrow.OSFile` allocates new memory on each read, like Python file
 objects. In reads from memory maps, the library constructs a buffer referencing
@@ -169,8 +232,8 @@ the mapped memory without any memory allocation or copying:
 
 .. ipython:: python
 
-   file_obj = pa.OSFile('example.dat')
-   mmap = pa.memory_map('example.dat')
+   file_obj = pa.OSFile('example2.dat')
+   mmap = pa.memory_map('example3.dat')
    file_obj.read(4)
    mmap.read(4)
 
diff --git a/python/pyarrow/__init__.py b/python/pyarrow/__init__.py
index 558cf46..12c2285 100644
--- a/python/pyarrow/__init__.py
+++ b/python/pyarrow/__init__.py
@@ -101,17 +101,19 @@ from pyarrow.lib import (MemoryPool, LoggingMemoryPool, ProxyMemoryPool,
                          default_memory_pool, logging_memory_pool,
                          proxy_memory_pool, log_memory_allocations)
 
+# I/O
 from pyarrow.lib import (HdfsFile, NativeFile, PythonFile,
                          CompressedInputStream, CompressedOutputStream,
                          FixedSizeBufferWriter,
                          BufferReader, BufferOutputStream,
                          OSFile, MemoryMappedFile, memory_map,
                          create_memory_map, have_libhdfs, have_libhdfs3,
-                         MockOutputStream)
+                         MockOutputStream, input_stream, output_stream)
 
 from pyarrow.lib import (ChunkedArray, Column, RecordBatch, Table,
                          concat_tables)
 
+# Exceptions
 from pyarrow.lib import (ArrowException,
                          ArrowKeyError,
                          ArrowInvalid,
diff --git a/python/pyarrow/compat.py b/python/pyarrow/compat.py
index 16e8ce6..a481db0 100644
--- a/python/pyarrow/compat.py
+++ b/python/pyarrow/compat.py
@@ -78,6 +78,7 @@ if PY2:
         from decimal import Decimal
 
     unicode_type = unicode
+    file_type = file
     lzip = zip
     zip = itertools.izip
     zip_longest = itertools.izip_longest
@@ -113,6 +114,7 @@ else:
         import pickle as builtin_pickle
 
     unicode_type = str
+    file_type = None
     def lzip(*x):
         return list(zip(*x))
     long = int
diff --git a/python/pyarrow/io.pxi b/python/pyarrow/io.pxi
index 1c74caf..9f7dc7b 100644
--- a/python/pyarrow/io.pxi
+++ b/python/pyarrow/io.pxi
@@ -26,10 +26,11 @@ import sys
 import threading
 import time
 import warnings
-from io import BufferedIOBase, UnsupportedOperation
+from io import BufferedIOBase, IOBase, UnsupportedOperation
 
 from pyarrow.util import _stringify_path
-from pyarrow.compat import builtin_pickle, frombytes, tobytes, encode_file_path
+from pyarrow.compat import (
+    builtin_pickle, frombytes, tobytes, encode_file_path, file_type)
 
 
 # 64K
@@ -566,26 +567,54 @@ cdef class PythonFile(NativeFile):
 
         if mode is None:
             try:
-                mode = handle.mode
+                inferred_mode = handle.mode
             except AttributeError:
                 # Not all file-like objects have a mode attribute
                 # (e.g. BytesIO)
                 try:
-                    mode = 'w' if handle.writable() else 'r'
+                    inferred_mode = 'w' if handle.writable() else 'r'
                 except AttributeError:
                     raise ValueError("could not infer open mode for file-like "
                                      "object %r, please pass it explicitly"
                                      % (handle,))
-        if mode.startswith('w'):
-            self.set_output_stream(
-                shared_ptr[OutputStream](new PyOutputStream(handle)))
-            self.is_writable = True
-        elif mode.startswith('r'):
+        else:
+            inferred_mode = mode
+
+        if inferred_mode.startswith('w'):
+            kind = 'w'
+        elif inferred_mode.startswith('r'):
+            kind = 'r'
+        else:
+            raise ValueError('Invalid file mode: {0}'.format(inferred_mode))
+
+        # If mode was given, check it matches the given file
+        if mode is not None:
+            if isinstance(handle, IOBase):
+                # Python 3 IO object
+                if kind == 'r':
+                    if not handle.readable():
+                        raise TypeError("readable file expected")
+                else:
+                    if not handle.writable():
+                        raise TypeError("writable file expected")
+            elif file_type is not None and isinstance(handle, file_type):
+                # Python 2 file type
+                if kind == 'r':
+                    if 'r' not in handle.mode and '+' not in handle.mode:
+                        raise TypeError("readable file expected")
+                else:
+                    if 'w' not in handle.mode and '+' not in handle.mode:
+                        raise TypeError("writable file expected")
+            # (other duck-typed file-like objects are possible)
+
+        if kind == 'r':
             self.set_random_access_file(
                 shared_ptr[RandomAccessFile](new PyReadableFile(handle)))
             self.is_readable = True
         else:
-            raise ValueError('Invalid file mode: {0}'.format(mode))
+            self.set_output_stream(
+                shared_ptr[OutputStream](new PyOutputStream(handle)))
+            self.is_writable = True
 
     def truncate(self, pos=None):
         self.handle.truncate(pos)
@@ -1029,17 +1058,38 @@ cdef class BufferReader(NativeFile):
         Buffer buffer
 
     def __cinit__(self, object obj):
-
-        if isinstance(obj, Buffer):
-            self.buffer = obj
-        else:
-            self.buffer = py_buffer(obj)
-
+        self.buffer = as_buffer(obj)
         self.set_random_access_file(shared_ptr[RandomAccessFile](
             new CBufferReader(self.buffer.buffer)))
         self.is_readable = True
 
 
+cdef shared_ptr[InputStream] _make_compressed_input_stream(
+        shared_ptr[InputStream] stream,
+        CompressionType compression_type) except *:
+    cdef:
+        shared_ptr[CCompressedInputStream] compressed_stream
+        unique_ptr[CCodec] codec
+
+    check_status(CCodec.Create(compression_type, &codec))
+    check_status(CCompressedInputStream.Make(codec.get(), stream,
+                                             &compressed_stream))
+    return <shared_ptr[InputStream]> compressed_stream
+
+
+cdef shared_ptr[OutputStream] _make_compressed_output_stream(
+        shared_ptr[OutputStream] stream,
+        CompressionType compression_type) except *:
+    cdef:
+        shared_ptr[CCompressedOutputStream] compressed_stream
+        unique_ptr[CCodec] codec
+
+    check_status(CCodec.Create(compression_type, &codec))
+    check_status(CCompressedOutputStream.Make(codec.get(), stream,
+                                              &compressed_stream))
+    return <shared_ptr[OutputStream]> compressed_stream
+
+
 cdef class CompressedInputStream(NativeFile):
     """
     An input stream wrapper which decompresses data on the fly.
@@ -1051,22 +1101,29 @@ cdef class CompressedInputStream(NativeFile):
         The compression type ("bz2", "brotli", "gzip", "lz4", "snappy"
         or "zstd")
     """
-    def __cinit__(self, NativeFile stream, compression):
+    def __init__(self, NativeFile stream, compression):
         cdef:
             CompressionType compression_type
-            shared_ptr[CCompressedInputStream] compressed_stream
-            unique_ptr[CCodec] codec
 
         compression_type = _get_compression_type(compression)
         if compression_type == CompressionType_UNCOMPRESSED:
             raise ValueError("Invalid value for compression: %r"
                              % (compression,))
+        self._init(stream, compression_type)
+
+    @staticmethod
+    cdef create(NativeFile stream, CompressionType compression_type):
+        cdef:
+            CompressedInputStream self
 
-        check_status(CCodec.Create(compression_type, &codec))
-        check_status(CCompressedInputStream.Make(codec.get(),
-                                                 stream.get_input_stream(),
-                                                 &compressed_stream))
-        self.set_input_stream(<shared_ptr[InputStream]> compressed_stream)
+        self = CompressedInputStream.__new__(CompressedInputStream)
+        self._init(stream, compression_type)
+        return self
+
+    cdef _init(self, NativeFile stream, CompressionType compression_type):
+        self.set_input_stream(
+            _make_compressed_input_stream(stream.get_input_stream(),
+                                          compression_type))
         self.is_readable = True
 
 
@@ -1081,28 +1138,35 @@ cdef class CompressedOutputStream(NativeFile):
         The compression type ("bz2", "brotli", "gzip", "lz4", "snappy"
         or "zstd")
     """
-    def __cinit__(self, NativeFile stream, compression):
+    def __init__(self, NativeFile stream, compression):
         cdef:
             CompressionType compression_type
-            shared_ptr[CCompressedOutputStream] compressed_stream
-            unique_ptr[CCodec] codec
 
         compression_type = _get_compression_type(compression)
         if compression_type == CompressionType_UNCOMPRESSED:
             raise ValueError("Invalid value for compression: %r"
                              % (compression,))
+        self._init(stream, compression_type)
+
+    @staticmethod
+    cdef create(NativeFile stream, CompressionType compression_type):
+        cdef:
+            CompressedOutputStream self
 
-        check_status(CCodec.Create(compression_type, &codec))
-        check_status(CCompressedOutputStream.Make(codec.get(),
-                                                  stream.get_output_stream(),
-                                                  &compressed_stream))
-        self.set_output_stream(<shared_ptr[OutputStream]> compressed_stream)
+        self = CompressedOutputStream.__new__(CompressedOutputStream)
+        self._init(stream, compression_type)
+        return self
+
+    cdef _init(self, NativeFile stream, CompressionType compression_type):
+        self.set_output_stream(
+            _make_compressed_output_stream(stream.get_output_stream(),
+                                           compression_type))
         self.is_writable = True
 
 
 def py_buffer(object obj):
     """
-    Construct an Arrow buffer from a Python bytes object
+    Construct an Arrow buffer from a Python bytes-like or buffer-like object
     """
     cdef shared_ptr[CBuffer] buf
     check_status(PyBuffer.FromPyObject(obj, &buf))
@@ -1180,10 +1244,8 @@ cdef get_input_stream(object source, c_bool use_memory_map,
     input_stream = nf.get_input_stream()
 
     if compression_type != CompressionType_UNCOMPRESSED:
-        check_status(CCodec.Create(compression_type, &codec))
-        check_status(CCompressedInputStream.Make(codec.get(), input_stream,
-                                                 &compressed_stream))
-        input_stream = <shared_ptr[InputStream]> compressed_stream
+        input_stream = _make_compressed_input_stream(input_stream,
+                                                     compression_type)
 
     out[0] = input_stream
 
@@ -1210,7 +1272,7 @@ cdef get_writer(object source, shared_ptr[OutputStream]* writer):
 
 # ---------------------------------------------------------------------
 
-cdef CompressionType _get_compression_type(object name):
+cdef CompressionType _get_compression_type(object name) except *:
     if name is None or name == 'uncompressed':
         return CompressionType_UNCOMPRESSED
     elif name == 'bz2':
@@ -1230,7 +1292,7 @@ cdef CompressionType _get_compression_type(object name):
                          .format(str(name)))
 
 
-cdef CompressionType _get_compression_type_by_filename(filename):
+cdef CompressionType _get_compression_type_by_filename(filename) except *:
     if filename.endswith('.bz2'):
         return CompressionType_BZ2
     elif filename.endswith('.gz'):
@@ -1273,9 +1335,7 @@ def compress(object buf, codec='lz4', asbytes=False, memory_pool=None):
     with nogil:
         check_status(CCodec.Create(c_codec, &compressor))
 
-    if not isinstance(buf, Buffer):
-        buf = py_buffer(buf)
-
+    buf = as_buffer(buf)
     c_buf = (<Buffer> buf).buffer.get()
 
     cdef int64_t max_output_size = (compressor.get()
@@ -1338,9 +1398,7 @@ def decompress(object buf, decompressed_size=None, codec='lz4',
     with nogil:
         check_status(CCodec.Create(c_codec, &compressor))
 
-    if not isinstance(buf, Buffer):
-        buf = py_buffer(buf)
-
+    buf = as_buffer(buf)
     c_buf = (<Buffer> buf).buffer.get()
 
     if decompressed_size is None:
@@ -1363,3 +1421,108 @@ def decompress(object buf, decompressed_size=None, codec='lz4',
                                  output_size, output_buffer))
 
     return pybuf if asbytes else out_buf
+
+
+cdef CompressionType _stream_compression_argument(
+        compression, source_path) except *:
+    if compression == 'detect':
+        if source_path is not None:
+            return _get_compression_type_by_filename(source_path)
+        else:
+            return CompressionType_UNCOMPRESSED
+    else:
+        return _get_compression_type(compression)
+
+
+def input_stream(source, compression='detect'):
+    """
+    Create an Arrow input stream.
+
+    Parameters
+    ----------
+    source: str, Path, buffer, file-like object, ...
+        The source to open for reading
+    compression: str or None
+        The compression algorithm to use for on-the-fly decompression.
+        If "detect" and source is a file path, then compression will be
+        chosen based on the file extension.
+        If None, no compression will be applied.
+        Otherwise, a well-known algorithm name must be supplied (e.g. "gzip")
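+
+    Examples
+    --------
+    Create a readable stream over an in-memory buffer:
+
+    >>> import pyarrow as pa
+    >>> stream = pa.input_stream(memoryview(b'some data'))
+    >>> stream.read(4)
+    b'some'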
+    """
+    cdef:
+        CompressionType compression_type
+        NativeFile stream
+
+    try:
+        source_path = _stringify_path(source)
+    except TypeError:
+        source_path = None
+
+    compression_type = _stream_compression_argument(compression, source_path)
+
+    if isinstance(source, NativeFile):
+        stream = source
+    elif source_path is not None:
+        stream = OSFile(source_path, 'r')
+    elif isinstance(source, (Buffer, memoryview)):
+        stream = BufferReader(as_buffer(source))
+    elif isinstance(source, BufferedIOBase):
+        stream = PythonFile(source, 'r')
+    elif file_type is not None and isinstance(source, file_type):
+        # Python 2 file type
+        stream = PythonFile(source, 'r')
+    else:
+        raise TypeError("pa.input_stream() called with instance of '{}'"
+                        .format(source.__class__))
+
+    if compression_type != CompressionType_UNCOMPRESSED:
+        stream = CompressedInputStream.create(stream, compression_type)
+
+    return stream
+
+
+def output_stream(source, compression='detect'):
+    """
+    Create an Arrow output stream.
+
+    Parameters
+    ----------
+    source: str, Path, buffer, file-like object, ...
+        The source to open for writing
+    compression: str or None
+        The compression algorithm to use for on-the-fly compression.
+        If "detect" and source is a file path, then compression will be
+        chosen based on the file extension.
+        If None, no compression will be applied.
+        Otherwise, a well-known algorithm name must be supplied (e.g. "gzip")
+    """
+    cdef:
+        CompressionType compression_type
+        NativeFile stream
+
+    try:
+        source_path = _stringify_path(source)
+    except TypeError:
+        source_path = None
+
+    compression_type = _stream_compression_argument(compression, source_path)
+
+    if isinstance(source, NativeFile):
+        stream = source
+    elif source_path is not None:
+        stream = OSFile(source_path, 'w')
+    elif isinstance(source, (Buffer, memoryview)):
+        stream = FixedSizeBufferWriter(as_buffer(source))
+    elif isinstance(source, BufferedIOBase):
+        stream = PythonFile(source, 'w')
+    elif file_type is not None and isinstance(source, file_type):
+        # Python 2 file type
+        stream = PythonFile(source, 'w')
+    else:
+        raise TypeError("pa.output_stream() called with instance of '{}'"
+                        .format(source.__class__))
+
+    if compression_type != CompressionType_UNCOMPRESSED:
+        stream = CompressedOutputStream.create(stream, compression_type)
+
+    return stream
diff --git a/python/pyarrow/tests/test_io.py b/python/pyarrow/tests/test_io.py
index a648c7d..f54f03a 100644
--- a/python/pyarrow/tests/test_io.py
+++ b/python/pyarrow/tests/test_io.py
@@ -16,7 +16,7 @@
 # under the License.
 
 import bz2
-from io import (BytesIO, TextIOWrapper, BufferedIOBase, IOBase)
+from io import (BytesIO, StringIO, TextIOWrapper, BufferedIOBase, IOBase)
 import gc
 import gzip
 import os
@@ -26,11 +26,16 @@ import sys
 import tempfile
 import weakref
 
+try:
+    import pathlib
+except ImportError:
+    import pathlib2 as pathlib
+
 import numpy as np
 
 import pandas as pd
 
-from pyarrow.compat import u, guid
+from pyarrow.compat import u, guid, PY2
 import pyarrow as pa
 
 
@@ -1050,3 +1055,214 @@ def test_compressed_roundtrip(compression):
     with pa.CompressedInputStream(raw, compression) as compressed:
         got = compressed.read()
         assert got == data
+
+
+# ----------------------------------------------------------------------
+# High-level API
+
+if PY2:
+    def gzip_compress(data):
+        fd, fn = tempfile.mkstemp(suffix='.gz')
+        try:
+            os.close(fd)
+            with gzip.GzipFile(fn, 'wb') as f:
+                f.write(data)
+            with open(fn, 'rb') as f:
+                return f.read()
+        finally:
+            os.unlink(fn)
+
+    def gzip_decompress(data):
+        fd, fn = tempfile.mkstemp(suffix='.gz')
+        try:
+            os.close(fd)
+            with open(fn, 'wb') as f:
+                f.write(data)
+            with gzip.GzipFile(fn, 'rb') as f:
+                return f.read()
+        finally:
+            os.unlink(fn)
+else:
+    gzip_compress = gzip.compress
+    gzip_decompress = gzip.decompress
+
+
+def test_input_stream_buffer():
+    data = b"some test data\n" * 10 + b"eof\n"
+    for arg in [pa.py_buffer(data), memoryview(data)]:
+        stream = pa.input_stream(arg)
+        assert stream.read() == data
+
+    gz_data = gzip_compress(data)
+    stream = pa.input_stream(memoryview(gz_data))
+    assert stream.read() == gz_data
+    stream = pa.input_stream(memoryview(gz_data), compression='gzip')
+    assert stream.read() == data
+
+
+def test_input_stream_file_path(tmpdir):
+    data = b"some test data\n" * 10 + b"eof\n"
+    file_path = tmpdir / 'input_stream'
+    with open(str(file_path), 'wb') as f:
+        f.write(data)
+
+    stream = pa.input_stream(file_path)
+    assert stream.read() == data
+    stream = pa.input_stream(str(file_path))
+    assert stream.read() == data
+    stream = pa.input_stream(pathlib.Path(str(file_path)))
+    assert stream.read() == data
+
+
+def test_input_stream_file_path_compressed(tmpdir):
+    data = b"some test data\n" * 10 + b"eof\n"
+    gz_data = gzip_compress(data)
+    file_path = tmpdir / 'input_stream.gz'
+    with open(str(file_path), 'wb') as f:
+        f.write(gz_data)
+
+    stream = pa.input_stream(file_path)
+    assert stream.read() == data
+    stream = pa.input_stream(str(file_path))
+    assert stream.read() == data
+    stream = pa.input_stream(pathlib.Path(str(file_path)))
+    assert stream.read() == data
+
+    stream = pa.input_stream(file_path, compression='gzip')
+    assert stream.read() == data
+    stream = pa.input_stream(file_path, compression=None)
+    assert stream.read() == gz_data
+
+
+def test_input_stream_python_file(tmpdir):
+    data = b"some test data\n" * 10 + b"eof\n"
+    bio = BytesIO(data)
+
+    stream = pa.input_stream(bio)
+    assert stream.read() == data
+
+    gz_data = gzip_compress(data)
+    bio = BytesIO(gz_data)
+    stream = pa.input_stream(bio)
+    assert stream.read() == gz_data
+    bio.seek(0)
+    stream = pa.input_stream(bio, compression='gzip')
+    assert stream.read() == data
+
+    file_path = tmpdir / 'input_stream'
+    with open(str(file_path), 'wb') as f:
+        f.write(data)
+    with open(str(file_path), 'rb') as f:
+        stream = pa.input_stream(f)
+        assert stream.read() == data
+
+
+def test_input_stream_native_file():
+    data = b"some test data\n" * 10 + b"eof\n"
+    gz_data = gzip_compress(data)
+    reader = pa.BufferReader(gz_data)
+    stream = pa.input_stream(reader)
+    assert stream is reader
+    reader = pa.BufferReader(gz_data)
+    stream = pa.input_stream(reader, compression='gzip')
+    assert stream.read() == data
+
+
+def test_input_stream_errors(tmpdir):
+    buf = memoryview(b"")
+    with pytest.raises(ValueError):
+        pa.input_stream(buf, compression="foo")
+
+    for arg in [bytearray(), StringIO()]:
+        with pytest.raises(TypeError):
+            pa.input_stream(arg)
+
+    with pytest.raises(IOError):
+        pa.input_stream("non_existent_file")
+
+    with open(str(tmpdir / 'new_file'), 'wb') as f:
+        with pytest.raises(TypeError, match="readable file expected"):
+            pa.input_stream(f)
+
+
+def test_output_stream_buffer():
+    data = b"some test data\n" * 10 + b"eof\n"
+    buf = bytearray(len(data))
+    stream = pa.output_stream(pa.py_buffer(buf))
+    stream.write(data)
+    assert buf == data
+
+    buf = bytearray(len(data))
+    stream = pa.output_stream(memoryview(buf))
+    stream.write(data)
+    assert buf == data
+
+
+def test_output_stream_file_path(tmpdir):
+    data = b"some test data\n" * 10 + b"eof\n"
+    file_path = tmpdir / 'output_stream'
+
+    def check_data(file_path, data):
+        with pa.output_stream(file_path) as stream:
+            stream.write(data)
+        with open(str(file_path), 'rb') as f:
+            assert f.read() == data
+
+    check_data(file_path, data)
+    check_data(str(file_path), data)
+    check_data(pathlib.Path(str(file_path)), data)
+
+
+def test_output_stream_file_path_compressed(tmpdir):
+    data = b"some test data\n" * 10 + b"eof\n"
+    file_path = tmpdir / 'output_stream.gz'
+
+    def check_data(file_path, data, **kwargs):
+        with pa.output_stream(file_path, **kwargs) as stream:
+            stream.write(data)
+        with open(str(file_path), 'rb') as f:
+            return f.read()
+
+    assert gzip_decompress(check_data(file_path, data)) == data
+    assert gzip_decompress(check_data(str(file_path), data)) == data
+    assert gzip_decompress(
+        check_data(pathlib.Path(str(file_path)), data)) == data
+
+    assert gzip_decompress(
+        check_data(file_path, data, compression='gzip')) == data
+    assert check_data(file_path, data, compression=None) == data
+
+
+def test_output_stream_python_file(tmpdir):
+    data = b"some test data\n" * 10 + b"eof\n"
+
+    def check_data(data, **kwargs):
+        # XXX cannot use BytesIO because stream.close() is necessary
+        # to finish writing compressed data, but it will also close the
+        # underlying BytesIO
+        fn = str(tmpdir / 'output_stream_file')
+        with open(fn, 'wb') as f:
+            with pa.output_stream(f, **kwargs) as stream:
+                stream.write(data)
+        with open(fn, 'rb') as f:
+            return f.read()
+
+    assert check_data(data) == data
+    assert gzip_decompress(check_data(data, compression='gzip')) == data
+
+
+def test_output_stream_errors(tmpdir):
+    buf = memoryview(bytearray())
+    with pytest.raises(ValueError):
+        pa.output_stream(buf, compression="foo")
+
+    for arg in [bytearray(), StringIO()]:
+        with pytest.raises(TypeError):
+            pa.output_stream(arg)
+
+    fn = str(tmpdir / 'new_file')
+    with open(fn, 'wb') as f:
+        pass
+    with open(fn, 'rb') as f:
+        with pytest.raises(TypeError, match="writable file expected"):
+            pa.output_stream(f)