You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2017/01/23 14:10:28 UTC

arrow git commit: ARROW-503: [Python] Implement Python interface to streaming file format

Repository: arrow
Updated Branches:
  refs/heads/master c327b5fd2 -> 1f81adcc8


ARROW-503: [Python] Implement Python interface to streaming file format

See the new `StreamWriter` and `StreamReader` classes. This patch is stacked on top of the patch for ARROW-475. Will rebase when that is merged.

Author: Wes McKinney <we...@twosigma.com>

Closes #299 from wesm/ARROW-503 and squashes the following commits:

e9d918e [Wes McKinney] Close BufferOutputStream after completing file or stream writes
31e519f [Wes McKinney] Add function alias to preserve backwards compatibility
faac28c [Wes McKinney] Fix small bug in BinaryArray::Equals, add rudimentary StreamReader/Writer interface and tests
d9fb3dc [Wes McKinney] Refactoring, consolidate IPC code into io.pyx


Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/1f81adcc
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/1f81adcc
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/1f81adcc

Branch: refs/heads/master
Commit: 1f81adcc88b138c6ae5f5ffb3250f87239c89dc1
Parents: c327b5f
Author: Wes McKinney <we...@twosigma.com>
Authored: Mon Jan 23 09:10:18 2017 -0500
Committer: Wes McKinney <we...@twosigma.com>
Committed: Mon Jan 23 09:10:18 2017 -0500

----------------------------------------------------------------------
 cpp/src/arrow/array.cc                   |   2 +-
 cpp/src/arrow/ipc/ipc-file-test.cc       |   2 +
 cpp/src/arrow/ipc/stream.cc              |   6 +-
 cpp/src/arrow/ipc/stream.h               |   3 +
 python/CMakeLists.txt                    |   1 -
 python/pyarrow/__init__.py               |   2 +
 python/pyarrow/includes/libarrow_ipc.pxd |  29 +-
 python/pyarrow/io.pyx                    | 367 ++++++++++++++++++--------
 python/pyarrow/ipc.py                    |  83 ++++++
 python/pyarrow/ipc.pyx                   | 115 --------
 python/pyarrow/schema.pyx                |   1 +
 python/pyarrow/table.pxd                 |   1 +
 python/pyarrow/tests/test_ipc.py         | 120 ++++-----
 python/setup.py                          |   1 -
 14 files changed, 438 insertions(+), 295 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/arrow/blob/1f81adcc/cpp/src/arrow/array.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/array.cc b/cpp/src/arrow/array.cc
