Posted to commits@arrow.apache.org by ju...@apache.org on 2016/10/11 01:42:38 UTC

[3/3] arrow git commit: ARROW-312: Read and write Arrow IPC file format from Python

ARROW-312: Read and write Arrow IPC file format from Python

This also adds some IO scaffolding for interacting with `arrow::Buffer` objects from Python, along with assorted changes to help with testing.
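
A rough sketch of the round trip this patch enables, using the Python APIs added in the diff below (the DataFrame contents are illustrative only):

    import pandas as pd
    import pyarrow
    import pyarrow.io as io
    import pyarrow.ipc as ipc

    df = pd.DataFrame({'ints': [1, 2, 3], 'floats': [0.1, 0.2, 0.3]})
    batch = pyarrow.RecordBatch.from_pandas(df)

    # Write the Arrow IPC file format to an in-memory buffer
    sink = io.InMemoryOutputStream()
    writer = ipc.ArrowFileWriter(sink, batch.schema)
    writer.write_record_batch(batch)
    writer.close()

    # Read it back; a bytes source is wrapped in a BytesReader internally
    reader = ipc.ArrowFileReader(sink.get_result().to_pybytes())
    assert reader.num_record_batches == 1
    assert reader.get_record_batch(0).equals(batch)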

Author: Wes McKinney <we...@twosigma.com>

Closes #164 from wesm/ARROW-312 and squashes the following commits:

7df3e5f [Wes McKinney] Set BUILD_WITH_INSTALL_RPATH on arrow_ipc
be8cee0 [Wes McKinney] Link Cython modules to libarrow* libraries
5716601 [Wes McKinney] Fix accidental deletion
77fb03b [Wes McKinney] Add / test Buffer wrapper. Test that we can write an arrow file to a wrapped buffer. Resize buffer in BufferOutputStream on close
316537d [Wes McKinney] Get ready to wrap Arrow buffers in a Python object
4822d32 [Wes McKinney] Implement RecordBatch::Equals, compare in Python ipc file writes
a931e49 [Wes McKinney] Permit buffers (write padding) in a non-multiple of 64 in an IPC context, to allow zero-copy writing of NumPy arrays
2c49cd4 [Wes McKinney] Some debugging
ca1562b [Wes McKinney] Draft implementations of Arrow file read/write from Python


Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/a9747cea
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/a9747cea
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/a9747cea

Branch: refs/heads/master
Commit: a9747ceac2b6399c6acf027de8074d8661d5eb1d
Parents: 17cd7a6
Author: Wes McKinney <we...@twosigma.com>
Authored: Mon Oct 10 11:21:49 2016 -0400
Committer: Julien Le Dem <ju...@dremio.com>
Committed: Mon Oct 10 18:42:05 2016 -0700

----------------------------------------------------------------------
 cpp/src/arrow/io/io-memory-test.cc       |  25 ++
 cpp/src/arrow/io/memory.cc               |  13 +-
 cpp/src/arrow/ipc/CMakeLists.txt         |   7 +
 cpp/src/arrow/ipc/adapter.cc             |  16 +-
 cpp/src/arrow/ipc/util.h                 |   6 +-
 cpp/src/arrow/table-test.cc              |  27 ++
 cpp/src/arrow/table.cc                   |  16 ++
 cpp/src/arrow/table.h                    |   2 +
 cpp/src/arrow/types/primitive-test.cc    |   3 +-
 cpp/src/arrow/util/bit-util.h            |  13 +
 cpp/src/arrow/util/buffer.cc             |  16 +-
 cpp/src/arrow/util/buffer.h              |   1 -
 cpp/src/arrow/util/logging.h             |   4 +-
 python/CMakeLists.txt                    |   8 +-
 python/cmake_modules/FindArrow.cmake     |  11 +
 python/pyarrow/__init__.py               |   3 +-
 python/pyarrow/array.pyx                 |  44 +---
 python/pyarrow/includes/common.pxd       |   4 -
 python/pyarrow/includes/libarrow.pxd     |  29 ++-
 python/pyarrow/includes/libarrow_io.pxd  |  14 +-
 python/pyarrow/includes/libarrow_ipc.pxd |  52 ++++
 python/pyarrow/includes/pyarrow.pxd      |  13 +-
 python/pyarrow/io.pxd                    |   6 +
 python/pyarrow/io.pyx                    | 340 ++++++++++++++++----------
 python/pyarrow/ipc.pyx                   | 155 ++++++++++++
 python/pyarrow/table.pxd                 |  17 +-
 python/pyarrow/table.pyx                 | 194 +++++++++++----
 python/pyarrow/tests/test_array.py       |   4 +
 python/pyarrow/tests/test_io.py          |  41 ++++
 python/pyarrow/tests/test_ipc.py         | 116 +++++++++
 python/pyarrow/tests/test_table.py       |  82 +++----
 python/setup.py                          |   1 +
 python/src/pyarrow/adapters/builtin.cc   |   2 +-
 python/src/pyarrow/adapters/pandas.cc    |   8 +
 python/src/pyarrow/common.cc             |   2 +-
 python/src/pyarrow/common.h              |  20 +-
 python/src/pyarrow/io.cc                 |   6 +-
 37 files changed, 1012 insertions(+), 309 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/arrow/blob/a9747cea/cpp/src/arrow/io/io-memory-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/io-memory-test.cc b/cpp/src/arrow/io/io-memory-test.cc
index 6de35da..a49faf3 100644
--- a/cpp/src/arrow/io/io-memory-test.cc
+++ b/cpp/src/arrow/io/io-memory-test.cc
@@ -121,5 +121,30 @@ TEST_F(TestMemoryMappedFile, InvalidFile) {
       IOError, MemoryMappedFile::Open(non_existent_path, FileMode::READ, &result));
 }
 
+class TestBufferOutputStream : public ::testing::Test {
+ public:
+  void SetUp() {
+    buffer_.reset(new PoolBuffer(default_memory_pool()));
+    stream_.reset(new BufferOutputStream(buffer_));
+  }
+
+ protected:
+  std::shared_ptr<PoolBuffer> buffer_;
+  std::unique_ptr<OutputStream> stream_;
+};
+
+TEST_F(TestBufferOutputStream, CloseResizes) {
+  std::string data = "data123456";
+
+  const int64_t nbytes = static_cast<int64_t>(data.size());
+  const int K = 100;
+  for (int i = 0; i < K; ++i) {
+    EXPECT_OK(stream_->Write(reinterpret_cast<const uint8_t*>(data.c_str()), nbytes));
+  }
+
+  ASSERT_OK(stream_->Close());
+  ASSERT_EQ(K * nbytes, buffer_->size());
+}
+
 }  // namespace io
 }  // namespace arrow

http://git-wip-us.apache.org/repos/asf/arrow/blob/a9747cea/cpp/src/arrow/io/memory.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/memory.cc b/cpp/src/arrow/io/memory.cc
index 7d6e02e..c7d0ae5 100644
--- a/cpp/src/arrow/io/memory.cc
+++ b/cpp/src/arrow/io/memory.cc
@@ -212,7 +212,11 @@ BufferOutputStream::BufferOutputStream(const std::shared_ptr<ResizableBuffer>& b
       mutable_data_(buffer->mutable_data()) {}
 
 Status BufferOutputStream::Close() {
-  return Status::OK();
+  if (position_ < capacity_) {
+    return buffer_->Resize(position_);
+  } else {
+    return Status::OK();
+  }
 }
 
 Status BufferOutputStream::Tell(int64_t* position) {
@@ -228,8 +232,11 @@ Status BufferOutputStream::Write(const uint8_t* data, int64_t nbytes) {
 }
 
 Status BufferOutputStream::Reserve(int64_t nbytes) {
-  while (position_ + nbytes > capacity_) {
-    int64_t new_capacity = std::max(kBufferMinimumSize, capacity_ * 2);
+  int64_t new_capacity = capacity_;
+  while (position_ + nbytes > new_capacity) {
+    new_capacity = std::max(kBufferMinimumSize, new_capacity * 2);
+  }
+  if (new_capacity > capacity_) {
     RETURN_NOT_OK(buffer_->Resize(new_capacity));
     capacity_ = new_capacity;
   }

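The Reserve change above computes the final capacity up front and issues at most one Resize, instead of resizing once per doubling step; Close then trims the buffer back to the bytes actually written. A Python sketch of the growth schedule (kBufferMinimumSize's value is not shown in this diff, so 256 below is a placeholder):

    def reserve(capacity, position, nbytes, minimum=256):
        # Double until the pending write fits, then resize once if needed
        new_capacity = capacity
        while position + nbytes > new_capacity:
            new_capacity = max(minimum, new_capacity * 2)
        return new_capacity

    # Close() resizes to `position`, which is why the CloseResizes test
    # above observes buffer_->size() == K * nbytes exactly.
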
http://git-wip-us.apache.org/repos/asf/arrow/blob/a9747cea/cpp/src/arrow/ipc/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/CMakeLists.txt b/cpp/src/arrow/ipc/CMakeLists.txt
index bde8c5b..8dcd9ac 100644
--- a/cpp/src/arrow/ipc/CMakeLists.txt
+++ b/cpp/src/arrow/ipc/CMakeLists.txt
@@ -57,6 +57,13 @@ SET_TARGET_PROPERTIES(arrow_ipc PROPERTIES
   LINKER_LANGUAGE CXX
   LINK_FLAGS "${ARROW_IPC_LINK_FLAGS}")
 
+if (APPLE)
+  set_target_properties(arrow_ipc
+    PROPERTIES
+    BUILD_WITH_INSTALL_RPATH ON
+    INSTALL_NAME_DIR "@rpath")
+endif()
+
 ADD_ARROW_TEST(ipc-adapter-test)
 ARROW_TEST_LINK_LIBRARIES(ipc-adapter-test
   ${ARROW_IPC_TEST_LINK_LIBS})

http://git-wip-us.apache.org/repos/asf/arrow/blob/a9747cea/cpp/src/arrow/ipc/adapter.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/adapter.cc b/cpp/src/arrow/ipc/adapter.cc
index 99974a4..cd8ab53 100644
--- a/cpp/src/arrow/ipc/adapter.cc
+++ b/cpp/src/arrow/ipc/adapter.cc
@@ -162,15 +162,14 @@ class RecordBatchWriter {
     for (size_t i = 0; i < buffers_.size(); ++i) {
       const Buffer* buffer = buffers_[i].get();
       int64_t size = 0;
+      int64_t padding = 0;
 
       // The buffer might be null if we are handling zero row lengths.
       if (buffer) {
-        // We use capacity here, because size might not reflect the padding
-        // requirements of buffers but capacity always should.
-        size = buffer->capacity();
-        // check that padding is appropriate
-        RETURN_NOT_OK(CheckMultipleOf64(size));
+        size = buffer->size();
+        padding = util::RoundUpToMultipleOf64(size) - size;
       }
+
       // TODO(wesm): We currently have no notion of shared memory page id's,
       // but we've included it in the metadata IDL for when we have it in the
       // future. Use page=0 for now
@@ -179,12 +178,17 @@ class RecordBatchWriter {
       // are using from any OS-level shared memory. The thought is that systems
       // may (in the future) associate integer page id's with physical memory
       // pages (according to whatever is the desired shared memory mechanism)
-      buffer_meta_.push_back(flatbuf::Buffer(0, position, size));
+      buffer_meta_.push_back(flatbuf::Buffer(0, position, size + padding));
 
       if (size > 0) {
         RETURN_NOT_OK(dst->Write(buffer->data(), size));
         position += size;
       }
+
+      if (padding > 0) {
+        RETURN_NOT_OK(dst->Write(kPaddingBytes, padding));
+        position += padding;
+      }
     }
 
     *body_end_offset = position;

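In other words, the writer now records each buffer's logical size rounded up to the next 64-byte boundary in the flatbuffer metadata and emits the difference as zero padding, rather than requiring the buffer's capacity to already be a multiple of 64; this is what permits zero-copy writing of NumPy arrays. A quick illustration of the arithmetic (sizes are examples):

    def round_up_to_multiple_of_64(n):
        return (n + 63) & ~63

    size = 10                                          # bytes in the source buffer
    padding = round_up_to_multiple_of_64(size) - size  # 54 zero bytes appended
    # metadata records size + padding == 64, keeping the next buffer aligned
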
http://git-wip-us.apache.org/repos/asf/arrow/blob/a9747cea/cpp/src/arrow/ipc/util.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/util.h b/cpp/src/arrow/ipc/util.h
index 94079a3..9000d1b 100644
--- a/cpp/src/arrow/ipc/util.h
+++ b/cpp/src/arrow/ipc/util.h
@@ -29,7 +29,11 @@ namespace ipc {
 
 // Align on 8-byte boundaries
 static constexpr int kArrowAlignment = 8;
-static constexpr uint8_t kPaddingBytes[kArrowAlignment] = {0};
+
+// Buffers are padded to 64-byte boundaries (for SIMD)
+static constexpr int kArrowBufferAlignment = 64;
+
+static constexpr uint8_t kPaddingBytes[kArrowBufferAlignment] = {0};
 
 static inline int64_t PaddedLength(int64_t nbytes, int64_t alignment = kArrowAlignment) {
   return ((nbytes + alignment - 1) / alignment) * alignment;

http://git-wip-us.apache.org/repos/asf/arrow/blob/a9747cea/cpp/src/arrow/table-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/table-test.cc b/cpp/src/arrow/table-test.cc
index 385e7d8..743fb66 100644
--- a/cpp/src/arrow/table-test.cc
+++ b/cpp/src/arrow/table-test.cc
@@ -123,4 +123,31 @@ TEST_F(TestTable, InvalidColumns) {
   ASSERT_RAISES(Invalid, table_->ValidateColumns());
 }
 
+class TestRecordBatch : public TestBase {};
+
+TEST_F(TestRecordBatch, Equals) {
+  const int length = 10;
+
+  auto f0 = std::make_shared<Field>("f0", INT32);
+  auto f1 = std::make_shared<Field>("f1", UINT8);
+  auto f2 = std::make_shared<Field>("f2", INT16);
+
+  vector<shared_ptr<Field>> fields = {f0, f1, f2};
+  auto schema = std::make_shared<Schema>(fields);
+
+  auto a0 = MakePrimitive<Int32Array>(length);
+  auto a1 = MakePrimitive<UInt8Array>(length);
+  auto a2 = MakePrimitive<Int16Array>(length);
+
+  RecordBatch b1(schema, length, {a0, a1, a2});
+  RecordBatch b2(schema, 5, {a0, a1, a2});
+  RecordBatch b3(schema, length, {a0, a1});
+  RecordBatch b4(schema, length, {a0, a1, a1});
+
+  ASSERT_TRUE(b1.Equals(b1));
+  ASSERT_FALSE(b1.Equals(b2));
+  ASSERT_FALSE(b1.Equals(b3));
+  ASSERT_FALSE(b1.Equals(b4));
+}
+
 }  // namespace arrow

http://git-wip-us.apache.org/repos/asf/arrow/blob/a9747cea/cpp/src/arrow/table.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/table.cc b/cpp/src/arrow/table.cc
index 3a250df..af84f27 100644
--- a/cpp/src/arrow/table.cc
+++ b/cpp/src/arrow/table.cc
@@ -21,6 +21,7 @@
 #include <memory>
 #include <sstream>
 
+#include "arrow/array.h"
 #include "arrow/column.h"
 #include "arrow/schema.h"
 #include "arrow/util/status.h"
@@ -35,6 +36,21 @@ const std::string& RecordBatch::column_name(int i) const {
   return schema_->field(i)->name;
 }
 
+bool RecordBatch::Equals(const RecordBatch& other) const {
+  if (num_columns() != other.num_columns() || num_rows_ != other.num_rows()) {
+    return false;
+  }
+
+  for (int i = 0; i < num_columns(); ++i) {
+    if (!column(i)->Equals(other.column(i))) { return false; }
+  }
+
+  return true;
+}
+
+// ----------------------------------------------------------------------
+// Table methods
+
 Table::Table(const std::string& name, const std::shared_ptr<Schema>& schema,
     const std::vector<std::shared_ptr<Column>>& columns)
     : name_(name), schema_(schema), columns_(columns) {

http://git-wip-us.apache.org/repos/asf/arrow/blob/a9747cea/cpp/src/arrow/table.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/table.h b/cpp/src/arrow/table.h
index 36b3c8e..1a856c8 100644
--- a/cpp/src/arrow/table.h
+++ b/cpp/src/arrow/table.h
@@ -43,6 +43,8 @@ class ARROW_EXPORT RecordBatch {
   RecordBatch(const std::shared_ptr<Schema>& schema, int32_t num_rows,
       const std::vector<std::shared_ptr<Array>>& columns);
 
+  bool Equals(const RecordBatch& other) const;
+
   // @returns: the table's schema
   const std::shared_ptr<Schema>& schema() const { return schema_; }
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/a9747cea/cpp/src/arrow/types/primitive-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/types/primitive-test.cc b/cpp/src/arrow/types/primitive-test.cc
index ffebb92..87eb0fe 100644
--- a/cpp/src/arrow/types/primitive-test.cc
+++ b/cpp/src/arrow/types/primitive-test.cc
@@ -238,8 +238,7 @@ void TestPrimitiveBuilder<PBoolean>::Check(
 }
 
 typedef ::testing::Types<PBoolean, PUInt8, PUInt16, PUInt32, PUInt64, PInt8, PInt16,
-    PInt32, PInt64, PFloat, PDouble>
-    Primitives;
+    PInt32, PInt64, PFloat, PDouble> Primitives;
 
 TYPED_TEST_CASE(TestPrimitiveBuilder, Primitives);
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/a9747cea/cpp/src/arrow/util/bit-util.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/util/bit-util.h b/cpp/src/arrow/util/bit-util.h
index 873a195..3087ce7 100644
--- a/cpp/src/arrow/util/bit-util.h
+++ b/cpp/src/arrow/util/bit-util.h
@@ -19,6 +19,7 @@
 #define ARROW_UTIL_BIT_UTIL_H
 
 #include <cstdint>
+#include <limits>
 #include <memory>
 #include <vector>
 
@@ -77,6 +78,18 @@ static inline bool is_multiple_of_64(int64_t n) {
   return (n & 63) == 0;
 }
 
+inline int64_t RoundUpToMultipleOf64(int64_t num) {
+  // TODO(wesm): is this definitely needed?
+  // DCHECK_GE(num, 0);
+  constexpr int64_t round_to = 64;
+  constexpr int64_t force_carry_addend = round_to - 1;
+  constexpr int64_t truncate_bitmask = ~(round_to - 1);
+  constexpr int64_t max_roundable_num = std::numeric_limits<int64_t>::max() - round_to;
+  if (num <= max_roundable_num) { return (num + force_carry_addend) & truncate_bitmask; }
+  // handle overflow case.  This should result in a malloc error upstream
+  return num;
+}
+
 void bytes_to_bits(const std::vector<uint8_t>& bytes, uint8_t* bits);
 ARROW_EXPORT Status bytes_to_bits(const std::vector<uint8_t>&, std::shared_ptr<Buffer>*);
 

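The relocated RoundUpToMultipleOf64 uses two bit tricks: adding 63 forces a carry into the 64s place for any non-multiple, and masking with ~63 clears the low six bits. The guard near INT64_MAX skips rounding so overflow surfaces as an allocation failure rather than wrapping. The same logic in Python (whose ints are unbounded, so the guard is shown only for illustration):

    INT64_MAX = 2**63 - 1

    def round_up_64(num):
        if num <= INT64_MAX - 64:       # max_roundable_num guard
            return (num + 63) & ~63     # force carry, truncate low 6 bits
        return num                      # near overflow: let malloc fail upstream

    assert round_up_64(0) == 0
    assert round_up_64(1) == 64
    assert round_up_64(100) == 128
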
http://git-wip-us.apache.org/repos/asf/arrow/blob/a9747cea/cpp/src/arrow/util/buffer.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/util/buffer.cc b/cpp/src/arrow/util/buffer.cc
index 703ef83..6faa048 100644
--- a/cpp/src/arrow/util/buffer.cc
+++ b/cpp/src/arrow/util/buffer.cc
@@ -20,25 +20,13 @@
 #include <cstdint>
 #include <limits>
 
+#include "arrow/util/bit-util.h"
 #include "arrow/util/logging.h"
 #include "arrow/util/memory-pool.h"
 #include "arrow/util/status.h"
 
 namespace arrow {
 
-namespace {
-int64_t RoundUpToMultipleOf64(int64_t num) {
-  DCHECK_GE(num, 0);
-  constexpr int64_t round_to = 64;
-  constexpr int64_t force_carry_addend = round_to - 1;
-  constexpr int64_t truncate_bitmask = ~(round_to - 1);
-  constexpr int64_t max_roundable_num = std::numeric_limits<int64_t>::max() - round_to;
-  if (num <= max_roundable_num) { return (num + force_carry_addend) & truncate_bitmask; }
-  // handle overflow case.  This should result in a malloc error upstream
-  return num;
-}
-}  // namespace
-
 Buffer::Buffer(const std::shared_ptr<Buffer>& parent, int64_t offset, int64_t size) {
   data_ = parent->data() + offset;
   size_ = size;
@@ -64,7 +52,7 @@ PoolBuffer::~PoolBuffer() {
 Status PoolBuffer::Reserve(int64_t new_capacity) {
   if (!mutable_data_ || new_capacity > capacity_) {
     uint8_t* new_data;
-    new_capacity = RoundUpToMultipleOf64(new_capacity);
+    new_capacity = util::RoundUpToMultipleOf64(new_capacity);
     if (mutable_data_) {
       RETURN_NOT_OK(pool_->Allocate(new_capacity, &new_data));
       memcpy(new_data, mutable_data_, size_);

http://git-wip-us.apache.org/repos/asf/arrow/blob/a9747cea/cpp/src/arrow/util/buffer.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/util/buffer.h b/cpp/src/arrow/util/buffer.h
index 1aeebc6..01e4259 100644
--- a/cpp/src/arrow/util/buffer.h
+++ b/cpp/src/arrow/util/buffer.h
@@ -23,7 +23,6 @@
 #include <cstring>
 #include <memory>
 
-#include "arrow/util/bit-util.h"
 #include "arrow/util/macros.h"
 #include "arrow/util/status.h"
 #include "arrow/util/visibility.h"

http://git-wip-us.apache.org/repos/asf/arrow/blob/a9747cea/cpp/src/arrow/util/logging.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/util/logging.h b/cpp/src/arrow/util/logging.h
index b22f07d..06ee841 100644
--- a/cpp/src/arrow/util/logging.h
+++ b/cpp/src/arrow/util/logging.h
@@ -118,9 +118,9 @@ class CerrLog {
 class FatalLog : public CerrLog {
  public:
   explicit FatalLog(int /* severity */)  // NOLINT
-      : CerrLog(ARROW_FATAL){}           // NOLINT
+      : CerrLog(ARROW_FATAL) {}          // NOLINT
 
-            [[noreturn]] ~FatalLog() {
+  [[noreturn]] ~FatalLog() {
     if (has_logged_) { std::cerr << std::endl; }
     std::exit(1);
   }

http://git-wip-us.apache.org/repos/asf/arrow/blob/a9747cea/python/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/python/CMakeLists.txt b/python/CMakeLists.txt
index 77a771a..55f6d05 100644
--- a/python/CMakeLists.txt
+++ b/python/CMakeLists.txt
@@ -352,6 +352,8 @@ ADD_THIRDPARTY_LIB(arrow
   SHARED_LIB ${ARROW_SHARED_LIB})
 ADD_THIRDPARTY_LIB(arrow_io
   SHARED_LIB ${ARROW_IO_SHARED_LIB})
+ADD_THIRDPARTY_LIB(arrow_ipc
+  SHARED_LIB ${ARROW_IPC_SHARED_LIB})
 
 ############################################################
 # Linker setup
@@ -415,6 +417,8 @@ if (UNIX)
   set(CMAKE_BUILD_WITH_INSTALL_RPATH TRUE)
 endif()
 
+SET(CMAKE_INSTALL_RPATH_USE_LINK_PATH TRUE)
+
 add_subdirectory(src/pyarrow)
 add_subdirectory(src/pyarrow/util)
 
@@ -423,6 +427,7 @@ set(CYTHON_EXTENSIONS
   config
   error
   io
+  ipc
   scalar
   schema
   table
@@ -442,6 +447,7 @@ set(PYARROW_SRCS
 set(LINK_LIBS
   arrow
   arrow_io
+  arrow_ipc
 )
 
 if(PARQUET_FOUND AND PARQUET_ARROW_FOUND)
@@ -455,8 +461,6 @@ if(PARQUET_FOUND AND PARQUET_ARROW_FOUND)
     parquet)
 endif()
 
-SET(CMAKE_INSTALL_RPATH_USE_LINK_PATH TRUE)
-
 add_library(pyarrow SHARED
   ${PYARROW_SRCS})
 target_link_libraries(pyarrow ${LINK_LIBS})

http://git-wip-us.apache.org/repos/asf/arrow/blob/a9747cea/python/cmake_modules/FindArrow.cmake
----------------------------------------------------------------------
diff --git a/python/cmake_modules/FindArrow.cmake b/python/cmake_modules/FindArrow.cmake
index 9919746..3c359aa 100644
--- a/python/cmake_modules/FindArrow.cmake
+++ b/python/cmake_modules/FindArrow.cmake
@@ -47,10 +47,16 @@ find_library(ARROW_IO_LIB_PATH NAMES arrow_io
   ${ARROW_SEARCH_LIB_PATH}
   NO_DEFAULT_PATH)
 
+find_library(ARROW_IPC_LIB_PATH NAMES arrow_ipc
+  PATHS
+  ${ARROW_SEARCH_LIB_PATH}
+  NO_DEFAULT_PATH)
+
 if (ARROW_INCLUDE_DIR AND ARROW_LIB_PATH)
   set(ARROW_FOUND TRUE)
   set(ARROW_LIB_NAME libarrow)
   set(ARROW_IO_LIB_NAME libarrow_io)
+  set(ARROW_IPC_LIB_NAME libarrow_ipc)
 
   set(ARROW_LIBS ${ARROW_SEARCH_LIB_PATH})
   set(ARROW_STATIC_LIB ${ARROW_SEARCH_LIB_PATH}/${ARROW_LIB_NAME}.a)
@@ -58,9 +64,14 @@ if (ARROW_INCLUDE_DIR AND ARROW_LIB_PATH)
 
   set(ARROW_IO_STATIC_LIB ${ARROW_SEARCH_LIB_PATH}/${ARROW_IO_LIB_NAME}.a)
   set(ARROW_IO_SHARED_LIB ${ARROW_LIBS}/${ARROW_IO_LIB_NAME}${CMAKE_SHARED_LIBRARY_SUFFIX})
+
+  set(ARROW_IPC_STATIC_LIB ${ARROW_SEARCH_LIB_PATH}/${ARROW_IPC_LIB_NAME}.a)
+  set(ARROW_IPC_SHARED_LIB ${ARROW_LIBS}/${ARROW_IPC_LIB_NAME}${CMAKE_SHARED_LIBRARY_SUFFIX})
+
   if (NOT Arrow_FIND_QUIETLY)
     message(STATUS "Found the Arrow core library: ${ARROW_LIB_PATH}")
     message(STATUS "Found the Arrow IO library: ${ARROW_IO_LIB_PATH}")
+    message(STATUS "Found the Arrow IPC library: ${ARROW_IPC_LIB_PATH}")
   endif ()
 else ()
   if (NOT Arrow_FIND_QUIETLY)

http://git-wip-us.apache.org/repos/asf/arrow/blob/a9747cea/python/pyarrow/__init__.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/__init__.py b/python/pyarrow/__init__.py
index 7561f6d..8b131aa 100644
--- a/python/pyarrow/__init__.py
+++ b/python/pyarrow/__init__.py
@@ -41,5 +41,4 @@ from pyarrow.schema import (null, bool_,
                             list_, struct, field,
                             DataType, Field, Schema, schema)
 
-from pyarrow.array import RowBatch
-from pyarrow.table import Column, Table, from_pandas_dataframe
+from pyarrow.table import Column, RecordBatch, Table, from_pandas_dataframe

http://git-wip-us.apache.org/repos/asf/arrow/blob/a9747cea/python/pyarrow/array.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/array.pyx b/python/pyarrow/array.pyx
index cdbe73a..84ab4a4 100644
--- a/python/pyarrow/array.pyx
+++ b/python/pyarrow/array.pyx
@@ -37,7 +37,7 @@ import pyarrow.schema as schema
 
 
 def total_allocated_bytes():
-    cdef MemoryPool* pool = pyarrow.GetMemoryPool()
+    cdef MemoryPool* pool = pyarrow.get_memory_pool()
     return pool.bytes_allocated()
 
 
@@ -243,12 +243,14 @@ def from_pandas_series(object series, object mask=None, timestamps_to_ms=False):
         series_values = series_values.astype('datetime64[ms]')
 
     if mask is None:
-        check_status(pyarrow.PandasToArrow(pyarrow.GetMemoryPool(),
-                                           series_values, &out))
+        with nogil:
+            check_status(pyarrow.PandasToArrow(pyarrow.get_memory_pool(),
+                                               series_values, &out))
     else:
         mask = series_as_ndarray(mask)
-        check_status(pyarrow.PandasMaskedToArrow(
-            pyarrow.GetMemoryPool(), series_values, mask, &out))
+        with nogil:
+            check_status(pyarrow.PandasMaskedToArrow(
+                pyarrow.get_memory_pool(), series_values, mask, &out))
 
     return box_arrow_array(out)
 
@@ -262,35 +264,3 @@ cdef object series_as_ndarray(object obj):
         result = obj
 
     return result
-
-#----------------------------------------------------------------------
-# Table-like data structures
-
-cdef class RowBatch:
-    """
-
-    """
-    cdef readonly:
-        Schema schema
-        int num_rows
-        list arrays
-
-    def __cinit__(self, Schema schema, int num_rows, list arrays):
-        self.schema = schema
-        self.num_rows = num_rows
-        self.arrays = arrays
-
-        if len(self.schema) != len(arrays):
-            raise ValueError('Mismatch number of data arrays and '
-                             'schema fields')
-
-    def __len__(self):
-        return self.num_rows
-
-    property num_columns:
-
-        def __get__(self):
-            return len(self.arrays)
-
-    def __getitem__(self, i):
-        return self.arrays[i]

http://git-wip-us.apache.org/repos/asf/arrow/blob/a9747cea/python/pyarrow/includes/common.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/includes/common.pxd b/python/pyarrow/includes/common.pxd
index 133797b..05c0123 100644
--- a/python/pyarrow/includes/common.pxd
+++ b/python/pyarrow/includes/common.pxd
@@ -47,7 +47,3 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil:
         c_bool IsKeyError()
         c_bool IsNotImplemented()
         c_bool IsInvalid()
-
-    cdef cppclass Buffer:
-        uint8_t* data()
-        int64_t size()

http://git-wip-us.apache.org/repos/asf/arrow/blob/a9747cea/python/pyarrow/includes/libarrow.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd
index 854d07d..3ae1789 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -54,6 +54,18 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil:
     cdef cppclass MemoryPool" arrow::MemoryPool":
         int64_t bytes_allocated()
 
+    cdef cppclass CBuffer" arrow::Buffer":
+        uint8_t* data()
+        int64_t size()
+
+    cdef cppclass ResizableBuffer(CBuffer):
+        CStatus Resize(int64_t nbytes)
+        CStatus Reserve(int64_t nbytes)
+
+    cdef cppclass PoolBuffer(ResizableBuffer):
+        PoolBuffer()
+        PoolBuffer(MemoryPool*)
+
     cdef MemoryPool* default_memory_pool()
 
     cdef cppclass CListType" arrow::ListType"(CDataType):
@@ -149,6 +161,21 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil:
         const shared_ptr[CDataType]& type()
         const shared_ptr[CChunkedArray]& data()
 
+    cdef cppclass CRecordBatch" arrow::RecordBatch":
+        CRecordBatch(const shared_ptr[CSchema]& schema, int32_t num_rows,
+                     const vector[shared_ptr[CArray]]& columns)
+
+        c_bool Equals(const CRecordBatch& other)
+
+        const shared_ptr[CSchema]& schema()
+        const shared_ptr[CArray]& column(int i)
+        const c_string& column_name(int i)
+
+        const vector[shared_ptr[CArray]]& columns()
+
+        int num_columns()
+        int32_t num_rows()
+
     cdef cppclass CTable" arrow::Table":
         CTable(const c_string& name, const shared_ptr[CSchema]& schema,
                const vector[shared_ptr[CColumn]]& columns)
@@ -186,7 +213,7 @@ cdef extern from "arrow/ipc/metadata.h" namespace "arrow::ipc" nogil:
         MessageType_DICTIONARY_BATCH" arrow::ipc::Message::DICTIONARY_BATCH"
 
     cdef cppclass Message:
-        CStatus Open(const shared_ptr[Buffer]& buf,
+        CStatus Open(const shared_ptr[CBuffer]& buf,
                      shared_ptr[Message]* out)
         int64_t body_length()
         MessageType type()

http://git-wip-us.apache.org/repos/asf/arrow/blob/a9747cea/python/pyarrow/includes/libarrow_io.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/includes/libarrow_io.pxd b/python/pyarrow/includes/libarrow_io.pxd
index 56d8d4c..8074915 100644
--- a/python/pyarrow/includes/libarrow_io.pxd
+++ b/python/pyarrow/includes/libarrow_io.pxd
@@ -18,7 +18,7 @@
 # distutils: language = c++
 
 from pyarrow.includes.common cimport *
-from pyarrow.includes.libarrow cimport MemoryPool
+from pyarrow.includes.libarrow cimport *
 
 cdef extern from "arrow/io/interfaces.h" namespace "arrow::io" nogil:
     enum FileMode" arrow::io::FileMode::type":
@@ -36,7 +36,7 @@ cdef extern from "arrow/io/interfaces.h" namespace "arrow::io" nogil:
         FileMode mode()
 
     cdef cppclass Readable:
-        CStatus ReadB" Read"(int64_t nbytes, shared_ptr[Buffer]* out)
+        CStatus ReadB" Read"(int64_t nbytes, shared_ptr[CBuffer]* out)
         CStatus Read(int64_t nbytes, int64_t* bytes_read, uint8_t* out)
 
     cdef cppclass Seekable:
@@ -57,7 +57,7 @@ cdef extern from "arrow/io/interfaces.h" namespace "arrow::io" nogil:
         CStatus ReadAt(int64_t position, int64_t nbytes,
                        int64_t* bytes_read, uint8_t* buffer)
         CStatus ReadAt(int64_t position, int64_t nbytes,
-                       int64_t* bytes_read, shared_ptr[Buffer]* out)
+                       int64_t* bytes_read, shared_ptr[CBuffer]* out)
 
     cdef cppclass WriteableFileInterface(OutputStream, Seekable):
         CStatus WriteAt(int64_t position, const uint8_t* data,
@@ -143,9 +143,9 @@ cdef extern from "arrow/io/hdfs.h" namespace "arrow::io" nogil:
 
 
 cdef extern from "arrow/io/memory.h" namespace "arrow::io" nogil:
-    cdef cppclass BufferReader(ReadableFileInterface):
-        BufferReader(const uint8_t* data, int64_t nbytes)
+    cdef cppclass CBufferReader" arrow::io::BufferReader"\
+        (ReadableFileInterface):
+        CBufferReader(const uint8_t* data, int64_t nbytes)
 
     cdef cppclass BufferOutputStream(OutputStream):
-        # TODO(wesm)
-        pass
+        BufferOutputStream(const shared_ptr[ResizableBuffer]& buffer)

http://git-wip-us.apache.org/repos/asf/arrow/blob/a9747cea/python/pyarrow/includes/libarrow_ipc.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/includes/libarrow_ipc.pxd b/python/pyarrow/includes/libarrow_ipc.pxd
new file mode 100644
index 0000000..eda5b9b
--- /dev/null
+++ b/python/pyarrow/includes/libarrow_ipc.pxd
@@ -0,0 +1,52 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# distutils: language = c++
+
+from pyarrow.includes.common cimport *
+from pyarrow.includes.libarrow cimport (MemoryPool, CArray, CSchema,
+                                        CRecordBatch)
+from pyarrow.includes.libarrow_io cimport (OutputStream, ReadableFileInterface)
+
+cdef extern from "arrow/ipc/file.h" namespace "arrow::ipc" nogil:
+
+    cdef cppclass CFileWriter " arrow::ipc::FileWriter":
+        @staticmethod
+        CStatus Open(OutputStream* sink, const shared_ptr[CSchema]& schema,
+                     shared_ptr[CFileWriter]* out)
+
+        CStatus WriteRecordBatch(const vector[shared_ptr[CArray]]& columns,
+                                 int32_t num_rows)
+
+        CStatus Close()
+
+    cdef cppclass CFileReader " arrow::ipc::FileReader":
+
+        @staticmethod
+        CStatus Open(const shared_ptr[ReadableFileInterface]& file,
+                     shared_ptr[CFileReader]* out)
+
+        @staticmethod
+        CStatus Open2" Open"(const shared_ptr[ReadableFileInterface]& file,
+                     int64_t footer_offset, shared_ptr[CFileReader]* out)
+
+        const shared_ptr[CSchema]& schema()
+
+        int num_dictionaries()
+        int num_record_batches()
+
+        CStatus GetRecordBatch(int i, shared_ptr[CRecordBatch]* batch)

http://git-wip-us.apache.org/repos/asf/arrow/blob/a9747cea/python/pyarrow/includes/pyarrow.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/includes/pyarrow.pxd b/python/pyarrow/includes/pyarrow.pxd
index 4c97166..2fa5a7d 100644
--- a/python/pyarrow/includes/pyarrow.pxd
+++ b/python/pyarrow/includes/pyarrow.pxd
@@ -18,8 +18,8 @@
 # distutils: language = c++
 
 from pyarrow.includes.common cimport *
-from pyarrow.includes.libarrow cimport (CArray, CColumn, CDataType, CStatus,
-                                        Type, MemoryPool)
+from pyarrow.includes.libarrow cimport (CArray, CBuffer, CColumn,
+                                        CDataType, CStatus, Type, MemoryPool)
 
 cimport pyarrow.includes.libarrow_io as arrow_io
 
@@ -53,7 +53,12 @@ cdef extern from "pyarrow/api.h" namespace "pyarrow" nogil:
     PyStatus ArrowToPandas(const shared_ptr[CColumn]& arr, object py_ref,
                            PyObject** out)
 
-    MemoryPool* GetMemoryPool()
+    MemoryPool* get_memory_pool()
+
+
+cdef extern from "pyarrow/common.h" namespace "pyarrow" nogil:
+    cdef cppclass PyBytesBuffer(CBuffer):
+        PyBytesBuffer(object o)
 
 
 cdef extern from "pyarrow/io.h" namespace "pyarrow" nogil:
@@ -63,5 +68,5 @@ cdef extern from "pyarrow/io.h" namespace "pyarrow" nogil:
     cdef cppclass PyOutputStream(arrow_io.OutputStream):
         PyOutputStream(object fo)
 
-    cdef cppclass PyBytesReader(arrow_io.BufferReader):
+    cdef cppclass PyBytesReader(arrow_io.CBufferReader):
         PyBytesReader(object fo)

http://git-wip-us.apache.org/repos/asf/arrow/blob/a9747cea/python/pyarrow/io.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/io.pxd b/python/pyarrow/io.pxd
index 1dbb3fd..d6966cd 100644
--- a/python/pyarrow/io.pxd
+++ b/python/pyarrow/io.pxd
@@ -22,6 +22,11 @@ from pyarrow.includes.libarrow cimport *
 from pyarrow.includes.libarrow_io cimport (ReadableFileInterface,
                                            OutputStream)
 
+cdef class Buffer:
+    cdef:
+        shared_ptr[CBuffer] buffer
+
+    cdef init(self, const shared_ptr[CBuffer]& buffer)
 
 cdef class NativeFile:
     cdef:
@@ -29,6 +34,7 @@ cdef class NativeFile:
         shared_ptr[OutputStream] wr_file
         bint is_readonly
         bint is_open
+        bint own_file
 
     # By implementing these "virtual" functions (all functions in Cython
     # extension classes are technically virtual in the C++ sense) we can expose

http://git-wip-us.apache.org/repos/asf/arrow/blob/a9747cea/python/pyarrow/io.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/io.pyx b/python/pyarrow/io.pyx
index e6e2b62..00a492f 100644
--- a/python/pyarrow/io.pyx
+++ b/python/pyarrow/io.pyx
@@ -36,6 +36,217 @@ import re
 import sys
 import threading
 
+
+cdef class NativeFile:
+
+    def __cinit__(self):
+        self.is_open = False
+        self.own_file = False
+
+    def __dealloc__(self):
+        if self.is_open and self.own_file:
+            self.close()
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exc_type, exc_value, tb):
+        self.close()
+
+    def close(self):
+        if self.is_open:
+            with nogil:
+                if self.is_readonly:
+                    check_cstatus(self.rd_file.get().Close())
+                else:
+                    check_cstatus(self.wr_file.get().Close())
+        self.is_open = False
+
+    cdef read_handle(self, shared_ptr[ReadableFileInterface]* file):
+        self._assert_readable()
+        file[0] = <shared_ptr[ReadableFileInterface]> self.rd_file
+
+    cdef write_handle(self, shared_ptr[OutputStream]* file):
+        self._assert_writeable()
+        file[0] = <shared_ptr[OutputStream]> self.wr_file
+
+    def _assert_readable(self):
+        if not self.is_readonly:
+            raise IOError("only valid on readonly files")
+
+        if not self.is_open:
+            raise IOError("file not open")
+
+    def _assert_writeable(self):
+        if self.is_readonly:
+            raise IOError("only valid on writeonly files")
+
+        if not self.is_open:
+            raise IOError("file not open")
+
+    def size(self):
+        cdef int64_t size
+        self._assert_readable()
+        with nogil:
+            check_cstatus(self.rd_file.get().GetSize(&size))
+        return size
+
+    def tell(self):
+        cdef int64_t position
+        with nogil:
+            if self.is_readonly:
+                check_cstatus(self.rd_file.get().Tell(&position))
+            else:
+                check_cstatus(self.wr_file.get().Tell(&position))
+        return position
+
+    def seek(self, int64_t position):
+        self._assert_readable()
+        with nogil:
+            check_cstatus(self.rd_file.get().Seek(position))
+
+    def write(self, data):
+        """
+        Write bytes-like (unicode, encoded to UTF-8) to file
+        """
+        self._assert_writeable()
+
+        data = tobytes(data)
+
+        cdef const uint8_t* buf = <const uint8_t*> cp.PyBytes_AS_STRING(data)
+        cdef int64_t bufsize = len(data)
+        with nogil:
+            check_cstatus(self.wr_file.get().Write(buf, bufsize))
+
+    def read(self, int nbytes):
+        cdef:
+            int64_t bytes_read = 0
+            uint8_t* buf
+            shared_ptr[CBuffer] out
+
+        self._assert_readable()
+
+        with nogil:
+            check_cstatus(self.rd_file.get()
+                          .ReadB(nbytes, &out))
+
+        result = cp.PyBytes_FromStringAndSize(
+            <const char*>out.get().data(), out.get().size())
+
+        return result
+
+
+# ----------------------------------------------------------------------
+# Python file-like objects
+
+cdef class PythonFileInterface(NativeFile):
+    cdef:
+        object handle
+
+    def __cinit__(self, handle, mode='w'):
+        self.handle = handle
+
+        if mode.startswith('w'):
+            self.wr_file.reset(new pyarrow.PyOutputStream(handle))
+            self.is_readonly = 0
+        elif mode.startswith('r'):
+            self.rd_file.reset(new pyarrow.PyReadableFile(handle))
+            self.is_readonly = 1
+        else:
+            raise ValueError('Invalid file mode: {0}'.format(mode))
+
+        self.is_open = True
+
+
+cdef class BytesReader(NativeFile):
+    cdef:
+        object obj
+
+    def __cinit__(self, obj):
+        if not isinstance(obj, bytes):
+            raise ValueError('Must pass bytes object')
+
+        self.obj = obj
+        self.is_readonly = 1
+        self.is_open = True
+
+        self.rd_file.reset(new pyarrow.PyBytesReader(obj))
+
+# ----------------------------------------------------------------------
+# Arrow buffers
+
+
+cdef class Buffer:
+
+    def __cinit__(self):
+        pass
+
+    cdef init(self, const shared_ptr[CBuffer]& buffer):
+        self.buffer = buffer
+
+    def __len__(self):
+        return self.size
+
+    property size:
+
+        def __get__(self):
+            return self.buffer.get().size()
+
+    def __getitem__(self, key):
+        # TODO(wesm): buffer slicing
+        raise NotImplementedError
+
+    def to_pybytes(self):
+        return cp.PyBytes_FromStringAndSize(
+            <const char*>self.buffer.get().data(),
+            self.buffer.get().size())
+
+
+cdef shared_ptr[PoolBuffer] allocate_buffer():
+    cdef shared_ptr[PoolBuffer] result
+    result.reset(new PoolBuffer(pyarrow.get_memory_pool()))
+    return result
+
+
+cdef class InMemoryOutputStream(NativeFile):
+
+    cdef:
+        shared_ptr[PoolBuffer] buffer
+
+    def __cinit__(self):
+        self.buffer = allocate_buffer()
+        self.wr_file.reset(new BufferOutputStream(
+            <shared_ptr[ResizableBuffer]> self.buffer))
+        self.is_readonly = 0
+        self.is_open = True
+
+    def get_result(self):
+        cdef Buffer result = Buffer()
+
+        check_cstatus(self.wr_file.get().Close())
+        result.init(<shared_ptr[CBuffer]> self.buffer)
+
+        self.is_open = False
+        return result
+
+
+def buffer_from_bytes(object obj):
+    """
+    Construct an Arrow buffer from a Python bytes object
+    """
+    if not isinstance(obj, bytes):
+        raise ValueError('Must pass bytes object')
+
+    cdef shared_ptr[CBuffer] buf
+    buf.reset(new pyarrow.PyBytesBuffer(obj))
+
+    cdef Buffer result = Buffer()
+    result.init(buf)
+    return result
+
+# ----------------------------------------------------------------------
+# HDFS IO implementation
+
 _HDFS_PATH_RE = re.compile('hdfs://(.*):(\d+)(.*)')
 
 try:
@@ -274,6 +485,7 @@ cdef class HdfsClient:
         out.buffer_size = c_buffer_size
         out.parent = self
         out.is_open = True
+        out.own_file = True
 
         return out
 
@@ -322,134 +534,6 @@ cdef class HdfsClient:
         f.download(stream)
 
 
-cdef class NativeFile:
-
-    def __cinit__(self):
-        self.is_open = False
-
-    def __dealloc__(self):
-        if self.is_open:
-            self.close()
-
-    def __enter__(self):
-        return self
-
-    def __exit__(self, exc_type, exc_value, tb):
-        self.close()
-
-    def close(self):
-        if self.is_open:
-            with nogil:
-                if self.is_readonly:
-                    check_cstatus(self.rd_file.get().Close())
-                else:
-                    check_cstatus(self.wr_file.get().Close())
-        self.is_open = False
-
-    cdef read_handle(self, shared_ptr[ReadableFileInterface]* file):
-        self._assert_readable()
-        file[0] = <shared_ptr[ReadableFileInterface]> self.rd_file
-
-    cdef write_handle(self, shared_ptr[OutputStream]* file):
-        self._assert_writeable()
-        file[0] = <shared_ptr[OutputStream]> self.wr_file
-
-    def _assert_readable(self):
-        if not self.is_readonly:
-            raise IOError("only valid on readonly files")
-
-    def _assert_writeable(self):
-        if self.is_readonly:
-            raise IOError("only valid on writeonly files")
-
-    def size(self):
-        cdef int64_t size
-        self._assert_readable()
-        with nogil:
-            check_cstatus(self.rd_file.get().GetSize(&size))
-        return size
-
-    def tell(self):
-        cdef int64_t position
-        with nogil:
-            if self.is_readonly:
-                check_cstatus(self.rd_file.get().Tell(&position))
-            else:
-                check_cstatus(self.wr_file.get().Tell(&position))
-        return position
-
-    def seek(self, int64_t position):
-        self._assert_readable()
-        with nogil:
-            check_cstatus(self.rd_file.get().Seek(position))
-
-    def write(self, data):
-        """
-        Write bytes-like (unicode, encoded to UTF-8) to file
-        """
-        self._assert_writeable()
-
-        data = tobytes(data)
-
-        cdef const uint8_t* buf = <const uint8_t*> cp.PyBytes_AS_STRING(data)
-        cdef int64_t bufsize = len(data)
-        with nogil:
-            check_cstatus(self.wr_file.get().Write(buf, bufsize))
-
-    def read(self, int nbytes):
-        cdef:
-            int64_t bytes_read = 0
-            uint8_t* buf
-            shared_ptr[Buffer] out
-
-        self._assert_readable()
-
-        with nogil:
-            check_cstatus(self.rd_file.get()
-                          .ReadB(nbytes, &out))
-
-        result = cp.PyBytes_FromStringAndSize(
-            <const char*>out.get().data(), out.get().size())
-
-        return result
-
-
-# ----------------------------------------------------------------------
-# Python file-like objects
-
-cdef class PythonFileInterface(NativeFile):
-    cdef:
-        object handle
-
-    def __cinit__(self, handle, mode='w'):
-        self.handle = handle
-
-        if mode.startswith('w'):
-            self.wr_file.reset(new pyarrow.PyOutputStream(handle))
-            self.is_readonly = 0
-        elif mode.startswith('r'):
-            self.rd_file.reset(new pyarrow.PyReadableFile(handle))
-            self.is_readonly = 1
-        else:
-            raise ValueError('Invalid file mode: {0}'.format(mode))
-
-        self.is_open = True
-
-
-cdef class BytesReader(NativeFile):
-    cdef:
-        object obj
-
-    def __cinit__(self, obj):
-        if not isinstance(obj, bytes):
-            raise ValueError('Must pass bytes object')
-
-        self.obj = obj
-        self.is_readonly = 1
-        self.is_open = True
-
-        self.rd_file.reset(new pyarrow.PyBytesReader(obj))
-
 # ----------------------------------------------------------------------
 # Specialization for HDFS
 

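A note on the own_file flag threaded through this file: NativeFile.__dealloc__ now closes the underlying handle only when the object opened it itself (HdfsClient.open sets own_file = True in the hunk above), so wrappers around caller-supplied handles no longer close them on garbage collection. A sketch of the intended semantics (the paths and client object are hypothetical):

    hf = client.open('/hdfs/path/data.arrow', 'wb')  # own_file=True; closed on dealloc
    pf = io.PythonFileInterface(open('local.bin', 'wb'), mode='w')
    del pf  # does NOT close the wrapped Python file; the caller still owns it
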
http://git-wip-us.apache.org/repos/asf/arrow/blob/a9747cea/python/pyarrow/ipc.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/ipc.pyx b/python/pyarrow/ipc.pyx
new file mode 100644
index 0000000..f8da3a7
--- /dev/null
+++ b/python/pyarrow/ipc.pyx
@@ -0,0 +1,155 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Cython wrappers for arrow::ipc
+
+# cython: profile=False
+# distutils: language = c++
+# cython: embedsignature = True
+
+from pyarrow.includes.libarrow cimport *
+from pyarrow.includes.libarrow_io cimport *
+from pyarrow.includes.libarrow_ipc cimport *
+cimport pyarrow.includes.pyarrow as pyarrow
+
+from pyarrow.error cimport check_cstatus
+from pyarrow.io cimport NativeFile
+from pyarrow.schema cimport Schema
+from pyarrow.table cimport RecordBatch
+
+from pyarrow.compat import frombytes, tobytes
+import pyarrow.io as io
+
+cimport cpython as cp
+
+
+cdef get_reader(source, shared_ptr[ReadableFileInterface]* reader):
+    cdef NativeFile nf
+
+    if isinstance(source, bytes):
+        source = io.BytesReader(source)
+    elif not isinstance(source, io.NativeFile) and hasattr(source, 'read'):
+        # Optimistically hope this is file-like
+        source = io.PythonFileInterface(source, mode='r')
+
+    if isinstance(source, NativeFile):
+        nf = source
+
+        # TODO: what about read-write sources (e.g. memory maps)
+        if not nf.is_readonly:
+            raise IOError('Native file is not readable')
+
+        nf.read_handle(reader)
+    else:
+        raise TypeError('Unable to read from object of type: {0}'
+                        .format(type(source)))
+
+
+cdef get_writer(source, shared_ptr[OutputStream]* writer):
+    cdef NativeFile nf
+
+    if not isinstance(source, io.NativeFile) and hasattr(source, 'write'):
+        # Optimistically hope this is file-like
+        source = io.PythonFileInterface(source, mode='w')
+
+    if isinstance(source, io.NativeFile):
+        nf = source
+
+        if nf.is_readonly:
+            raise IOError('Native file is not writeable')
+
+        nf.write_handle(writer)
+    else:
+        raise TypeError('Unable to write to object of type: {0}'
+                        .format(type(source)))
+
+
+cdef class ArrowFileWriter:
+    cdef:
+        shared_ptr[CFileWriter] writer
+        shared_ptr[OutputStream] sink
+        bint closed
+
+    def __cinit__(self, sink, Schema schema):
+        self.closed = True
+        get_writer(sink, &self.sink)
+
+        with nogil:
+            check_cstatus(CFileWriter.Open(self.sink.get(), schema.sp_schema,
+                                           &self.writer))
+
+        self.closed = False
+
+    def __dealloc__(self):
+        if not self.closed:
+            self.close()
+
+    def write_record_batch(self, RecordBatch batch):
+        cdef CRecordBatch* bptr = batch.batch
+        with nogil:
+            check_cstatus(self.writer.get()
+                          .WriteRecordBatch(bptr.columns(), bptr.num_rows()))
+
+    def close(self):
+        with nogil:
+            check_cstatus(self.writer.get().Close())
+        self.closed = True
+
+
+cdef class ArrowFileReader:
+    cdef:
+        shared_ptr[CFileReader] reader
+
+    def __cinit__(self, source, footer_offset=None):
+        cdef shared_ptr[ReadableFileInterface] reader
+        get_reader(source, &reader)
+
+        cdef int64_t offset = 0
+        if footer_offset is not None:
+            offset = footer_offset
+
+        with nogil:
+            if offset != 0:
+                check_cstatus(CFileReader.Open2(reader, offset, &self.reader))
+            else:
+                check_cstatus(CFileReader.Open(reader, &self.reader))
+
+    property num_dictionaries:
+
+        def __get__(self):
+            return self.reader.get().num_dictionaries()
+
+    property num_record_batches:
+
+        def __get__(self):
+            return self.reader.get().num_record_batches()
+
+    def get_record_batch(self, int i):
+        cdef:
+            shared_ptr[CRecordBatch] batch
+            RecordBatch result
+
+        if i < 0 or i >= self.num_record_batches:
+            raise ValueError('Batch number {0} out of range'.format(i))
+
+        with nogil:
+            check_cstatus(self.reader.get().GetRecordBatch(i, &batch))
+
+        result = RecordBatch()
+        result.init(batch)
+
+        return result

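Note the source coercion in get_reader/get_writer above: bytes objects become a BytesReader, arbitrary objects with read()/write() methods are wrapped in PythonFileInterface, and NativeFile handles pass straight through. So the same reader and writer work against a plain Python file object (the filename is illustrative):

    import pandas as pd
    import pyarrow
    import pyarrow.ipc as ipc

    batch = pyarrow.RecordBatch.from_pandas(pd.DataFrame({'x': [1, 2, 3]}))

    with open('example.arrow', 'wb') as f:
        writer = ipc.ArrowFileWriter(f, batch.schema)
        writer.write_record_batch(batch)
        writer.close()

    with open('example.arrow', 'rb') as f:
        reader = ipc.ArrowFileReader(f)
        assert reader.get_record_batch(0).equals(batch)
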
http://git-wip-us.apache.org/repos/asf/arrow/blob/a9747cea/python/pyarrow/table.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/table.pxd b/python/pyarrow/table.pxd
index 0a5c122..79c9ae3 100644
--- a/python/pyarrow/table.pxd
+++ b/python/pyarrow/table.pxd
@@ -16,7 +16,10 @@
 # under the License.
 
 from pyarrow.includes.common cimport shared_ptr
-from pyarrow.includes.libarrow cimport CChunkedArray, CColumn, CTable
+from pyarrow.includes.libarrow cimport (CChunkedArray, CColumn, CTable,
+                                        CRecordBatch)
+
+from pyarrow.schema cimport Schema
 
 
 cdef class ChunkedArray:
@@ -41,6 +44,16 @@ cdef class Table:
     cdef:
         shared_ptr[CTable] sp_table
         CTable* table
-    
+
     cdef init(self, const shared_ptr[CTable]& table)
     cdef _check_nullptr(self)
+
+
+cdef class RecordBatch:
+    cdef:
+        shared_ptr[CRecordBatch] sp_batch
+        CRecordBatch* batch
+        Schema _schema
+
+    cdef init(self, const shared_ptr[CRecordBatch]& table)
+    cdef _check_nullptr(self)

http://git-wip-us.apache.org/repos/asf/arrow/blob/a9747cea/python/pyarrow/table.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/table.pyx b/python/pyarrow/table.pyx
index ade82aa..a1cadcd 100644
--- a/python/pyarrow/table.pyx
+++ b/python/pyarrow/table.pyx
@@ -19,6 +19,8 @@
 # distutils: language = c++
 # cython: embedsignature = True
 
+from cython.operator cimport dereference as deref
+
 from pyarrow.includes.libarrow cimport *
 cimport pyarrow.includes.pyarrow as pyarrow
 
@@ -45,8 +47,8 @@ cdef class ChunkedArray:
 
     cdef _check_nullptr(self):
         if self.chunked_array == NULL:
-            raise ReferenceError("ChunkedArray object references a NULL pointer."
-                    "Not initialized.")
+            raise ReferenceError("ChunkedArray object references a NULL "
+                                 "pointer. Not initialized.")
 
     def length(self):
         self._check_nullptr()
@@ -144,6 +146,130 @@ cdef class Column:
             return chunked_array
 
 
+cdef _schema_from_arrays(arrays, names, shared_ptr[CSchema]* schema):
+    cdef:
+        Array arr
+        c_string c_name
+        vector[shared_ptr[CField]] fields
+
+    cdef int K = len(arrays)
+
+    fields.resize(K)
+    for i in range(K):
+        arr = arrays[i]
+        c_name = tobytes(names[i])
+        fields[i].reset(new CField(c_name, arr.type.sp_type, True))
+
+    schema.reset(new CSchema(fields))
+
+
+
+cdef _dataframe_to_arrays(df, name, timestamps_to_ms):
+    from pyarrow.array import from_pandas_series
+
+    cdef:
+        list names = []
+        list arrays = []
+
+    for name in df.columns:
+        col = df[name]
+        arr = from_pandas_series(col, timestamps_to_ms=timestamps_to_ms)
+
+        names.append(name)
+        arrays.append(arr)
+
+    return names, arrays
+
+
+cdef class RecordBatch:
+
+    def __cinit__(self):
+        self.batch = NULL
+        self._schema = None
+
+    cdef init(self, const shared_ptr[CRecordBatch]& batch):
+        self.sp_batch = batch
+        self.batch = batch.get()
+
+    cdef _check_nullptr(self):
+        if self.batch == NULL:
+            raise ReferenceError("Object not initialized")
+
+    def __len__(self):
+        self._check_nullptr()
+        return self.batch.num_rows()
+
+    property num_columns:
+
+        def __get__(self):
+            self._check_nullptr()
+            return self.batch.num_columns()
+
+    property num_rows:
+
+        def __get__(self):
+            return len(self)
+
+    property schema:
+
+        def __get__(self):
+            cdef Schema schema
+            self._check_nullptr()
+            if self._schema is None:
+                schema = Schema()
+                schema.init_schema(self.batch.schema())
+                self._schema = schema
+
+            return self._schema
+
+    def __getitem__(self, i):
+        cdef Array arr = Array()
+        arr.init(self.batch.column(i))
+        return arr
+
+    def equals(self, RecordBatch other):
+        self._check_nullptr()
+        other._check_nullptr()
+
+        return self.batch.Equals(deref(other.batch))
+
+    @classmethod
+    def from_pandas(cls, df):
+        """
+        Convert pandas.DataFrame to an Arrow RecordBatch
+        """
+        names, arrays = _dataframe_to_arrays(df, None, False)
+        return cls.from_arrays(names, arrays)
+
+    @staticmethod
+    def from_arrays(names, arrays):
+        cdef:
+            Array arr
+            RecordBatch result
+            c_string c_name
+            shared_ptr[CSchema] schema
+            shared_ptr[CRecordBatch] batch
+            vector[shared_ptr[CArray]] c_arrays
+            int32_t num_rows
+
+        if len(arrays) == 0:
+            raise ValueError('Record batch must contain at least one array (for now)')
+
+        num_rows = len(arrays[0])
+        _schema_from_arrays(arrays, names, &schema)
+
+        for i in range(len(arrays)):
+            arr = arrays[i]
+            c_arrays.push_back(arr.sp_array)
+
+        batch.reset(new CRecordBatch(schema, num_rows, c_arrays))
+
+        result = RecordBatch()
+        result.init(batch)
+
+        return result
+
+
 cdef class Table:
     '''
     Do not call this class's constructor directly.
@@ -161,38 +287,50 @@ cdef class Table:
             raise ReferenceError("Table object references a NULL pointer."
                     "Not initialized.")
 
-    @staticmethod
-    def from_pandas(df, name=None):
-        return from_pandas_dataframe(df, name=name)
+    @classmethod
+    def from_pandas(cls, df, name=None, timestamps_to_ms=False):
+        """
+        Convert pandas.DataFrame to an Arrow Table
+
+        Parameters
+        ----------
+        df: pandas.DataFrame
+
+        name: str
+
+        timestamps_to_ms: bool
+            Convert datetime columns to ms resolution. This is needed for
+            compatibility with other functionality like Parquet I/O, which
+            only supports milliseconds.
+        """
+        names, arrays = _dataframe_to_arrays(df, name=name,
+                                             timestamps_to_ms=timestamps_to_ms)
+        return cls.from_arrays(names, arrays, name=name)
 
     @staticmethod
     def from_arrays(names, arrays, name=None):
         cdef:
             Array arr
-            Table result
             c_string c_name
             vector[shared_ptr[CField]] fields
             vector[shared_ptr[CColumn]] columns
+            Table result
             shared_ptr[CSchema] schema
             shared_ptr[CTable] table
 
-        cdef int K = len(arrays)
+        _schema_from_arrays(arrays, names, &schema)
 
-        fields.resize(K)
+        cdef int K = len(arrays)
         columns.resize(K)
         for i in range(K):
             arr = arrays[i]
-            c_name = tobytes(names[i])
-
-            fields[i].reset(new CField(c_name, arr.type.sp_type, True))
-            columns[i].reset(new CColumn(fields[i], arr.sp_array))
+            columns[i].reset(new CColumn(schema.get().field(i), arr.sp_array))
 
         if name is None:
             c_name = ''
         else:
             c_name = tobytes(name)
 
-        schema.reset(new CSchema(fields))
         table.reset(new CTable(c_name, schema, columns))
 
         result = Table()
@@ -268,32 +406,4 @@ cdef class Table:
 
 
 
-def from_pandas_dataframe(object df, name=None, timestamps_to_ms=False):
-    """
-    Convert pandas.DataFrame to an Arrow Table
-
-    Parameters
-    ----------
-    df: pandas.DataFrame
-
-    name: str
-
-    timestamps_to_ms: bool
-        Convert datetime columns to ms resolution. This is needed for
-        compability with other functionality like Parquet I/O which
-        only supports milliseconds.
-    """
-    from pyarrow.array import from_pandas_series
-
-    cdef:
-        list names = []
-        list arrays = []
-
-    for name in df.columns:
-        col = df[name]
-        arr = from_pandas_series(col, timestamps_to_ms=timestamps_to_ms)
-
-        names.append(name)
-        arrays.append(arr)
-
-    return Table.from_arrays(names, arrays, name=name)
+from_pandas_dataframe = Table.from_pandas
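
As a usage sketch of the constructors added above (column names and values
are illustrative, mirroring the tests later in this patch):

    import pandas as pd
    import pyarrow as A

    data = [A.from_pylist([1, 2, 3]),
            A.from_pylist([-1, -2, -3])]

    # build a record batch and a named table from the same columns
    batch = A.RecordBatch.from_arrays(['c0', 'c1'], data)
    table = A.Table.from_arrays(['c0', 'c1'], data, name='t')

    # coerce datetime columns to millisecond resolution (e.g. for Parquet)
    df = pd.DataFrame({'when': pd.date_range('2016-01-01', periods=3)})
    table2 = A.Table.from_pandas(df, timestamps_to_ms=True)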

http://git-wip-us.apache.org/repos/asf/arrow/blob/a9747cea/python/pyarrow/tests/test_array.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_array.py b/python/pyarrow/tests/test_array.py
index 86147f8..0a17f69 100644
--- a/python/pyarrow/tests/test_array.py
+++ b/python/pyarrow/tests/test_array.py
@@ -19,6 +19,10 @@ import pyarrow
 import pyarrow.formatting as fmt
 
 
+def test_total_bytes_allocated():
+    assert pyarrow.total_allocated_bytes() == 0
+
+
 def test_repr_on_pre_init_array():
     arr = pyarrow.array.Array()
     assert len(repr(arr)) > 0

http://git-wip-us.apache.org/repos/asf/arrow/blob/a9747cea/python/pyarrow/tests/test_io.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_io.py b/python/pyarrow/tests/test_io.py
index 9a41ebe..211a12b 100644
--- a/python/pyarrow/tests/test_io.py
+++ b/python/pyarrow/tests/test_io.py
@@ -98,3 +98,44 @@ def test_bytes_reader():
 def test_bytes_reader_non_bytes():
     with pytest.raises(ValueError):
         io.BytesReader(u('some sample data'))
+
+
+# ----------------------------------------------------------------------
+# Buffers
+
+
+def test_buffer_bytes():
+    val = b'some data'
+
+    buf = io.buffer_from_bytes(val)
+    assert isinstance(buf, io.Buffer)
+
+    result = buf.to_pybytes()
+
+    assert result == val
+
+
+def test_memory_output_stream():
+    # 10 bytes
+    val = b'dataabcdef'
+
+    f = io.InMemoryOutputStream()
+
+    K = 1000
+    for i in range(K):
+        f.write(val)
+
+    buf = f.get_result()
+
+    assert len(buf) == len(val) * K
+    assert buf.to_pybytes() == val * K
+
+
+def test_inmemory_write_after_closed():
+    f = io.InMemoryOutputStream()
+    f.write(b'ok')
+    f.get_result()
+
+    with pytest.raises(IOError):
+        f.write(b'not ok')

http://git-wip-us.apache.org/repos/asf/arrow/blob/a9747cea/python/pyarrow/tests/test_ipc.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_ipc.py b/python/pyarrow/tests/test_ipc.py
new file mode 100644
index 0000000..b9e9e6e
--- /dev/null
+++ b/python/pyarrow/tests/test_ipc.py
@@ -0,0 +1,116 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import io
+
+import numpy as np
+import pandas as pd
+
+import pyarrow as A
+import pyarrow.io as arrow_io
+import pyarrow.ipc as ipc
+
+
+class RoundtripTest(object):
+    # Also tests writing zero-copy NumPy array with additional padding
+
+    def __init__(self):
+        self.sink = self._get_sink()
+
+    def _get_sink(self):
+        return io.BytesIO()
+
+    def _get_source(self):
+        return self.sink.getvalue()
+
+    def run(self):
+        nrows = 5
+        df = pd.DataFrame({
+            'one': np.random.randn(nrows),
+            'two': ['foo', np.nan, 'bar', 'bazbaz', 'qux']})
+
+        batch = A.RecordBatch.from_pandas(df)
+        writer = ipc.ArrowFileWriter(self.sink, batch.schema)
+
+        num_batches = 5
+        frames = []
+        batches = []
+        for i in range(num_batches):
+            unique_df = df.copy()
+            unique_df['one'] = np.random.randn(nrows)
+
+            batch = A.RecordBatch.from_pandas(unique_df)
+            writer.write_record_batch(batch)
+            frames.append(unique_df)
+            batches.append(batch)
+
+        writer.close()
+
+        file_contents = self._get_source()
+        reader = ipc.ArrowFileReader(file_contents)
+
+        assert reader.num_record_batches == num_batches
+
+        for i in range(num_batches):
+            # reading works; compare batches directly until conversion
+            # back to DataFrame is implemented
+            batch = reader.get_record_batch(i)
+            assert batches[i].equals(batch)
+
+
+class InMemoryStreamTest(RoundtripTest):
+
+    def _get_sink(self):
+        return arrow_io.InMemoryOutputStream()
+
+    def _get_source(self):
+        return self.sink.get_result()
+
+
+def test_ipc_file_simple_roundtrip():
+    helper = RoundtripTest()
+    helper.run()
+
+
+# XXX: For benchmarking
+
+def big_batch():
+    df = pd.DataFrame(
+        np.random.randn(2**4, 2**20).T,
+        columns=[str(i) for i in range(2**4)]
+    )
+
+    df = pd.concat([df] * 2 ** 3, ignore_index=True)
+
+    return A.RecordBatch.from_pandas(df)
+
+
+def write_to_memory(batch):
+    sink = io.BytesIO()
+    write_file(batch, sink)
+    return sink.getvalue()
+
+
+def write_file(batch, sink):
+    writer = ipc.ArrowFileWriter(sink, batch.schema)
+    writer.write_record_batch(batch)
+    writer.close()
+
+
+def read_file(source):
+    reader = ipc.ArrowFileReader(source)
+    return [reader.get_record_batch(i)
+            for i in range(reader.num_record_batches)]
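
Distilled from the tests above, a minimal end-to-end sketch of the new Arrow
file format APIs (sample data is illustrative):

    import pandas as pd
    import pyarrow as A
    import pyarrow.io as arrow_io
    import pyarrow.ipc as ipc

    df = pd.DataFrame({'one': [1.0, 2.0], 'two': ['a', 'b']})
    batch = A.RecordBatch.from_pandas(df)

    # write the batch to an in-memory Arrow file
    sink = arrow_io.InMemoryOutputStream()
    writer = ipc.ArrowFileWriter(sink, batch.schema)
    writer.write_record_batch(batch)
    writer.close()

    # read every record batch back from the resulting buffer
    reader = ipc.ArrowFileReader(sink.get_result())
    batches = [reader.get_record_batch(i)
               for i in range(reader.num_record_batches)]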

http://git-wip-us.apache.org/repos/asf/arrow/blob/a9747cea/python/pyarrow/tests/test_table.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_table.py b/python/pyarrow/tests/test_table.py
index abf1431..c513032 100644
--- a/python/pyarrow/tests/test_table.py
+++ b/python/pyarrow/tests/test_table.py
@@ -15,60 +15,52 @@
 # specific language governing permissions and limitations
 # under the License.
 
-from pyarrow.compat import unittest
 import pyarrow as A
 
 
-class TestRowBatch(unittest.TestCase):
+def test_recordbatch_basics():
+    data = [
+        A.from_pylist(range(5)),
+        A.from_pylist([-10, -5, 0, 5, 10])
+    ]
 
-    def test_basics(self):
-        data = [
-            A.from_pylist(range(5)),
-            A.from_pylist([-10, -5, 0, 5, 10])
-        ]
-        num_rows = 5
+    batch = A.RecordBatch.from_arrays(['c0', 'c1'], data)
 
-        descr = A.schema([A.field('c0', data[0].type),
-                          A.field('c1', data[1].type)])
+    assert len(batch) == 5
+    assert batch.num_rows == 5
+    assert batch.num_columns == len(data)
 
-        batch = A.RowBatch(descr, num_rows, data)
 
-        assert len(batch) == num_rows
-        assert batch.num_rows == num_rows
-        assert batch.num_columns == len(data)
+def test_table_basics():
+    data = [
+        A.from_pylist(range(5)),
+        A.from_pylist([-10, -5, 0, 5, 10])
+    ]
+    table = A.Table.from_arrays(('a', 'b'), data, 'table_name')
+    assert table.name == 'table_name'
+    assert len(table) == 5
+    assert table.num_rows == 5
+    assert table.num_columns == 2
+    assert table.shape == (5, 2)
 
+    for col in table.itercolumns():
+        for chunk in col.data.iterchunks():
+            assert chunk is not None
 
-class TestTable(unittest.TestCase):
 
-    def test_basics(self):
-        data = [
-            A.from_pylist(range(5)),
-            A.from_pylist([-10, -5, 0, 5, 10])
-        ]
-        table = A.Table.from_arrays(('a', 'b'), data, 'table_name')
-        assert table.name == 'table_name'
-        assert len(table) == 5
-        assert table.num_rows == 5
-        assert table.num_columns == 2
-        assert table.shape == (5, 2)
+def test_table_pandas():
+    data = [
+        A.from_pylist(range(5)),
+        A.from_pylist([-10, -5, 0, 5, 10])
+    ]
+    table = A.Table.from_arrays(('a', 'b'), data, 'table_name')
 
-        for col in table.itercolumns():
-            for chunk in col.data.iterchunks():
-                assert chunk is not None
+    # TODO: Use this part once from_pandas is implemented
+    # data = {'a': range(5), 'b': [-10, -5, 0, 5, 10]}
+    # df = pd.DataFrame(data)
+    # A.Table.from_pandas(df)
 
-    def test_pandas(self):
-        data = [
-            A.from_pylist(range(5)),
-            A.from_pylist([-10, -5, 0, 5, 10])
-        ]
-        table = A.Table.from_arrays(('a', 'b'), data, 'table_name')
-
-        # TODO: Use this part once from_pandas is implemented
-        # data = {'a': range(5), 'b': [-10, -5, 0, 5, 10]}
-        # df = pd.DataFrame(data)
-        # A.Table.from_pandas(df)
-
-        df = table.to_pandas()
-        assert set(df.columns) == set(('a', 'b'))
-        assert df.shape == (5, 2)
-        assert df.ix[0, 'b'] == -10
+    df = table.to_pandas()
+    assert set(df.columns) == set(('a', 'b'))
+    assert df.shape == (5, 2)
+    assert df.loc[0, 'b'] == -10

http://git-wip-us.apache.org/repos/asf/arrow/blob/a9747cea/python/setup.py
----------------------------------------------------------------------
diff --git a/python/setup.py b/python/setup.py
index d1be122..d040ea7 100644
--- a/python/setup.py
+++ b/python/setup.py
@@ -102,6 +102,7 @@ class build_ext(_build_ext):
         'config',
         'error',
         'io',
+        'ipc',
         'parquet',
         'scalar',
         'schema',

http://git-wip-us.apache.org/repos/asf/arrow/blob/a9747cea/python/src/pyarrow/adapters/builtin.cc
----------------------------------------------------------------------
diff --git a/python/src/pyarrow/adapters/builtin.cc b/python/src/pyarrow/adapters/builtin.cc
index 78ef1b3..680f3a5 100644
--- a/python/src/pyarrow/adapters/builtin.cc
+++ b/python/src/pyarrow/adapters/builtin.cc
@@ -426,7 +426,7 @@ Status ConvertPySequence(PyObject* obj, std::shared_ptr<arrow::Array>* out) {
 
   // Give the sequence converter an array builder
   std::shared_ptr<ArrayBuilder> builder;
-  RETURN_ARROW_NOT_OK(arrow::MakeBuilder(GetMemoryPool(), type, &builder));
+  RETURN_ARROW_NOT_OK(arrow::MakeBuilder(get_memory_pool(), type, &builder));
   converter->Init(builder);
 
   PY_RETURN_NOT_OK(converter->AppendData(obj));

http://git-wip-us.apache.org/repos/asf/arrow/blob/a9747cea/python/src/pyarrow/adapters/pandas.cc
----------------------------------------------------------------------
diff --git a/python/src/pyarrow/adapters/pandas.cc b/python/src/pyarrow/adapters/pandas.cc
index d224074..ae24b7e 100644
--- a/python/src/pyarrow/adapters/pandas.cc
+++ b/python/src/pyarrow/adapters/pandas.cc
@@ -602,6 +602,8 @@ class ArrowDeserializer {
   }
 
   Status AllocateOutput(int type) {
+    PyAcquireGIL lock;
+
     npy_intp dims[1] = {col_->length()};
     out_ = reinterpret_cast<PyArrayObject*>(PyArray_SimpleNew(1, dims, type));
 
@@ -616,6 +618,8 @@ class ArrowDeserializer {
   }
 
   Status OutputFromData(int type, void* data) {
+    PyAcquireGIL lock;
+
     // Zero-Copy. We can pass the data pointer directly to NumPy.
     Py_INCREF(py_ref_);
     OwnedRef py_ref(py_ref_);
@@ -706,6 +710,8 @@ class ArrowDeserializer {
   inline typename std::enable_if<
     arrow_traits<T2>::is_boolean, Status>::type
   ConvertValues(const std::shared_ptr<Array>& arr) {
+    PyAcquireGIL lock;
+
     arrow::BooleanArray* bool_arr = static_cast<arrow::BooleanArray*>(arr.get());
 
     if (arr->null_count() > 0) {
@@ -743,6 +749,8 @@ class ArrowDeserializer {
   inline typename std::enable_if<
     T2 == arrow::Type::STRING, Status>::type
   ConvertValues(const std::shared_ptr<Array>& arr) {
+    PyAcquireGIL lock;
+
     RETURN_NOT_OK(AllocateOutput(NPY_OBJECT));
 
     PyObject** out_values = reinterpret_cast<PyObject**>(PyArray_DATA(out_));

http://git-wip-us.apache.org/repos/asf/arrow/blob/a9747cea/python/src/pyarrow/common.cc
----------------------------------------------------------------------
diff --git a/python/src/pyarrow/common.cc b/python/src/pyarrow/common.cc
index 82b14fd..09f3efb 100644
--- a/python/src/pyarrow/common.cc
+++ b/python/src/pyarrow/common.cc
@@ -63,7 +63,7 @@ class PyArrowMemoryPool : public arrow::MemoryPool {
   int64_t bytes_allocated_;
 };
 
-arrow::MemoryPool* GetMemoryPool() {
+arrow::MemoryPool* get_memory_pool() {
   static PyArrowMemoryPool memory_pool;
   return &memory_pool;
 }

http://git-wip-us.apache.org/repos/asf/arrow/blob/a9747cea/python/src/pyarrow/common.h
----------------------------------------------------------------------
diff --git a/python/src/pyarrow/common.h b/python/src/pyarrow/common.h
index bc599f8..96eed16 100644
--- a/python/src/pyarrow/common.h
+++ b/python/src/pyarrow/common.h
@@ -109,7 +109,8 @@ class PyGILGuard {
     return Status::UnknownError(message);           \
   }
 
-PYARROW_EXPORT arrow::MemoryPool* GetMemoryPool();
+// Return the common PyArrow memory pool
+PYARROW_EXPORT arrow::MemoryPool* get_memory_pool();
 
 class PYARROW_EXPORT NumPyBuffer : public arrow::Buffer {
  public:
@@ -120,6 +121,7 @@ class PYARROW_EXPORT NumPyBuffer : public arrow::Buffer {
 
     data_ = reinterpret_cast<const uint8_t*>(PyArray_DATA(arr_));
     size_ = PyArray_SIZE(arr_);
+    capacity_ = size_ * PyArray_DESCR(arr_)->elsize;
   }
 
   virtual ~NumPyBuffer() {
@@ -139,6 +141,22 @@ class PYARROW_EXPORT PyBytesBuffer : public arrow::Buffer {
   PyObject* obj_;
 };
 
+
+// RAII guard: acquires the GIL on construction, releases it on destruction
+class PyAcquireGIL {
+ public:
+  PyAcquireGIL() {
+    state_ = PyGILState_Ensure();
+  }
+
+  ~PyAcquireGIL() {
+    PyGILState_Release(state_);
+  }
+
+ private:
+  PyGILState_STATE state_;
+  DISALLOW_COPY_AND_ASSIGN(PyAcquireGIL);
+};
+
 } // namespace pyarrow
 
 #endif // PYARROW_COMMON_H

http://git-wip-us.apache.org/repos/asf/arrow/blob/a9747cea/python/src/pyarrow/io.cc
----------------------------------------------------------------------
diff --git a/python/src/pyarrow/io.cc b/python/src/pyarrow/io.cc
index 35054e9..9879b34 100644
--- a/python/src/pyarrow/io.cc
+++ b/python/src/pyarrow/io.cc
@@ -47,9 +47,9 @@ static arrow::Status CheckPyError() {
     PyErr_Fetch(&exc_type, &exc_value, &traceback);
     PyObjectStringify stringified(exc_value);
     std::string message(stringified.bytes);
-    Py_DECREF(exc_type);
-    Py_DECREF(exc_value);
-    Py_DECREF(traceback);
+    // PyErr_Fetch may return null components; Py_XDECREF tolerates null
+    Py_XDECREF(exc_type);
+    Py_XDECREF(exc_value);
+    Py_XDECREF(traceback);
     PyErr_Clear();
     return arrow::Status::IOError(message);
   }