index 7509520..aa4a692 100644
--- a/cpp/src/arrow/array.cc
+++ b/cpp/src/arrow/array.cc
@@ -359,7 +359,7 @@ bool BinaryArray::EqualsExact(const BinaryArray& other) const {
 
   if (!data_buffer_ && !(other.data_buffer_)) { return true; }
 
-  return data_buffer_->Equals(*other.data_buffer_, data_buffer_->size());
+  return data_buffer_->Equals(*other.data_buffer_, raw_offsets()[length_]);
 }
 
 bool BinaryArray::Equals(const std::shared_ptr<Array>& arr) const {

http://git-wip-us.apache.org/repos/asf/arrow/blob/1f81adcc/cpp/src/arrow/ipc/ipc-file-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/ipc-file-test.cc b/cpp/src/arrow/ipc/ipc-file-test.cc
index 15ceb80..7cd8054 100644
--- a/cpp/src/arrow/ipc/ipc-file-test.cc
+++ b/cpp/src/arrow/ipc/ipc-file-test.cc
@@ -75,6 +75,7 @@ class TestFileFormat : public ::testing::TestWithParam<MakeRecordBatch*> {
       RETURN_NOT_OK(writer->WriteRecordBatch(*batch));
     }
     RETURN_NOT_OK(writer->Close());
+    RETURN_NOT_OK(sink_->Close());
 
     // Current offset into stream is the end of the file
     int64_t footer_offset;
@@ -138,6 +139,7 @@ class TestStreamFormat : public ::testing::TestWithParam<MakeRecordBatch*> {
       RETURN_NOT_OK(writer->WriteRecordBatch(batch));
     }
     RETURN_NOT_OK(writer->Close());
+    RETURN_NOT_OK(sink_->Close());
 
     // Open the file
     auto buf_reader = std::make_shared<io::BufferReader>(buffer_);

http://git-wip-us.apache.org/repos/asf/arrow/blob/1f81adcc/cpp/src/arrow/ipc/stream.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/stream.cc b/cpp/src/arrow/ipc/stream.cc
index a2ca672..c9057e8 100644
--- a/cpp/src/arrow/ipc/stream.cc
+++ b/cpp/src/arrow/ipc/stream.cc
@@ -117,9 +117,9 @@ Status StreamWriter::WriteRecordBatch(const RecordBatch& batch) {
 }
 
 Status StreamWriter::Close() {
-  // Close the stream
-  RETURN_NOT_OK(CheckStarted());
-  return sink_->Close();
+  // Write the schema if not already written
+  // User is responsible for closing the OutputStream
+  return CheckStarted();
 }
 
 // ----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/arrow/blob/1f81adcc/cpp/src/arrow/ipc/stream.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/stream.h b/cpp/src/arrow/ipc/stream.h
index 0b0e62f..53f51dc 100644
--- a/cpp/src/arrow/ipc/stream.h
+++ b/cpp/src/arrow/ipc/stream.h
@@ -54,6 +54,9 @@ class ARROW_EXPORT StreamWriter {
       std::shared_ptr<StreamWriter>* out);
 
   virtual Status WriteRecordBatch(const RecordBatch& batch);
+
+  /// Perform any logic necessary to finish the stream. User is responsible for
+  /// closing the actual OutputStream
   virtual Status Close();
 
  protected:

http://git-wip-us.apache.org/repos/asf/arrow/blob/1f81adcc/python/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/python/CMakeLists.txt b/python/CMakeLists.txt
index b3735b1..d63fff4 100644
--- a/python/CMakeLists.txt
+++ b/python/CMakeLists.txt
@@ -409,7 +409,6 @@ set(CYTHON_EXTENSIONS
   config
   error
   io
-  ipc
   scalar
   schema
   table

http://git-wip-us.apache.org/repos/asf/arrow/blob/1f81adcc/python/pyarrow/__init__.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/__init__.py b/python/pyarrow/__init__.py
index d563c7a..7c521db 100644
--- a/python/pyarrow/__init__.py
+++ b/python/pyarrow/__init__.py
@@ -46,6 +46,8 @@ from pyarrow.filesystem import Filesystem, HdfsClient, LocalFilesystem
 from pyarrow.io import (HdfsFile, NativeFile, PythonFileInterface,
                         Buffer, InMemoryOutputStream, BufferReader)
 
+from pyarrow.ipc import FileReader, FileWriter, StreamReader, StreamWriter
+
 from pyarrow.scalar import (ArrayValue, Scalar, NA, NAType,
                             BooleanValue,
                             Int8Value, Int16Value, Int32Value, Int64Value,

http://git-wip-us.apache.org/repos/asf/arrow/blob/1f81adcc/python/pyarrow/includes/libarrow_ipc.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/includes/libarrow_ipc.pxd b/python/pyarrow/includes/libarrow_ipc.pxd
index 8295760..bfece14 100644
--- a/python/pyarrow/includes/libarrow_ipc.pxd
+++ b/python/pyarrow/includes/libarrow_ipc.pxd
@@ -20,18 +20,37 @@
 from pyarrow.includes.common cimport *
 from pyarrow.includes.libarrow cimport (MemoryPool, CArray, CSchema,
                                         CRecordBatch)
-from pyarrow.includes.libarrow_io cimport (OutputStream, ReadableFileInterface)
+from pyarrow.includes.libarrow_io cimport (InputStream, OutputStream,
+                                           ReadableFileInterface)
 
-cdef extern from "arrow/ipc/file.h" namespace "arrow::ipc" nogil:
 
-    cdef cppclass CFileWriter " arrow::ipc::FileWriter":
+cdef extern from "arrow/ipc/stream.h" namespace "arrow::ipc" nogil:
+
+    cdef cppclass CStreamWriter " arrow::ipc::StreamWriter":
         @staticmethod
         CStatus Open(OutputStream* sink, const shared_ptr[CSchema]& schema,
-                     shared_ptr[CFileWriter]* out)
+                     shared_ptr[CStreamWriter]* out)
 
+        CStatus Close()
         CStatus WriteRecordBatch(const CRecordBatch& batch)
 
-        CStatus Close()
+    cdef cppclass CStreamReader " arrow::ipc::StreamReader":
+
+        @staticmethod
+        CStatus Open(const shared_ptr[InputStream]& stream,
+                     shared_ptr[CStreamReader]* out)
+
+        shared_ptr[CSchema] schema()
+
+        CStatus GetNextRecordBatch(shared_ptr[CRecordBatch]* batch)
+
+
+cdef extern from "arrow/ipc/file.h" namespace "arrow::ipc" nogil:
+
+    cdef cppclass CFileWriter " arrow::ipc::FileWriter"(CStreamWriter):
+        @staticmethod
+        CStatus Open(OutputStream* sink, const shared_ptr[CSchema]& schema,
+                     shared_ptr[CFileWriter]* out)
 
     cdef cppclass CFileReader " arrow::ipc::FileReader":
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/1f81adcc/python/pyarrow/io.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/io.pyx b/python/pyarrow/io.pyx
index 2621512..0755ed8 100644
--- a/python/pyarrow/io.pyx
+++ b/python/pyarrow/io.pyx
@@ -15,20 +15,26 @@
 # specific language governing permissions and limitations
 # under the License.
 
-# Cython wrappers for IO interfaces defined in arrow/io
+# Cython wrappers for IO interfaces defined in arrow::io and messaging in
+# arrow::ipc
 
 # cython: profile=False
 # distutils: language = c++
 # cython: embedsignature = True
 
+from cython.operator cimport dereference as deref
+
 from libc.stdlib cimport malloc, free
 
 from pyarrow.includes.libarrow cimport *
-cimport pyarrow.includes.pyarrow as pyarrow
 from pyarrow.includes.libarrow_io cimport *
+from pyarrow.includes.libarrow_ipc cimport *
+cimport pyarrow.includes.pyarrow as pyarrow
 
 from pyarrow.compat import frombytes, tobytes, encode_file_path
 from pyarrow.error cimport check_status
+from pyarrow.schema cimport Schema
+from pyarrow.table cimport RecordBatch, batch_from_cbatch
 
 cimport cpython as cp
 
@@ -38,6 +44,11 @@ import sys
 import threading
 import time
 
+
+# 64K
+DEFAULT_BUFFER_SIZE = 2 ** 16
+
+
 # To let us get a PyObject* and avoid Cython auto-ref-counting
 cdef extern from "Python.h":
     PyObject* PyBytes_FromStringAndSizeNative" PyBytes_FromStringAndSize"(
@@ -167,6 +178,129 @@ cdef class NativeFile:
 
         return wrap_buffer(output)
 
+    def download(self, stream_or_path, buffer_size=None):
+        """
+        Read file completely to local path (rather than reading completely into
+        memory). First seeks to the beginning of the file.
+        """
+        cdef:
+            int64_t bytes_read = 0
+            uint8_t* buf
+        self._assert_readable()
+
+        buffer_size = buffer_size or DEFAULT_BUFFER_SIZE
+
+        write_queue = Queue(50)
+
+        if not hasattr(stream_or_path, 'read'):
+            stream = open(stream_or_path, 'wb')
+            cleanup = lambda: stream.close()
+        else:
+            stream = stream_or_path
+            cleanup = lambda: None
+
+        done = False
+        exc_info = None
+        def bg_write():
+            try:
+                while not done or write_queue.qsize() > 0:
+                    try:
+                        buf = write_queue.get(timeout=0.01)
+                    except QueueEmpty:
+                        continue
+                    stream.write(buf)
+            except Exception as e:
+                exc_info = sys.exc_info()
+            finally:
+                cleanup()
+
+        self.seek(0)
+
+        writer_thread = threading.Thread(target=bg_write)
+
+        # This isn't ideal -- PyBytes_FromStringAndSize copies the data from
+        # the passed buffer, so it's hard for us to avoid doubling the memory
+        buf = <uint8_t*> malloc(buffer_size)
+        if buf == NULL:
+            raise MemoryError("Failed to allocate {0} bytes"
+                              .format(buffer_size))
+
+        writer_thread.start()
+
+        cdef int64_t total_bytes = 0
+        cdef int32_t c_buffer_size = buffer_size
+
+        try:
+            while True:
+                with nogil:
+                    check_status(self.rd_file.get()
+                                 .Read(c_buffer_size, &bytes_read, buf))
+
+                total_bytes += bytes_read
+
+                # EOF
+                if bytes_read == 0:
+                    break
+
+                pybuf = cp.PyBytes_FromStringAndSize(<const char*>buf,
+                                                     bytes_read)
+
+                write_queue.put_nowait(pybuf)
+        finally:
+            free(buf)
+            done = True
+
+        writer_thread.join()
+        if exc_info is not None:
+            raise exc_info[0], exc_info[1], exc_info[2]
+
+    def upload(self, stream, buffer_size=None):
+        """
+        Pipe file-like object to file
+        """
+        write_queue = Queue(50)
+        self._assert_writeable()
+
+        buffer_size = buffer_size or DEFAULT_BUFFER_SIZE
+
+        done = False
+        exc_info = None
+        def bg_write():
+            try:
+                while not done or write_queue.qsize() > 0:
+                    try:
+                        buf = write_queue.get(timeout=0.01)
+                    except QueueEmpty:
+                        continue
+
+                    self.write(buf)
+
+            except Exception as e:
+                exc_info = sys.exc_info()
+
+        writer_thread = threading.Thread(target=bg_write)
+        writer_thread.start()
+
+        try:
+            while True:
+                buf = stream.read(buffer_size)
+                if not buf:
+                    break
+
+                if writer_thread.is_alive():
+                    while write_queue.full():
+                        time.sleep(0.01)
+                else:
+                    break
+
+                write_queue.put_nowait(buf)
+        finally:
+            done = True
+
+        writer_thread.join()
+        if exc_info is not None:
+            raise exc_info[0], exc_info[1], exc_info[2]
+
 
 # ----------------------------------------------------------------------
 # Python file-like objects
@@ -679,58 +813,17 @@ cdef class _HdfsClient:
 
         return out
 
-    def upload(self, path, stream, buffer_size=2**16):
+    def download(self, path, stream, buffer_size=None):
+        with self.open(path, 'rb') as f:
+            f.download(stream, buffer_size=buffer_size)
+
+    def upload(self, path, stream, buffer_size=None):
         """
         Upload file-like object to HDFS path
         """
-        write_queue = Queue(50)
-
         with self.open(path, 'wb') as f:
-            done = False
-            exc_info = None
-            def bg_write():
-                try:
-                    while not done or write_queue.qsize() > 0:
-                        try:
-                            buf = write_queue.get(timeout=0.01)
-                        except QueueEmpty:
-                            continue
-
-                        f.write(buf)
-
-                except Exception as e:
-                    exc_info = sys.exc_info()
-
-            writer_thread = threading.Thread(target=bg_write)
-            writer_thread.start()
+            f.upload(stream, buffer_size=buffer_size)
 
-            try:
-                while True:
-                    buf = stream.read(buffer_size)
-                    if not buf:
-                        break
-
-                    if writer_thread.is_alive():
-                        while write_queue.full():
-                            time.sleep(0.01)
-                    else:
-                        break
-
-                    write_queue.put_nowait(buf)
-            finally:
-                done = True
-
-            writer_thread.join()
-            if exc_info is not None:
-                raise exc_info[0], exc_info[1], exc_info[2]
-
-    def download(self, path, stream, buffer_size=None):
-        with self.open(path, 'rb', buffer_size=buffer_size) as f:
-            f.download(stream)
-
-
-# ----------------------------------------------------------------------
-# Specialization for HDFS
 
 # ARROW-404: Helper class to ensure that files are closed before the
 # client. During deallocation of the extension class, the attributes are
@@ -766,75 +859,139 @@ cdef class HdfsFile(NativeFile):
     def __dealloc__(self):
         self.parent = None
 
-    def download(self, stream_or_path):
+# ----------------------------------------------------------------------
+# File and stream readers and writers
+
+cdef class _StreamWriter:
+    cdef:
+        shared_ptr[CStreamWriter] writer
+        shared_ptr[OutputStream] sink
+        bint closed
+
+    def __cinit__(self):
+        self.closed = True
+
+    def __dealloc__(self):
+        if not self.closed:
+            self.close()
+
+    def _open(self, sink, Schema schema):
+        get_writer(sink, &self.sink)
+
+        with nogil:
+            check_status(CStreamWriter.Open(self.sink.get(), schema.sp_schema,
+                                            &self.writer))
+
+        self.closed = False
+
+    def write_batch(self, RecordBatch batch):
+        with nogil:
+            check_status(self.writer.get()
+                         .WriteRecordBatch(deref(batch.batch)))
+
+    def close(self):
+        with nogil:
+            check_status(self.writer.get().Close())
+        self.closed = True
+
+
+cdef class _StreamReader:
+    cdef:
+        shared_ptr[CStreamReader] reader
+
+    cdef readonly:
+        Schema schema
+
+    def __cinit__(self):
+        pass
+
+    def _open(self, source):
+        cdef:
+            shared_ptr[ReadableFileInterface] reader
+            shared_ptr[InputStream] in_stream
+
+        get_reader(source, &reader)
+        in_stream = <shared_ptr[InputStream]> reader
+
+        with nogil:
+            check_status(CStreamReader.Open(in_stream, &self.reader))
+
+        schema = Schema()
+        schema.init_schema(self.reader.get().schema())
+
+    def get_next_batch(self):
         """
-        Read file completely to local path (rather than reading completely into
-        memory). First seeks to the beginning of the file.
+        Read next RecordBatch from the stream. Raises StopIteration at end of
+        stream
         """
-        cdef:
-            int64_t bytes_read = 0
-            uint8_t* buf
-        self._assert_readable()
+        cdef shared_ptr[CRecordBatch] batch
 
-        write_queue = Queue(50)
+        with nogil:
+            check_status(self.reader.get().GetNextRecordBatch(&batch))
 
-        if not hasattr(stream_or_path, 'read'):
-            stream = open(stream_or_path, 'wb')
-            cleanup = lambda: stream.close()
-        else:
-            stream = stream_or_path
-            cleanup = lambda: None
+        if batch.get() == NULL:
+            raise StopIteration
 
-        done = False
-        exc_info = None
-        def bg_write():
-            try:
-                while not done or write_queue.qsize() > 0:
-                    try:
-                        buf = write_queue.get(timeout=0.01)
-                    except QueueEmpty:
-                        continue
-                    stream.write(buf)
-            except Exception as e:
-                exc_info = sys.exc_info()
-            finally:
-                cleanup()
+        return batch_from_cbatch(batch)
 
-        self.seek(0)
 
-        writer_thread = threading.Thread(target=bg_write)
+cdef class _FileWriter(_StreamWriter):
 
-        # This isn't ideal -- PyBytes_FromStringAndSize copies the data from
-        # the passed buffer, so it's hard for us to avoid doubling the memory
-        buf = <uint8_t*> malloc(self.buffer_size)
-        if buf == NULL:
-            raise MemoryError("Failed to allocate {0} bytes"
-                              .format(self.buffer_size))
+    def _open(self, sink, Schema schema):
+        cdef shared_ptr[CFileWriter] writer
+        get_writer(sink, &self.sink)
 
-        writer_thread.start()
+        with nogil:
+            check_status(CFileWriter.Open(self.sink.get(), schema.sp_schema,
+                                          &writer))
 
-        cdef int64_t total_bytes = 0
+        # Cast to base class, because has same interface
+        self.writer = <shared_ptr[CStreamWriter]> writer
+        self.closed = False
 
-        try:
-            while True:
-                with nogil:
-                    check_status(self.rd_file.get()
-                                 .Read(self.buffer_size, &bytes_read, buf))
 
-                total_bytes += bytes_read
+cdef class _FileReader:
+    cdef:
+        shared_ptr[CFileReader] reader
 
-                # EOF
-                if bytes_read == 0:
-                    break
+    def __cinit__(self):
+        pass
 
-                pybuf = cp.PyBytes_FromStringAndSize(<const char*>buf,
-                                                     bytes_read)
+    def _open(self, source, footer_offset=None):
+        cdef shared_ptr[ReadableFileInterface] reader
+        get_reader(source, &reader)
 
-                write_queue.put_nowait(pybuf)
-        finally:
-            free(buf)
-            done = True
+        cdef int64_t offset = 0
+        if footer_offset is not None:
+            offset = footer_offset
 
-        writer_thread.join()
-        if exc_info is not None:
-            raise exc_info[0], exc_info[1], exc_info[2]
+        with nogil:
+            if offset != 0:
+                check_status(CFileReader.Open2(reader, offset, &self.reader))
+            else:
+                check_status(CFileReader.Open(reader, &self.reader))
+
+    property num_dictionaries:
+
+        def __get__(self):
+            return self.reader.get().num_dictionaries()
+
+    property num_record_batches:
+
+        def __get__(self):
+            return self.reader.get().num_record_batches()
+
+    def get_batch(self, int i):
+        cdef shared_ptr[CRecordBatch] batch
+
+        if i < 0 or i >= self.num_record_batches:
+            raise ValueError('Batch number {0} out of range'.format(i))
+
+        with nogil:
+            check_status(self.reader.get().GetRecordBatch(i, &batch))
+
+        return batch_from_cbatch(batch)
+
+    # TODO(wesm): ARROW-503: Function was renamed. Remove after a period of
+    # time has passed
+    get_record_batch = get_batch

http://git-wip-us.apache.org/repos/asf/arrow/blob/1f81adcc/python/pyarrow/ipc.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/ipc.py b/python/pyarrow/ipc.py
new file mode 100644
index 0000000..5a56165
--- /dev/null
+++ b/python/pyarrow/ipc.py
@@ -0,0 +1,83 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Arrow file and stream reader/writer classes, and other messaging tools
+
+import pyarrow.io as io
+
+
+class StreamReader(io._StreamReader):
+    """
+    Reader for the Arrow streaming binary format
+
+    Parameters
+    ----------
+    source : str, pyarrow.NativeFile, or file-like Python object
+        Either a file path, or a readable file object
+    """
+    def __init__(self, source):
+        self._open(source)
+
+    def __iter__(self):
+        while True:
+            yield self.get_next_batch()
+
+
+class StreamWriter(io._StreamWriter):
+    """
+    Writer for the Arrow streaming binary format
+
+    Parameters
+    ----------
+    sink : str, pyarrow.NativeFile, or file-like Python object
+        Either a file path, or a writeable file object
+    schema : pyarrow.Schema
+        The Arrow schema for data to be written to the file
+    """
+    def __init__(self, sink, schema):
+        self._open(sink, schema)
+
+
+class FileReader(io._FileReader):
+    """
+    Class for reading Arrow record batch data from the Arrow binary file format
+
+    Parameters
+    ----------
+    source : str, pyarrow.NativeFile, or file-like Python object
+        Either a file path, or a readable file object
+    footer_offset : int, default None
+        If the file is embedded in some larger file, this is the byte offset to
+        the very end of the file data
+    """
+    def __init__(self, source, footer_offset=None):
+        self._open(source, footer_offset=footer_offset)
+
+
+class FileWriter(io._FileWriter):
+    """
+    Writer to create the Arrow binary file format
+
+    Parameters
+    ----------
+    sink : str, pyarrow.NativeFile, or file-like Python object
+        Either a file path, or a writeable file object
+    schema : pyarrow.Schema
+        The Arrow schema for data to be written to the file
+    """
+    def __init__(self, sink, schema):
+        self._open(sink, schema)

http://git-wip-us.apache.org/repos/asf/arrow/blob/1f81adcc/python/pyarrow/ipc.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/ipc.pyx b/python/pyarrow/ipc.pyx
deleted file mode 100644
index 22069a7..0000000
--- a/python/pyarrow/ipc.pyx
+++ /dev/null
@@ -1,115 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-# Cython wrappers for arrow::ipc
-
-# cython: profile=False
-# distutils: language = c++
-# cython: embedsignature = True
-
-from cython.operator cimport dereference as deref
-
-from pyarrow.includes.libarrow cimport *
-from pyarrow.includes.libarrow_io cimport *
-from pyarrow.includes.libarrow_ipc cimport *
-cimport pyarrow.includes.pyarrow as pyarrow
-
-from pyarrow.error cimport check_status
-from pyarrow.io cimport NativeFile, get_reader, get_writer
-from pyarrow.schema cimport Schema
-from pyarrow.table cimport RecordBatch
-
-from pyarrow.compat import frombytes, tobytes
-import pyarrow.io as io
-
-cimport cpython as cp
-
-
-cdef class ArrowFileWriter:
-    cdef:
-        shared_ptr[CFileWriter] writer
-        shared_ptr[OutputStream] sink
-        bint closed
-
-    def __cinit__(self, sink, Schema schema):
-        self.closed = True
-        get_writer(sink, &self.sink)
-
-        with nogil:
-            check_status(CFileWriter.Open(self.sink.get(), schema.sp_schema,
-                                          &self.writer))
-
-        self.closed = False
-
-    def __dealloc__(self):
-        if not self.closed:
-            self.close()
-
-    def write_record_batch(self, RecordBatch batch):
-        with nogil:
-            check_status(self.writer.get()
-                         .WriteRecordBatch(deref(batch.batch)))
-
-    def close(self):
-        with nogil:
-            check_status(self.writer.get().Close())
-        self.closed = True
-
-
-cdef class ArrowFileReader:
-    cdef:
-        shared_ptr[CFileReader] reader
-
-    def __cinit__(self, source, footer_offset=None):
-        cdef shared_ptr[ReadableFileInterface] reader
-        get_reader(source, &reader)
-
-        cdef int64_t offset = 0
-        if footer_offset is not None:
-            offset = footer_offset
-
-        with nogil:
-            if offset != 0:
-                check_status(CFileReader.Open2(reader, offset, &self.reader))
-            else:
-                check_status(CFileReader.Open(reader, &self.reader))
-
-    property num_dictionaries:
-
-        def __get__(self):
-            return self.reader.get().num_dictionaries()
-
-    property num_record_batches:
-
-        def __get__(self):
-            return self.reader.get().num_record_batches()
-
-    def get_record_batch(self, int i):
-        cdef:
-            shared_ptr[CRecordBatch] batch
-            RecordBatch result
-
-        if i < 0 or i >= self.num_record_batches:
-            raise ValueError('Batch number {0} out of range'.format(i))
-
-        with nogil:
-            check_status(self.reader.get().GetRecordBatch(i, &batch))
-
-        result = RecordBatch()
-        result.init(batch)
-
-        return result

http://git-wip-us.apache.org/repos/asf/arrow/blob/1f81adcc/python/pyarrow/schema.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/schema.pyx b/python/pyarrow/schema.pyx
index 2bcfec1..52eeeaf 100644
--- a/python/pyarrow/schema.pyx
+++ b/python/pyarrow/schema.pyx
@@ -112,6 +112,7 @@ cdef class Field:
         def __get__(self):
             return frombytes(self.field.name)
 
+
 cdef class Schema:
 
     def __cinit__(self):

http://git-wip-us.apache.org/repos/asf/arrow/blob/1f81adcc/python/pyarrow/table.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/table.pxd b/python/pyarrow/table.pxd
index df3687d..389727b 100644
--- a/python/pyarrow/table.pxd
+++ b/python/pyarrow/table.pxd
@@ -59,3 +59,4 @@ cdef class RecordBatch:
     cdef _check_nullptr(self)
 
 cdef api object table_from_ctable(const shared_ptr[CTable]& ctable)
+cdef api object batch_from_cbatch(const shared_ptr[CRecordBatch]& cbatch)

http://git-wip-us.apache.org/repos/asf/arrow/blob/1f81adcc/python/pyarrow/tests/test_ipc.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_ipc.py b/python/pyarrow/tests/test_ipc.py
index bbd6c6a..819d1b7 100644
--- a/python/pyarrow/tests/test_ipc.py
+++ b/python/pyarrow/tests/test_ipc.py
@@ -16,21 +16,20 @@
 # under the License.
 
 import io
+import pytest
 
 import numpy as np
 
 from pandas.util.testing import assert_frame_equal
 import pandas as pd
 
-import pyarrow as A
-import pyarrow.io as aio
-import pyarrow.ipc as ipc
+from pyarrow.compat import unittest
+import pyarrow as pa
 
 
-class RoundtripTest(object):
-    # Also tests writing zero-copy NumPy array with additional padding
+class MessagingTest(object):
 
-    def __init__(self):
+    def setUp(self):
         self.sink = self._get_sink()
 
     def _get_sink(self):
@@ -39,14 +38,15 @@ class RoundtripTest(object):
     def _get_source(self):
         return self.sink.getvalue()
 
-    def run(self):
+    def write_batches(self):
         nrows = 5
         df = pd.DataFrame({
             'one': np.random.randn(nrows),
             'two': ['foo', np.nan, 'bar', 'bazbaz', 'qux']})
 
-        batch = A.RecordBatch.from_pandas(df)
-        writer = ipc.ArrowFileWriter(self.sink, batch.schema)
+        batch = pa.RecordBatch.from_pandas(df)
+
+        writer = self._get_writer(self.sink, batch.schema)
 
         num_batches = 5
         frames = []
@@ -55,46 +55,73 @@ class RoundtripTest(object):
             unique_df = df.copy()
             unique_df['one'] = np.random.randn(nrows)
 
-            batch = A.RecordBatch.from_pandas(unique_df)
-            writer.write_record_batch(batch)
+            batch = pa.RecordBatch.from_pandas(unique_df)
+            writer.write_batch(batch)
             frames.append(unique_df)
             batches.append(batch)
 
         writer.close()
+        return batches
+
+
+class TestFile(MessagingTest, unittest.TestCase):
+    # Also tests writing zero-copy NumPy array with additional padding
+
+    def _get_writer(self, sink, schema):
+        return pa.FileWriter(sink, schema)
 
+    def test_simple_roundtrip(self):
+        batches = self.write_batches()
         file_contents = self._get_source()
-        reader = ipc.ArrowFileReader(aio.BufferReader(file_contents))
 
-        assert reader.num_record_batches == num_batches
+        reader = pa.FileReader(pa.BufferReader(file_contents))
 
-        for i in range(num_batches):
+        assert reader.num_record_batches == len(batches)
+
+        for i, batch in enumerate(batches):
             # it works. Must convert back to DataFrame
-            batch = reader.get_record_batch(i)
+            batch = reader.get_batch(i)
             assert batches[i].equals(batch)
 
 
-class InMemoryStreamTest(RoundtripTest):
+class TestStream(MessagingTest, unittest.TestCase):
+
+    def _get_writer(self, sink, schema):
+        return pa.StreamWriter(sink, schema)
+
+    def test_simple_roundtrip(self):
+        batches = self.write_batches()
+        file_contents = self._get_source()
+        reader = pa.StreamReader(pa.BufferReader(file_contents))
+
+        total = 0
+        for i, next_batch in enumerate(reader):
+            assert next_batch.equals(batches[i])
+            total += 1
+
+        assert total == len(batches)
+
+        with pytest.raises(StopIteration):
+            reader.get_next_batch()
+
+
+class TestInMemoryFile(TestFile):
 
     def _get_sink(self):
-        return aio.InMemoryOutputStream()
+        return pa.InMemoryOutputStream()
 
     def _get_source(self):
         return self.sink.get_result()
 
 
-def test_ipc_file_simple_roundtrip():
-    helper = RoundtripTest()
-    helper.run()
-
-
 def test_ipc_zero_copy_numpy():
     df = pd.DataFrame({'foo': [1.5]})
 
-    batch = A.RecordBatch.from_pandas(df)
-    sink = aio.InMemoryOutputStream()
+    batch = pa.RecordBatch.from_pandas(df)
+    sink = pa.InMemoryOutputStream()
     write_file(batch, sink)
     buffer = sink.get_result()
-    reader = aio.BufferReader(buffer)
+    reader = pa.BufferReader(buffer)
 
     batches = read_file(reader)
 
@@ -103,48 +130,13 @@ def test_ipc_zero_copy_numpy():
     assert_frame_equal(df, rdf)
 
 
-# XXX: For benchmarking
-
-def big_batch():
-    K = 2**4
-    N = 2**20
-    df = pd.DataFrame(
-        np.random.randn(K, N).T,
-        columns=[str(i) for i in range(K)]
-    )
-
-    df = pd.concat([df] * 2 ** 3, ignore_index=True)
-    return df
-
-
-def write_to_memory2(batch):
-    sink = aio.InMemoryOutputStream()
-    write_file(batch, sink)
-    return sink.get_result()
-
-
-def write_to_memory(batch):
-    sink = io.BytesIO()
-    write_file(batch, sink)
-    return sink.getvalue()
-
-
 def write_file(batch, sink):
-    writer = ipc.ArrowFileWriter(sink, batch.schema)
-    writer.write_record_batch(batch)
+    writer = pa.FileWriter(sink, batch.schema)
+    writer.write_batch(batch)
     writer.close()
 
 
 def read_file(source):
-    reader = ipc.ArrowFileReader(source)
-    return [reader.get_record_batch(i)
+    reader = pa.FileReader(source)
+    return [reader.get_batch(i)
             for i in range(reader.num_record_batches)]
-
-# df = big_batch()
-# batch = A.RecordBatch.from_pandas(df)
-# mem = write_to_memory(batch)
-# batches = read_file(mem)
-# data = batches[0].to_pandas()
-# rdf = pd.DataFrame(data)
-
-# [x.to_pandas() for x in batches]

http://git-wip-us.apache.org/repos/asf/arrow/blob/1f81adcc/python/setup.py
----------------------------------------------------------------------
diff --git a/python/setup.py b/python/setup.py
index de59a92..9c63e93 100644
--- a/python/setup.py
+++ b/python/setup.py
@@ -94,7 +94,6 @@ class build_ext(_build_ext):
         'config',
         'error',
         'io',
-        'ipc',
         '_parquet',
         'scalar',
         'schema',