You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2017/07/15 20:51:58 UTC

[2/2] arrow git commit: ARROW-1214: [Python/C++] Add C++ functionality to more easily handle encapsulated IPC messages, Python bindings

ARROW-1214: [Python/C++] Add C++ functionality to more easily handle encapsulated IPC messages, Python bindings

This patch does a bunch of things:

* Decouples the RecordBatchStreamReader from the actual message iteration (which is handled by a new `arrow::ipc::MessageReader` interface)
* Enables `arrow::ipc::Message` to hold all of the memory for a complete unit of data: metadata plus body
* Renames some IPC methods for better consistency (GetNextRecordBatch -> ReadNextRecordBatch)
* Adds a function to serialize a complete encapsulated message to an `arrow::io::OutputStream*`
* Adds Python bindings for all of the above and introduces `pyarrow.Message` and `pyarrow.MessageReader`. Adds `read_message` and `Message.serialize` functions for efficient memory round trips
* Adds `pyarrow.read_record_batch` for reading a single record batch given a message and a known schema

Later we will want to add `pyarrow.read_schema`, but it seemed like a bit of work to make it work for dictionaries.

This implements the C++ analogue to ARROW-1047, which was for Java. Not sure why I didn't create a JIRA about this. cc @icexelloss

Author: Wes McKinney <we...@twosigma.com>

Closes #839 from wesm/ARROW-1214 and squashes the following commits:

07f1820a [Wes McKinney] Refactor to introduce MessageReader abstract type, use unique_ptr for messages instead of shared_ptr. First cut at Message, MessageReader Python API. Add read_message, C++/Python machinery for message roundtrips to Buffer, comparison. Add function to read RecordBatch from encapsulated message given schema.


Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/bb0a7588
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/bb0a7588
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/bb0a7588

Branch: refs/heads/master
Commit: bb0a75885f2655ac54be47bd238811b74782532e
Parents: 099f61c
Author: Wes McKinney <we...@twosigma.com>
Authored: Sat Jul 15 16:51:51 2017 -0400
Committer: Wes McKinney <we...@twosigma.com>
Committed: Sat Jul 15 16:51:51 2017 -0400

----------------------------------------------------------------------
 c_glib/arrow-glib/reader.cpp                 |   4 +-
 cpp/src/arrow/buffer.cc                      |   2 -
 cpp/src/arrow/buffer.h                       |  15 +-
 cpp/src/arrow/builder.cc                     |  12 +-
 cpp/src/arrow/ipc/file-to-stream.cc          |   2 +-
 cpp/src/arrow/ipc/ipc-json-test.cc           |   6 +-
 cpp/src/arrow/ipc/ipc-read-write-test.cc     |  70 ++--
 cpp/src/arrow/ipc/json-integration-test.cc   |   8 +-
 cpp/src/arrow/ipc/json.cc                    |   6 +-
 cpp/src/arrow/ipc/json.h                     |   2 +-
 cpp/src/arrow/ipc/metadata.cc                | 150 +++++--
 cpp/src/arrow/ipc/metadata.h                 | 100 +++--
 cpp/src/arrow/ipc/reader.cc                  | 206 ++++------
 cpp/src/arrow/ipc/reader.h                   |  35 +-
 cpp/src/arrow/ipc/stream-to-file.cc          |   2 +-
 cpp/src/arrow/ipc/writer.cc                  |   4 +-
 cpp/src/arrow/python/builtin_convert.cc      |   5 +-
 python/doc/source/api.rst                    |  12 +-
 python/pyarrow/__init__.py                   |   9 +-
 python/pyarrow/feather.pxi                   | 109 +++++
 python/pyarrow/includes/libarrow.pxd         |  66 +--
 python/pyarrow/io.pxi                        | 353 ----------------
 python/pyarrow/ipc.pxi                       | 480 ++++++++++++++++++++++
 python/pyarrow/ipc.py                        |  13 +-
 python/pyarrow/lib.pyx                       |   8 +-
 python/pyarrow/pandas_compat.py              |   2 +-
 python/pyarrow/table.pxi                     |  16 +-
 python/pyarrow/tests/conftest.py             |   2 +-
 python/pyarrow/tests/test_array.py           |   2 +-
 python/pyarrow/tests/test_convert_builtin.py |   3 +
 python/pyarrow/tests/test_feather.py         |   3 +-
 python/pyarrow/tests/test_ipc.py             |  59 ++-
 python/pyarrow/tests/test_parquet.py         |   7 +-
 python/pyarrow/tests/test_table.py           |   4 +
 python/pyarrow/tests/test_tensor.py          |   1 +
 35 files changed, 1135 insertions(+), 643 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/arrow/blob/bb0a7588/c_glib/arrow-glib/reader.cpp
----------------------------------------------------------------------
diff --git a/c_glib/arrow-glib/reader.cpp b/c_glib/arrow-glib/reader.cpp
index 3ff6ba1..523bdee 100644
--- a/c_glib/arrow-glib/reader.cpp
+++ b/c_glib/arrow-glib/reader.cpp
@@ -173,7 +173,7 @@ garrow_record_batch_reader_get_next_record_batch(GArrowRecordBatchReader *reader
 {
   auto arrow_reader = garrow_record_batch_reader_get_raw(reader);
   std::shared_ptr<arrow::RecordBatch> arrow_record_batch;
-  auto status = arrow_reader->GetNextRecordBatch(&arrow_record_batch);
+  auto status = arrow_reader->ReadNextRecordBatch(&arrow_record_batch);
 
   if (garrow_error_check(error,
                          status,
@@ -410,7 +410,7 @@ garrow_record_batch_file_reader_get_record_batch(GArrowRecordBatchFileReader *re
 {
   auto arrow_reader = garrow_record_batch_file_reader_get_raw(reader);
   std::shared_ptr<arrow::RecordBatch> arrow_record_batch;
-  auto status = arrow_reader->GetRecordBatch(i, &arrow_record_batch);
+  auto status = arrow_reader->ReadRecordBatch(i, &arrow_record_batch);
 
   if (garrow_error_check(error,
                          status,

http://git-wip-us.apache.org/repos/asf/arrow/blob/bb0a7588/cpp/src/arrow/buffer.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/buffer.cc b/cpp/src/arrow/buffer.cc
index fb63798..a1d119e 100644
--- a/cpp/src/arrow/buffer.cc
+++ b/cpp/src/arrow/buffer.cc
@@ -27,8 +27,6 @@
 
 namespace arrow {
 
-Buffer::~Buffer() {}
-
 Status Buffer::Copy(
     int64_t start, int64_t nbytes, MemoryPool* pool, std::shared_ptr<Buffer>* out) const {
   // Sanity checks

http://git-wip-us.apache.org/repos/asf/arrow/blob/bb0a7588/cpp/src/arrow/buffer.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/buffer.h b/cpp/src/arrow/buffer.h
index bfbea77..b117b24 100644
--- a/cpp/src/arrow/buffer.h
+++ b/cpp/src/arrow/buffer.h
@@ -22,6 +22,7 @@
 #include <cstdint>
 #include <cstring>
 #include <memory>
+#include <string>
 
 #include "arrow/status.h"
 #include "arrow/util/macros.h"
@@ -47,7 +48,8 @@ class ARROW_EXPORT Buffer {
  public:
   Buffer(const uint8_t* data, int64_t size)
       : is_mutable_(false), data_(data), size_(size), capacity_(size) {}
-  virtual ~Buffer();
+
+  virtual ~Buffer() = default;
 
   /// An offset into data that is owned by another buffer, but we want to be
   /// able to retain a valid pointer to it even after other shared_ptr's to the
@@ -97,6 +99,17 @@ class ARROW_EXPORT Buffer {
   DISALLOW_COPY_AND_ASSIGN(Buffer);
 };
 
+/// \brief Create Buffer referencing std::string memory
+///
+/// Warning: string instance must stay alive
+///
+/// \param str std::string instance
+/// \return std::shared_ptr<Buffer>
+static inline std::shared_ptr<Buffer> GetBufferFromString(const std::string& str) {
+  return std::make_shared<Buffer>(
+      reinterpret_cast<const uint8_t*>(str.c_str()), static_cast<int64_t>(str.size()));
+}
+
 /// Construct a view on passed buffer at the indicated offset and length. This
 /// function cannot fail and does not error checking (except in debug builds)
 static inline std::shared_ptr<Buffer> SliceBuffer(

http://git-wip-us.apache.org/repos/asf/arrow/blob/bb0a7588/cpp/src/arrow/builder.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/builder.cc b/cpp/src/arrow/builder.cc
index 155d81a..e466838 100644
--- a/cpp/src/arrow/builder.cc
+++ b/cpp/src/arrow/builder.cc
@@ -342,8 +342,8 @@ Status AdaptiveIntBuilder::Append(
         sizeof(int64_t) * length);
   } else {
 #ifdef _MSC_VER
-#  pragma warning(push)
-#  pragma warning(disable:4996)
+#pragma warning(push)
+#pragma warning(disable : 4996)
 #endif
     // int_size_ may have changed, so we need to recheck
     switch (int_size_) {
@@ -366,7 +366,7 @@ Status AdaptiveIntBuilder::Append(
         DCHECK(false);
     }
 #ifdef _MSC_VER
-#  pragma warning(pop)
+#pragma warning(pop)
 #endif
   }
 
@@ -497,8 +497,8 @@ Status AdaptiveUIntBuilder::Append(
         sizeof(uint64_t) * length);
   } else {
 #ifdef _MSC_VER
-#  pragma warning(push)
-#  pragma warning(disable:4996)
+#pragma warning(push)
+#pragma warning(disable : 4996)
 #endif
     // int_size_ may have changed, so we need to recheck
     switch (int_size_) {
@@ -521,7 +521,7 @@ Status AdaptiveUIntBuilder::Append(
         DCHECK(false);
     }
 #ifdef _MSC_VER
-#  pragma warning(pop)
+#pragma warning(pop)
 #endif
   }
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/bb0a7588/cpp/src/arrow/ipc/file-to-stream.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/file-to-stream.cc b/cpp/src/arrow/ipc/file-to-stream.cc
index 39c720c..a1feedc 100644
--- a/cpp/src/arrow/ipc/file-to-stream.cc
+++ b/cpp/src/arrow/ipc/file-to-stream.cc
@@ -39,7 +39,7 @@ Status ConvertToStream(const char* path) {
   RETURN_NOT_OK(RecordBatchStreamWriter::Open(&sink, reader->schema(), &writer));
   for (int i = 0; i < reader->num_record_batches(); ++i) {
     std::shared_ptr<RecordBatch> chunk;
-    RETURN_NOT_OK(reader->GetRecordBatch(i, &chunk));
+    RETURN_NOT_OK(reader->ReadRecordBatch(i, &chunk));
     RETURN_NOT_OK(writer->WriteRecordBatch(*chunk));
   }
   return writer->Close();

http://git-wip-us.apache.org/repos/asf/arrow/blob/bb0a7588/cpp/src/arrow/ipc/ipc-json-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/ipc-json-test.cc b/cpp/src/arrow/ipc/ipc-json-test.cc
index 9297146..318e318 100644
--- a/cpp/src/arrow/ipc/ipc-json-test.cc
+++ b/cpp/src/arrow/ipc/ipc-json-test.cc
@@ -276,7 +276,7 @@ TEST(TestJsonFileReadWrite, BasicRoundTrip) {
 
   for (int i = 0; i < nbatches; ++i) {
     std::shared_ptr<RecordBatch> batch;
-    ASSERT_OK(reader->GetRecordBatch(i, &batch));
+    ASSERT_OK(reader->ReadRecordBatch(i, &batch));
     ASSERT_TRUE(batch->Equals(*batches[i]));
   }
 }
@@ -344,7 +344,7 @@ TEST(TestJsonFileReadWrite, MinimalFormatExample) {
   ASSERT_EQ(1, reader->num_record_batches());
 
   std::shared_ptr<RecordBatch> batch;
-  ASSERT_OK(reader->GetRecordBatch(0, &batch));
+  ASSERT_OK(reader->ReadRecordBatch(0, &batch));
 
   std::vector<bool> foo_valid = {true, false, true, true, true};
   std::vector<int32_t> foo_values = {1, 2, 3, 4, 5};
@@ -388,7 +388,7 @@ void CheckRoundtrip(const RecordBatch& batch) {
   ASSERT_OK(JsonReader::Open(buffer, &reader));
 
   std::shared_ptr<RecordBatch> result_batch;
-  ASSERT_OK(reader->GetRecordBatch(0, &result_batch));
+  ASSERT_OK(reader->ReadRecordBatch(0, &result_batch));
 
   CompareBatch(batch, *result_batch);
 }

http://git-wip-us.apache.org/repos/asf/arrow/blob/bb0a7588/cpp/src/arrow/ipc/ipc-read-write-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/ipc-read-write-test.cc b/cpp/src/arrow/ipc/ipc-read-write-test.cc
index c71d046..42f14b0 100644
--- a/cpp/src/arrow/ipc/ipc-read-write-test.cc
+++ b/cpp/src/arrow/ipc/ipc-read-write-test.cc
@@ -51,8 +51,8 @@ class TestSchemaMetadata : public ::testing::Test {
     std::shared_ptr<Buffer> buffer;
     ASSERT_OK(WriteSchemaMessage(schema, memo, &buffer));
 
-    std::shared_ptr<Message> message;
-    ASSERT_OK(Message::Open(buffer, 0, &message));
+    std::unique_ptr<Message> message;
+    ASSERT_OK(Message::Open(buffer, nullptr, &message));
 
     ASSERT_EQ(Message::SCHEMA, message->type());
 
@@ -65,6 +65,32 @@ class TestSchemaMetadata : public ::testing::Test {
   }
 };
 
+TEST(TestMessage, Equals) {
+  std::string metadata = "foo";
+  std::string body = "bar";
+
+  auto b1 = GetBufferFromString(metadata);
+  auto b2 = GetBufferFromString(metadata);
+  auto b3 = GetBufferFromString(body);
+  auto b4 = GetBufferFromString(body);
+
+  Message msg1(b1, b3);
+  Message msg2(b2, b4);
+  Message msg3(b1, nullptr);
+  Message msg4(b2, nullptr);
+
+  ASSERT_TRUE(msg1.Equals(msg2));
+  ASSERT_TRUE(msg3.Equals(msg4));
+
+  ASSERT_FALSE(msg1.Equals(msg3));
+  ASSERT_FALSE(msg3.Equals(msg1));
+
+  // same metadata as msg1, different body
+  Message msg5(b2, b1);
+  ASSERT_FALSE(msg1.Equals(msg5));
+  ASSERT_FALSE(msg5.Equals(msg1));
+}
+
 const std::shared_ptr<DataType> INT32 = std::make_shared<Int32Type>();
 
 TEST_F(TestSchemaMetadata, PrimitiveFields) {
@@ -123,16 +149,12 @@ class IpcTestFixture : public io::MemoryMapFixture {
     RETURN_NOT_OK(WriteRecordBatch(
         batch, buffer_offset, mmap_.get(), &metadata_length, &body_length, pool_));
 
-    std::shared_ptr<Message> message;
+    std::unique_ptr<Message> message;
     RETURN_NOT_OK(ReadMessage(0, metadata_length, mmap_.get(), &message));
 
-    // The buffer offsets start at 0, so we must construct a
-    // RandomAccessFile according to that frame of reference
-    std::shared_ptr<Buffer> buffer_payload;
-    RETURN_NOT_OK(mmap_->ReadAt(metadata_length, body_length, &buffer_payload));
-    io::BufferReader buffer_reader(buffer_payload);
-
-    return ReadRecordBatch(*message, batch.schema(), &buffer_reader, batch_result);
+    io::BufferReader buffer_reader(message->body());
+    return ReadRecordBatch(
+        *message->metadata(), batch.schema(), &buffer_reader, batch_result);
   }
 
   Status DoLargeRoundTrip(
@@ -151,7 +173,7 @@ class IpcTestFixture : public io::MemoryMapFixture {
     std::shared_ptr<RecordBatchFileReader> file_reader;
     RETURN_NOT_OK(RecordBatchFileReader::Open(mmap_, offset, &file_reader));
 
-    return file_reader->GetRecordBatch(0, result);
+    return file_reader->ReadRecordBatch(0, result);
   }
 
   void CheckReadResult(const RecordBatch& result, const RecordBatch& expected) {
@@ -225,7 +247,7 @@ TEST_F(TestIpcRoundTrip, MetadataVersion) {
   ASSERT_OK(WriteRecordBatch(
       *batch, buffer_offset, mmap_.get(), &metadata_length, &body_length, pool_));
 
-  std::shared_ptr<Message> message;
+  std::unique_ptr<Message> message;
   ASSERT_OK(ReadMessage(0, metadata_length, mmap_.get(), &message));
 
   ASSERT_EQ(MetadataVersion::V3, message->metadata_version());
@@ -434,16 +456,13 @@ TEST_F(RecursionLimits, ReadLimit) {
   ASSERT_OK(WriteToMmap(
       recursion_depth, true, &metadata_length, &body_length, &batch, &schema));
 
-  std::shared_ptr<Message> message;
+  std::unique_ptr<Message> message;
   ASSERT_OK(ReadMessage(0, metadata_length, mmap_.get(), &message));
 
-  std::shared_ptr<Buffer> payload;
-  ASSERT_OK(mmap_->ReadAt(metadata_length, body_length, &payload));
-
-  io::BufferReader reader(payload);
+  io::BufferReader reader(message->body());
 
   std::shared_ptr<RecordBatch> result;
-  ASSERT_RAISES(Invalid, ReadRecordBatch(*message, schema, &reader, &result));
+  ASSERT_RAISES(Invalid, ReadRecordBatch(*message->metadata(), schema, &reader, &result));
 }
 
 TEST_F(RecursionLimits, StressLimit) {
@@ -455,16 +474,13 @@ TEST_F(RecursionLimits, StressLimit) {
     ASSERT_OK(WriteToMmap(
         recursion_depth, true, &metadata_length, &body_length, &batch, &schema));
 
-    std::shared_ptr<Message> message;
+    std::unique_ptr<Message> message;
     ASSERT_OK(ReadMessage(0, metadata_length, mmap_.get(), &message));
 
-    std::shared_ptr<Buffer> payload;
-    ASSERT_OK(mmap_->ReadAt(metadata_length, body_length, &payload));
-
-    io::BufferReader reader(payload);
-
+    io::BufferReader reader(message->body());
     std::shared_ptr<RecordBatch> result;
-    ASSERT_OK(ReadRecordBatch(*message, schema, recursion_depth + 1, &reader, &result));
+    ASSERT_OK(ReadRecordBatch(
+        *message->metadata(), schema, recursion_depth + 1, &reader, &result));
     *it_works = result->Equals(*batch);
   };
 
@@ -511,7 +527,7 @@ class TestFileFormat : public ::testing::TestWithParam<MakeRecordBatch*> {
     EXPECT_EQ(num_batches, reader->num_record_batches());
     for (int i = 0; i < num_batches; ++i) {
       std::shared_ptr<RecordBatch> chunk;
-      RETURN_NOT_OK(reader->GetRecordBatch(i, &chunk));
+      RETURN_NOT_OK(reader->ReadRecordBatch(i, &chunk));
       out_batches->emplace_back(chunk);
     }
 
@@ -571,7 +587,7 @@ class TestStreamFormat : public ::testing::TestWithParam<MakeRecordBatch*> {
 
     std::shared_ptr<RecordBatch> chunk;
     while (true) {
-      RETURN_NOT_OK(reader->GetNextRecordBatch(&chunk));
+      RETURN_NOT_OK(reader->ReadNextRecordBatch(&chunk));
       if (chunk == nullptr) { break; }
       out_batches->emplace_back(chunk);
     }

http://git-wip-us.apache.org/repos/asf/arrow/blob/bb0a7588/cpp/src/arrow/ipc/json-integration-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/json-integration-test.cc b/cpp/src/arrow/ipc/json-integration-test.cc
index 424755a..18f5dfa 100644
--- a/cpp/src/arrow/ipc/json-integration-test.cc
+++ b/cpp/src/arrow/ipc/json-integration-test.cc
@@ -82,7 +82,7 @@ static Status ConvertJsonToArrow(
 
   for (int i = 0; i < reader->num_record_batches(); ++i) {
     std::shared_ptr<RecordBatch> batch;
-    RETURN_NOT_OK(reader->GetRecordBatch(i, &batch));
+    RETURN_NOT_OK(reader->ReadRecordBatch(i, &batch));
     RETURN_NOT_OK(writer->WriteRecordBatch(*batch));
   }
   return writer->Close();
@@ -109,7 +109,7 @@ static Status ConvertArrowToJson(
 
   for (int i = 0; i < reader->num_record_batches(); ++i) {
     std::shared_ptr<RecordBatch> batch;
-    RETURN_NOT_OK(reader->GetRecordBatch(i, &batch));
+    RETURN_NOT_OK(reader->ReadRecordBatch(i, &batch));
     RETURN_NOT_OK(writer->WriteRecordBatch(*batch));
   }
 
@@ -168,8 +168,8 @@ static Status ValidateArrowVsJson(
   std::shared_ptr<RecordBatch> arrow_batch;
   std::shared_ptr<RecordBatch> json_batch;
   for (int i = 0; i < json_nbatches; ++i) {
-    RETURN_NOT_OK(json_reader->GetRecordBatch(i, &json_batch));
-    RETURN_NOT_OK(arrow_reader->GetRecordBatch(i, &arrow_batch));
+    RETURN_NOT_OK(json_reader->ReadRecordBatch(i, &json_batch));
+    RETURN_NOT_OK(arrow_reader->ReadRecordBatch(i, &arrow_batch));
 
     if (!json_batch->ApproxEquals(*arrow_batch)) {
       std::stringstream ss;

http://git-wip-us.apache.org/repos/asf/arrow/blob/bb0a7588/cpp/src/arrow/ipc/json.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/json.cc b/cpp/src/arrow/ipc/json.cc
index f8c0b62..36e343e 100644
--- a/cpp/src/arrow/ipc/json.cc
+++ b/cpp/src/arrow/ipc/json.cc
@@ -115,7 +115,7 @@ class JsonReader::JsonReaderImpl {
     return Status::OK();
   }
 
-  Status GetRecordBatch(int i, std::shared_ptr<RecordBatch>* batch) const {
+  Status ReadRecordBatch(int i, std::shared_ptr<RecordBatch>* batch) const {
     DCHECK_GE(i, 0) << "i out of bounds";
     DCHECK_LT(i, static_cast<int>(record_batches_->GetArray().Size()))
         << "i out of bounds";
@@ -164,8 +164,8 @@ int JsonReader::num_record_batches() const {
   return impl_->num_record_batches();
 }
 
-Status JsonReader::GetRecordBatch(int i, std::shared_ptr<RecordBatch>* batch) const {
-  return impl_->GetRecordBatch(i, batch);
+Status JsonReader::ReadRecordBatch(int i, std::shared_ptr<RecordBatch>* batch) const {
+  return impl_->ReadRecordBatch(i, batch);
 }
 
 }  // namespace ipc

http://git-wip-us.apache.org/repos/asf/arrow/blob/bb0a7588/cpp/src/arrow/ipc/json.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/json.h b/cpp/src/arrow/ipc/json.h
index ad94def..2ba27c7 100644
--- a/cpp/src/arrow/ipc/json.h
+++ b/cpp/src/arrow/ipc/json.h
@@ -72,7 +72,7 @@ class ARROW_EXPORT JsonReader {
   int num_record_batches() const;
 
   // Read a record batch from the file
-  Status GetRecordBatch(int i, std::shared_ptr<RecordBatch>* batch) const;
+  Status ReadRecordBatch(int i, std::shared_ptr<RecordBatch>* batch) const;
 
  private:
   JsonReader(MemoryPool* pool, const std::shared_ptr<Buffer>& data);

http://git-wip-us.apache.org/repos/asf/arrow/blob/bb0a7588/cpp/src/arrow/ipc/metadata.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/metadata.cc b/cpp/src/arrow/ipc/metadata.cc
index 54f0547..5b2ca3b 100644
--- a/cpp/src/arrow/ipc/metadata.cc
+++ b/cpp/src/arrow/ipc/metadata.cc
@@ -17,6 +17,7 @@
 
 #include "arrow/ipc/metadata.h"
 
+#include <algorithm>
 #include <cstdint>
 #include <memory>
 #include <sstream>
@@ -834,11 +835,12 @@ Status DictionaryMemo::AddDictionary(
 
 class Message::MessageImpl {
  public:
-  explicit MessageImpl(const std::shared_ptr<Buffer>& buffer, int64_t offset)
-      : buffer_(buffer), offset_(offset), message_(nullptr) {}
+  explicit MessageImpl(
+      const std::shared_ptr<Buffer>& metadata, const std::shared_ptr<Buffer>& body)
+      : metadata_(metadata), message_(nullptr), body_(body) {}
 
   Status Open() {
-    message_ = flatbuf::GetMessage(buffer_->data() + offset_);
+    message_ = flatbuf::GetMessage(metadata_->data());
 
     // Check that the metadata version is supported
     if (message_->version() < kMinMetadataVersion) {
@@ -872,7 +874,7 @@ class Message::MessageImpl {
         // Arrow 0.2
         return MetadataVersion::V2;
       case flatbuf::MetadataVersion_V3:
-        // Arrow 0.3
+        // Arrow >= 0.3
         return MetadataVersion::V3;
       // Add cases as other versions become available
       default:
@@ -882,28 +884,38 @@ class Message::MessageImpl {
 
   const void* header() const { return message_->header(); }
 
-  int64_t body_length() const { return message_->bodyLength(); }
+  std::shared_ptr<Buffer> body() const { return body_; }
 
- private:
-  // Retain reference to memory
-  std::shared_ptr<Buffer> buffer_;
-  int64_t offset_;
+  std::shared_ptr<Buffer> metadata() const { return metadata_; }
 
+ private:
+  // The Flatbuffer metadata
+  std::shared_ptr<Buffer> metadata_;
   const flatbuf::Message* message_;
+
+  // The message body, if any
+  std::shared_ptr<Buffer> body_;
 };
 
-Message::Message(const std::shared_ptr<Buffer>& buffer, int64_t offset) {
-  impl_.reset(new MessageImpl(buffer, offset));
+Message::Message(
+    const std::shared_ptr<Buffer>& metadata, const std::shared_ptr<Buffer>& body) {
+  impl_.reset(new MessageImpl(metadata, body));
+}
+
+Status Message::Open(const std::shared_ptr<Buffer>& metadata,
+    const std::shared_ptr<Buffer>& body, std::unique_ptr<Message>* out) {
+  out->reset(new Message(metadata, body));
+  return (*out)->impl_->Open();
 }
 
 Message::~Message() {}
 
-Status Message::Open(const std::shared_ptr<Buffer>& buffer, int64_t offset,
-    std::shared_ptr<Message>* out) {
-  // ctor is private
+std::shared_ptr<Buffer> Message::body() const {
+  return impl_->body();
+}
 
-  *out = std::shared_ptr<Message>(new Message(buffer, offset));
-  return (*out)->impl_->Open();
+std::shared_ptr<Buffer> Message::metadata() const {
+  return impl_->metadata();
 }
 
 Message::Type Message::type() const {
@@ -914,14 +926,64 @@ MetadataVersion Message::metadata_version() const {
   return impl_->version();
 }
 
-int64_t Message::body_length() const {
-  return impl_->body_length();
-}
-
 const void* Message::header() const {
   return impl_->header();
 }
 
+bool Message::Equals(const Message& other) const {
+  int64_t metadata_bytes = std::min(metadata()->size(), other.metadata()->size());
+
+  if (!metadata()->Equals(*other.metadata(), metadata_bytes)) {
+    return false;
+  }
+
+  // Compare bodies, if they have them
+  auto this_body = body();
+  auto other_body = other.body();
+
+  const bool this_has_body = (this_body != nullptr) && (this_body->size() > 0);
+  const bool other_has_body = (other_body != nullptr) && (other_body->size() > 0);
+
+  if (this_has_body && other_has_body) {
+    return this_body->Equals(*other_body);
+  } else if (this_has_body ^ other_has_body) {
+    // One has a body but not the other
+    return false;
+  } else {
+    // Neither has a body
+    return true;
+  }
+}
+
+Status Message::SerializeTo(io::OutputStream* file, int64_t* output_length) const {
+  int32_t metadata_length = 0;
+  RETURN_NOT_OK(WriteMessage(*metadata(), file, &metadata_length));
+
+  *output_length = metadata_length;
+
+  auto body_buffer = body();
+  if (body_buffer) {
+    RETURN_NOT_OK(file->Write(body_buffer->data(), body_buffer->size()));
+    *output_length += body_buffer->size();
+  }
+
+  return Status::OK();
+}
+
+std::string FormatMessageType(Message::Type type) {
+  switch (type) {
+    case Message::SCHEMA:
+      return "schema";
+    case Message::RECORD_BATCH:
+      return "record batch";
+    case Message::DICTIONARY_BATCH:
+      return "dictionary";
+    default:
+      break;
+  }
+  return "unknown";
+}
+
 // ----------------------------------------------------------------------
 
 static Status VisitField(const flatbuf::Field* field, DictionaryTypeMap* id_to_field) {
@@ -975,10 +1037,11 @@ Status GetSchema(const void* opaque_schema, const DictionaryMemo& dictionary_mem
   return Status::OK();
 }
 
-Status GetTensorMetadata(const void* opaque_tensor, std::shared_ptr<DataType>* type,
+Status GetTensorMetadata(const Buffer& metadata, std::shared_ptr<DataType>* type,
     std::vector<int64_t>* shape, std::vector<int64_t>* strides,
     std::vector<std::string>* dim_names) {
-  auto tensor = static_cast<const flatbuf::Tensor*>(opaque_tensor);
+  auto message = flatbuf::GetMessage(metadata.data());
+  auto tensor = reinterpret_cast<const flatbuf::Tensor*>(message->header());
 
   int ndim = static_cast<int>(tensor->shape()->size());
 
@@ -1006,8 +1069,27 @@ Status GetTensorMetadata(const void* opaque_tensor, std::shared_ptr<DataType>* t
 // ----------------------------------------------------------------------
 // Read and write messages
 
+static Status ReadFullMessage(const std::shared_ptr<Buffer>& metadata,
+    io::InputStream* stream, std::unique_ptr<Message>* message) {
+  auto fb_message = flatbuf::GetMessage(metadata->data());
+
+  int64_t body_length = fb_message->bodyLength();
+
+  std::shared_ptr<Buffer> body;
+  RETURN_NOT_OK(stream->Read(body_length, &body));
+
+  if (body->size() < body_length) {
+    std::stringstream ss;
+    ss << "Expected to be able to read " << body_length << " bytes for message body, got "
+       << body->size();
+    return Status::IOError(ss.str());
+  }
+
+  return Message::Open(metadata, body, message);
+}
+
 Status ReadMessage(int64_t offset, int32_t metadata_length, io::RandomAccessFile* file,
-    std::shared_ptr<Message>* message) {
+    std::unique_ptr<Message>* message) {
   std::shared_ptr<Buffer> buffer;
   RETURN_NOT_OK(file->ReadAt(offset, metadata_length, &buffer));
 
@@ -1019,13 +1101,15 @@ Status ReadMessage(int64_t offset, int32_t metadata_length, io::RandomAccessFile
        << ", metadata length: " << metadata_length;
     return Status::Invalid(ss.str());
   }
-  return Message::Open(buffer, 4, message);
+
+  auto metadata = SliceBuffer(buffer, 4, buffer->size() - 4);
+  return ReadFullMessage(metadata, file, message);
 }
 
-Status ReadMessage(io::InputStream* file, std::shared_ptr<Message>* message) {
+Status ReadMessage(io::InputStream* file, std::unique_ptr<Message>* message) {
   std::shared_ptr<Buffer> buffer;
-  RETURN_NOT_OK(file->Read(sizeof(int32_t), &buffer));
 
+  RETURN_NOT_OK(file->Read(sizeof(int32_t), &buffer));
   if (buffer->size() != sizeof(int32_t)) {
     *message = nullptr;
     return Status::OK();
@@ -1044,9 +1128,21 @@ Status ReadMessage(io::InputStream* file, std::shared_ptr<Message>* message) {
     return Status::IOError("Unexpected end of stream trying to read message");
   }
 
-  return Message::Open(buffer, 0, message);
+  return ReadFullMessage(buffer, file, message);
+}
+
+// ----------------------------------------------------------------------
+// Implement InputStream message reader
+
+Status InputStreamMessageReader::ReadNextMessage(std::unique_ptr<Message>* message) {
+  return ReadMessage(stream_.get(), message);
 }
 
+InputStreamMessageReader::~InputStreamMessageReader() {}
+
+// ----------------------------------------------------------------------
+// Implement message writing
+
 Status WriteMessage(
     const Buffer& message, io::OutputStream* file, int32_t* message_length) {
   // Need to write 4 bytes (message size), the message, plus padding to

http://git-wip-us.apache.org/repos/asf/arrow/blob/bb0a7588/cpp/src/arrow/ipc/metadata.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/metadata.h b/cpp/src/arrow/ipc/metadata.h
index 257bbd8..64b2571 100644
--- a/cpp/src/arrow/ipc/metadata.h
+++ b/cpp/src/arrow/ipc/metadata.h
@@ -59,26 +59,12 @@ static constexpr const char* kArrowMagicBytes = "ARROW1";
 constexpr int kMaxNestingDepth = 64;
 
 struct ARROW_EXPORT FieldMetadata {
-  FieldMetadata() {}
-  FieldMetadata(int64_t length, int64_t null_count, int64_t offset)
-      : length(length), null_count(null_count), offset(offset) {}
-
-  FieldMetadata(const FieldMetadata& other) {
-    this->length = other.length;
-    this->null_count = other.null_count;
-    this->offset = other.offset;
-  }
-
   int64_t length;
   int64_t null_count;
   int64_t offset;
 };
 
 struct ARROW_EXPORT BufferMetadata {
-  BufferMetadata() {}
-  BufferMetadata(int32_t page, int64_t offset, int64_t length)
-      : page(page), offset(offset), length(length) {}
-
   /// The shared memory page id where to find this. Set to -1 if unused
   int32_t page;
 
@@ -90,10 +76,6 @@ struct ARROW_EXPORT BufferMetadata {
 };
 
 struct FileBlock {
-  FileBlock() {}
-  FileBlock(int64_t offset, int32_t metadata_length, int64_t body_length)
-      : offset(offset), metadata_length(metadata_length), body_length(body_length) {}
-
   int64_t offset;
   int32_t metadata_length;
   int64_t body_length;
@@ -153,20 +135,46 @@ Status GetDictionaryTypes(const void* opaque_schema, DictionaryTypeMap* id_to_fi
 Status ARROW_EXPORT GetSchema(const void* opaque_schema,
     const DictionaryMemo& dictionary_memo, std::shared_ptr<Schema>* out);
 
-Status ARROW_EXPORT GetTensorMetadata(const void* opaque_tensor,
+Status ARROW_EXPORT GetTensorMetadata(const Buffer& metadata,
     std::shared_ptr<DataType>* type, std::vector<int64_t>* shape,
     std::vector<int64_t>* strides, std::vector<std::string>* dim_names);
 
+/// \brief An IPC message including metadata and body
 class ARROW_EXPORT Message {
  public:
   enum Type { NONE, SCHEMA, DICTIONARY_BATCH, RECORD_BATCH, TENSOR };
 
-  ~Message();
+  /// \brief Construct message, but do not validate
+  ///
+  /// Use at your own risk; Message::Open has more metadata validation
+  Message(const std::shared_ptr<Buffer>& metadata, const std::shared_ptr<Buffer>& body);
 
-  static Status Open(const std::shared_ptr<Buffer>& buffer, int64_t offset,
-      std::shared_ptr<Message>* out);
+  ~Message();
 
-  int64_t body_length() const;
+  /// \brief Create and validate a Message instance from two buffers
+  ///
+  /// \param[in] metadata a buffer containing the Flatbuffer metadata
+  /// \param[in] body a buffer containing the message body, which may be nullptr
+  /// \param[out] out the created message
+  static Status Open(const std::shared_ptr<Buffer>& metadata,
+      const std::shared_ptr<Buffer>& body, std::unique_ptr<Message>* out);
+
+  /// \brief Check whether this message is equal to another message
+  ///
+  /// \param[in] other the message to compare against
+  /// \return true if the metadata matches and the bodies are either equal or
+  /// both absent or empty
+  bool Equals(const Message& other) const;
+
+  /// \brief the Message metadata
+  ///
+  /// \return buffer
+  std::shared_ptr<Buffer> metadata() const;
+
+  /// \brief the Message body, if any
+  ///
+  /// \return buffer is nullptr if no body
+  std::shared_ptr<Buffer> body() const;
 
   Type type() const;
 
@@ -174,9 +182,14 @@ class ARROW_EXPORT Message {
 
   const void* header() const;
 
- private:
-  Message(const std::shared_ptr<Buffer>& buffer, int64_t offset);
+  /// \brief Write length-prefixed metadata and body to output stream
+  ///
+  /// \param[in] file output stream to write to
+  /// \param[out] output_length the number of bytes written
+  /// \return Status
+  Status SerializeTo(io::OutputStream* file, int64_t* output_length) const;
 
+ private:
   // Hide serialization details from user API
   class MessageImpl;
   std::unique_ptr<MessageImpl> impl_;
@@ -184,8 +197,34 @@ class ARROW_EXPORT Message {
   DISALLOW_COPY_AND_ASSIGN(Message);
 };
 
+ARROW_EXPORT std::string FormatMessageType(Message::Type type);
+
+/// \brief Abstract interface for a sequence of messages
+class ARROW_EXPORT MessageReader {
+ public:
+  virtual ~MessageReader() = default;
+
+  virtual Status ReadNextMessage(std::unique_ptr<Message>* message) = 0;
+};
+
+class ARROW_EXPORT InputStreamMessageReader : public MessageReader {
+ public:
+  explicit InputStreamMessageReader(const std::shared_ptr<io::InputStream>& stream)
+      : stream_(stream) {}
+
+  ~InputStreamMessageReader();
+
+  Status ReadNextMessage(std::unique_ptr<Message>* message) override;
+
+ private:
+  std::shared_ptr<io::InputStream> stream_;
+};
+
+/// \brief Read encapsulated IPC message from position in file
+///
 /// Read a length-prefixed message flatbuffer starting at the indicated file
-/// offset
+/// offset. If the message has a body with non-zero length, it will also be
+/// read
 ///
 /// The metadata_length includes at least the length prefix and the flatbuffer
 ///
@@ -196,15 +235,18 @@ class ARROW_EXPORT Message {
 /// \param[out] message the message read
 /// \return Status success or failure
 Status ARROW_EXPORT ReadMessage(int64_t offset, int32_t metadata_length,
-    io::RandomAccessFile* file, std::shared_ptr<Message>* message);
+    io::RandomAccessFile* file, std::unique_ptr<Message>* message);
 
+/// \brief Read encapsulated IPC message (metadata and body) from InputStream
+///
 /// Read length-prefixed message with as-yet unknown length. Returns nullptr if
 /// there are not enough bytes available or the message length is 0 (e.g. EOS
 /// in a stream)
 Status ARROW_EXPORT ReadMessage(
-    io::InputStream* stream, std::shared_ptr<Message>* message);
+    io::InputStream* stream, std::unique_ptr<Message>* message);
 
-/// Write a serialized message with a length-prefix and padding to an 8-byte offset
+/// Write a serialized message metadata with a length-prefix and padding to an
+/// 8-byte offset
 ///
 /// <message_size: int32><message: const void*><padding>
 Status ARROW_EXPORT WriteMessage(

http://git-wip-us.apache.org/repos/asf/arrow/blob/bb0a7588/cpp/src/arrow/ipc/reader.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/reader.cc b/cpp/src/arrow/ipc/reader.cc
index 8ca4d82..88ab330 100644
--- a/cpp/src/arrow/ipc/reader.cc
+++ b/cpp/src/arrow/ipc/reader.cc
@@ -257,11 +257,18 @@ static Status LoadArray(const std::shared_ptr<DataType>& type,
   return loader.Load();
 }
 
-Status ReadRecordBatch(const Message& metadata, const std::shared_ptr<Schema>& schema,
+Status ReadRecordBatch(const Buffer& metadata, const std::shared_ptr<Schema>& schema,
     io::RandomAccessFile* file, std::shared_ptr<RecordBatch>* out) {
   return ReadRecordBatch(metadata, schema, kMaxNestingDepth, file, out);
 }
 
+Status ReadRecordBatch(const Message& message, const std::shared_ptr<Schema>& schema,
+    std::shared_ptr<RecordBatch>* out) {
+  io::BufferReader reader(message.body());
+  DCHECK_EQ(message.type(), Message::RECORD_BATCH);
+  return ReadRecordBatch(*message.metadata(), schema, kMaxNestingDepth, &reader, out);
+}
+
 // ----------------------------------------------------------------------
 // Array loading
 
@@ -294,18 +301,22 @@ static inline Status ReadRecordBatch(const flatbuf::RecordBatch* metadata,
       schema, metadata->length(), max_recursion_depth, &source, out);
 }
 
-Status ReadRecordBatch(const Message& metadata, const std::shared_ptr<Schema>& schema,
+Status ReadRecordBatch(const Buffer& metadata, const std::shared_ptr<Schema>& schema,
     int max_recursion_depth, io::RandomAccessFile* file,
     std::shared_ptr<RecordBatch>* out) {
-  DCHECK_EQ(metadata.type(), Message::RECORD_BATCH);
-  auto batch = reinterpret_cast<const flatbuf::RecordBatch*>(metadata.header());
+  auto message = flatbuf::GetMessage(metadata.data());
+  if (message->header_type() != flatbuf::MessageHeader_RecordBatch) {
+    DCHECK_EQ(message->header_type(), flatbuf::MessageHeader_RecordBatch);
+  }
+  auto batch = reinterpret_cast<const flatbuf::RecordBatch*>(message->header());
   return ReadRecordBatch(batch, schema, max_recursion_depth, file, out);
 }
 
-Status ReadDictionary(const Message& metadata, const DictionaryTypeMap& dictionary_types,
+Status ReadDictionary(const Buffer& metadata, const DictionaryTypeMap& dictionary_types,
     io::RandomAccessFile* file, int64_t* dictionary_id, std::shared_ptr<Array>* out) {
+  auto message = flatbuf::GetMessage(metadata.data());
   auto dictionary_batch =
-      reinterpret_cast<const flatbuf::DictionaryBatch*>(metadata.header());
+      reinterpret_cast<const flatbuf::DictionaryBatch*>(message->header());
 
   int64_t id = *dictionary_id = dictionary_batch->id();
   auto it = dictionary_types.find(id);
@@ -335,25 +346,33 @@ Status ReadDictionary(const Message& metadata, const DictionaryTypeMap& dictiona
   return Status::OK();
 }
 
+static Status ReadMessageAndValidate(MessageReader* reader, Message::Type expected_type,
+    bool allow_null, std::unique_ptr<Message>* message) {
+  RETURN_NOT_OK(reader->ReadNextMessage(message));
+
+  if (!(*message) && !allow_null) {
+    std::stringstream ss;
+    ss << "Expected " << FormatMessageType(expected_type)
+       << " message in stream, was null or length 0";
+    return Status::Invalid(ss.str());
+  }
+
+  if ((*message) == nullptr) { return Status::OK(); }
+
+  if ((*message)->type() != expected_type) {
+    std::stringstream ss;
+    ss << "Message not expected type: " << FormatMessageType(expected_type)
+       << ", was: " << (*message)->type();
+    return Status::IOError(ss.str());
+  }
+  return Status::OK();
+}
+
 // ----------------------------------------------------------------------
 // RecordBatchStreamReader implementation
 
 static inline FileBlock FileBlockFromFlatbuffer(const flatbuf::Block* block) {
-  return FileBlock(block->offset(), block->metaDataLength(), block->bodyLength());
-}
-
-static inline std::string FormatMessageType(Message::Type type) {
-  switch (type) {
-    case Message::SCHEMA:
-      return "schema";
-    case Message::RECORD_BATCH:
-      return "record batch";
-    case Message::DICTIONARY_BATCH:
-      return "dictionary";
-    default:
-      break;
-  }
-  return "unknown";
+  return FileBlock{block->offset(), block->metaDataLength(), block->bodyLength()};
 }
 
 RecordBatchReader::~RecordBatchReader() {}
@@ -363,59 +382,29 @@ class RecordBatchStreamReader::RecordBatchStreamReaderImpl {
   RecordBatchStreamReaderImpl() {}
   ~RecordBatchStreamReaderImpl() {}
 
-  Status Open(const std::shared_ptr<io::InputStream>& stream) {
-    stream_ = stream;
+  Status Open(std::unique_ptr<MessageReader> message_reader) {
+    message_reader_ = std::move(message_reader);
     return ReadSchema();
   }
 
-  Status ReadNextMessage(
-      Message::Type expected_type, bool allow_null, std::shared_ptr<Message>* message) {
-    RETURN_NOT_OK(ReadMessage(stream_.get(), message));
-
-    if (!(*message) && !allow_null) {
-      std::stringstream ss;
-      ss << "Expected " << FormatMessageType(expected_type)
-         << " message in stream, was null or length 0";
-      return Status::Invalid(ss.str());
-    }
-
-    if ((*message) == nullptr) { return Status::OK(); }
-
-    if ((*message)->type() != expected_type) {
-      std::stringstream ss;
-      ss << "Message not expected type: " << FormatMessageType(expected_type)
-         << ", was: " << (*message)->type();
-      return Status::IOError(ss.str());
-    }
-    return Status::OK();
-  }
-
-  Status ReadExact(int64_t size, std::shared_ptr<Buffer>* buffer) {
-    RETURN_NOT_OK(stream_->Read(size, buffer));
-
-    if ((*buffer)->size() < size) {
-      return Status::IOError("Unexpected EOS when reading buffer");
-    }
-    return Status::OK();
-  }
-
   Status ReadNextDictionary() {
-    std::shared_ptr<Message> message;
-    RETURN_NOT_OK(ReadNextMessage(Message::DICTIONARY_BATCH, false, &message));
+    std::unique_ptr<Message> message;
+    RETURN_NOT_OK(ReadMessageAndValidate(
+        message_reader_.get(), Message::DICTIONARY_BATCH, false, &message));
 
-    std::shared_ptr<Buffer> batch_body;
-    RETURN_NOT_OK(ReadExact(message->body_length(), &batch_body));
-    io::BufferReader reader(batch_body);
+    io::BufferReader reader(message->body());
 
     std::shared_ptr<Array> dictionary;
     int64_t id;
-    RETURN_NOT_OK(ReadDictionary(*message, dictionary_types_, &reader, &id, &dictionary));
+    RETURN_NOT_OK(ReadDictionary(
+        *message->metadata(), dictionary_types_, &reader, &id, &dictionary));
     return dictionary_memo_.AddDictionary(id, dictionary);
   }
 
   Status ReadSchema() {
-    std::shared_ptr<Message> message;
-    RETURN_NOT_OK(ReadNextMessage(Message::SCHEMA, false, &message));
+    std::unique_ptr<Message> message;
+    RETURN_NOT_OK(
+        ReadMessageAndValidate(message_reader_.get(), Message::SCHEMA, false, &message));
 
     RETURN_NOT_OK(GetDictionaryTypes(message->header(), &dictionary_types_));
 
@@ -429,9 +418,10 @@ class RecordBatchStreamReader::RecordBatchStreamReaderImpl {
     return GetSchema(message->header(), dictionary_memo_, &schema_);
   }
 
-  Status GetNextRecordBatch(std::shared_ptr<RecordBatch>* batch) {
-    std::shared_ptr<Message> message;
-    RETURN_NOT_OK(ReadNextMessage(Message::RECORD_BATCH, true, &message));
+  Status ReadNextRecordBatch(std::shared_ptr<RecordBatch>* batch) {
+    std::unique_ptr<Message> message;
+    RETURN_NOT_OK(ReadMessageAndValidate(
+        message_reader_.get(), Message::RECORD_BATCH, true, &message));
 
     if (message == nullptr) {
       // End of stream
@@ -439,21 +429,18 @@ class RecordBatchStreamReader::RecordBatchStreamReaderImpl {
       return Status::OK();
     }
 
-    std::shared_ptr<Buffer> batch_body;
-    RETURN_NOT_OK(ReadExact(message->body_length(), &batch_body));
-    io::BufferReader reader(batch_body);
-    return ReadRecordBatch(*message, schema_, &reader, batch);
+    io::BufferReader reader(message->body());
+    return ReadRecordBatch(*message->metadata(), schema_, &reader, batch);
   }
 
   std::shared_ptr<Schema> schema() const { return schema_; }
 
  private:
+  std::unique_ptr<MessageReader> message_reader_;
+
   // dictionary_id -> type
   DictionaryTypeMap dictionary_types_;
-
   DictionaryMemo dictionary_memo_;
-
-  std::shared_ptr<io::InputStream> stream_;
   std::shared_ptr<Schema> schema_;
 };
 
@@ -463,19 +450,25 @@ RecordBatchStreamReader::RecordBatchStreamReader() {
 
 RecordBatchStreamReader::~RecordBatchStreamReader() {}
 
-Status RecordBatchStreamReader::Open(const std::shared_ptr<io::InputStream>& stream,
+Status RecordBatchStreamReader::Open(std::unique_ptr<MessageReader> message_reader,
     std::shared_ptr<RecordBatchStreamReader>* reader) {
   // Private ctor
   *reader = std::shared_ptr<RecordBatchStreamReader>(new RecordBatchStreamReader());
-  return (*reader)->impl_->Open(stream);
+  return (*reader)->impl_->Open(std::move(message_reader));
+}
+
+Status RecordBatchStreamReader::Open(const std::shared_ptr<io::InputStream>& stream,
+    std::shared_ptr<RecordBatchStreamReader>* out) {
+  std::unique_ptr<MessageReader> message_reader(new InputStreamMessageReader(stream));
+  return Open(std::move(message_reader), out);
 }
 
 std::shared_ptr<Schema> RecordBatchStreamReader::schema() const {
   return impl_->schema();
 }
 
-Status RecordBatchStreamReader::GetNextRecordBatch(std::shared_ptr<RecordBatch>* batch) {
-  return impl_->GetNextRecordBatch(batch);
+Status RecordBatchStreamReader::ReadNextRecordBatch(std::shared_ptr<RecordBatch>* batch) {
+  return impl_->ReadNextRecordBatch(batch);
 }
 
 // ----------------------------------------------------------------------
@@ -547,22 +540,17 @@ class RecordBatchFileReader::RecordBatchFileReaderImpl {
     return FileBlockFromFlatbuffer(footer_->dictionaries()->Get(i));
   }
 
-  Status GetRecordBatch(int i, std::shared_ptr<RecordBatch>* batch) {
+  Status ReadRecordBatch(int i, std::shared_ptr<RecordBatch>* batch) {
     DCHECK_GE(i, 0);
     DCHECK_LT(i, num_record_batches());
     FileBlock block = record_batch(i);
 
-    std::shared_ptr<Message> message;
+    std::unique_ptr<Message> message;
     RETURN_NOT_OK(
         ReadMessage(block.offset, block.metadata_length, file_.get(), &message));
 
-    // TODO(wesm): ARROW-388 -- the buffer frame of reference is 0 (see
-    // ARROW-384).
-    std::shared_ptr<Buffer> buffer_block;
-    RETURN_NOT_OK(file_->Read(block.body_length, &buffer_block));
-    io::BufferReader reader(buffer_block);
-
-    return ReadRecordBatch(*message, schema_, &reader, batch);
+    io::BufferReader reader(message->body());
+    return ::arrow::ipc::ReadRecordBatch(*message->metadata(), schema_, &reader, batch);
   }
 
   Status ReadSchema() {
@@ -571,23 +559,16 @@ class RecordBatchFileReader::RecordBatchFileReaderImpl {
     // Read all the dictionaries
     for (int i = 0; i < num_dictionaries(); ++i) {
       FileBlock block = dictionary(i);
-      std::shared_ptr<Message> message;
+      std::unique_ptr<Message> message;
       RETURN_NOT_OK(
           ReadMessage(block.offset, block.metadata_length, file_.get(), &message));
 
-      // TODO(wesm): ARROW-577: This code is a bit duplicated, can be fixed
-      // with a more invasive refactor
-
-      // TODO(wesm): ARROW-388 -- the buffer frame of reference is 0 (see
-      // ARROW-384).
-      std::shared_ptr<Buffer> buffer_block;
-      RETURN_NOT_OK(file_->Read(block.body_length, &buffer_block));
-      io::BufferReader reader(buffer_block);
+      io::BufferReader reader(message->body());
 
       std::shared_ptr<Array> dictionary;
       int64_t dictionary_id;
-      RETURN_NOT_OK(ReadDictionary(
-          *message, dictionary_fields_, &reader, &dictionary_id, &dictionary));
+      RETURN_NOT_OK(ReadDictionary(*message->metadata(), dictionary_fields_, &reader,
+          &dictionary_id, &dictionary));
       RETURN_NOT_OK(dictionary_memo_->AddDictionary(dictionary_id, dictionary));
     }
 
@@ -653,12 +634,13 @@ MetadataVersion RecordBatchFileReader::version() const {
   return impl_->version();
 }
 
-Status RecordBatchFileReader::GetRecordBatch(int i, std::shared_ptr<RecordBatch>* batch) {
-  return impl_->GetRecordBatch(i, batch);
+Status RecordBatchFileReader::ReadRecordBatch(
+    int i, std::shared_ptr<RecordBatch>* batch) {
+  return impl_->ReadRecordBatch(i, batch);
 }
 
-static Status ReadContiguousPayload(int64_t offset, io::RandomAccessFile* file,
-    std::shared_ptr<Message>* message, std::shared_ptr<Buffer>* payload) {
+static Status ReadContiguousPayload(
+    int64_t offset, io::RandomAccessFile* file, std::unique_ptr<Message>* message) {
   std::shared_ptr<Buffer> buffer;
   RETURN_NOT_OK(file->Seek(offset));
   RETURN_NOT_OK(ReadMessage(file, message));
@@ -666,38 +648,32 @@ static Status ReadContiguousPayload(int64_t offset, io::RandomAccessFile* file,
   if (*message == nullptr) {
     return Status::Invalid("Unable to read metadata at offset");
   }
-
-  // TODO(ARROW-388): The buffer offsets start at 0, so we must construct a
-  // RandomAccessFile according to that frame of reference
-  RETURN_NOT_OK(file->Read((*message)->body_length(), payload));
   return Status::OK();
 }
 
 Status ReadRecordBatch(const std::shared_ptr<Schema>& schema, int64_t offset,
     io::RandomAccessFile* file, std::shared_ptr<RecordBatch>* out) {
-  std::shared_ptr<Buffer> payload;
-  std::shared_ptr<Message> message;
-
-  RETURN_NOT_OK(ReadContiguousPayload(offset, file, &message, &payload));
-  io::BufferReader buffer_reader(payload);
-  return ReadRecordBatch(*message, schema, kMaxNestingDepth, &buffer_reader, out);
+  std::unique_ptr<Message> message;
+  RETURN_NOT_OK(ReadContiguousPayload(offset, file, &message));
+  io::BufferReader buffer_reader(message->body());
+  return ReadRecordBatch(
+      *message->metadata(), schema, kMaxNestingDepth, &buffer_reader, out);
 }
 
 Status ReadTensor(
     int64_t offset, io::RandomAccessFile* file, std::shared_ptr<Tensor>* out) {
   // Respect alignment of Tensor messages (see WriteTensor)
   offset = PaddedLength(offset);
-  std::shared_ptr<Message> message;
-  std::shared_ptr<Buffer> data;
-  RETURN_NOT_OK(ReadContiguousPayload(offset, file, &message, &data));
+  std::unique_ptr<Message> message;
+  RETURN_NOT_OK(ReadContiguousPayload(offset, file, &message));
 
   std::shared_ptr<DataType> type;
   std::vector<int64_t> shape;
   std::vector<int64_t> strides;
   std::vector<std::string> dim_names;
   RETURN_NOT_OK(
-      GetTensorMetadata(message->header(), &type, &shape, &strides, &dim_names));
-  *out = std::make_shared<Tensor>(type, data, shape, strides, dim_names);
+      GetTensorMetadata(*message->metadata(), &type, &shape, &strides, &dim_names));
+  *out = std::make_shared<Tensor>(type, message->body(), shape, strides, dim_names);
   return Status::OK();
 }
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/bb0a7588/cpp/src/arrow/ipc/reader.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/reader.h b/cpp/src/arrow/ipc/reader.h
index dd29a36..d6c2614 100644
--- a/cpp/src/arrow/ipc/reader.h
+++ b/cpp/src/arrow/ipc/reader.h
@@ -57,7 +57,7 @@ class ARROW_EXPORT RecordBatchReader {
   ///
   /// \param(out) batch the next loaded batch, nullptr at end of stream
   /// \return Status
-  virtual Status GetNextRecordBatch(std::shared_ptr<RecordBatch>* batch) = 0;
+  virtual Status ReadNextRecordBatch(std::shared_ptr<RecordBatch>* batch) = 0;
 };
 
 /// \class RecordBatchStreamReader
@@ -66,16 +66,24 @@ class ARROW_EXPORT RecordBatchStreamReader : public RecordBatchReader {
  public:
   virtual ~RecordBatchStreamReader();
 
-  /// Create batch reader from InputStream
+  /// Create batch reader from generic MessageReader
+  ///
+  /// \param(in) message_reader a MessageReader implementation
+  /// \param(out) out the created RecordBatchStreamReader object
+  /// \return Status
+  static Status Open(std::unique_ptr<MessageReader> message_reader,
+      std::shared_ptr<RecordBatchStreamReader>* out);
+
+  /// \brief Create record batch stream reader from InputStream
   ///
   /// \param(in) stream an input stream instance
-  /// \param(out) reader the created reader object
+  /// \param(out) out the created RecordBatchStreamReader object
   /// \return Status
   static Status Open(const std::shared_ptr<io::InputStream>& stream,
-      std::shared_ptr<RecordBatchStreamReader>* reader);
+      std::shared_ptr<RecordBatchStreamReader>* out);
 
   std::shared_ptr<Schema> schema() const override;
-  Status GetNextRecordBatch(std::shared_ptr<RecordBatch>* batch) override;
+  Status ReadNextRecordBatch(std::shared_ptr<RecordBatch>* batch) override;
 
  private:
   RecordBatchStreamReader();
@@ -122,7 +130,7 @@ class ARROW_EXPORT RecordBatchFileReader {
   /// \param(in) i the index of the record batch to return
   /// \param(out) batch the read batch
   /// \return Status
-  Status GetRecordBatch(int i, std::shared_ptr<RecordBatch>* batch);
+  Status ReadRecordBatch(int i, std::shared_ptr<RecordBatch>* batch);
 
  private:
   RecordBatchFileReader();
@@ -133,16 +141,25 @@ class ARROW_EXPORT RecordBatchFileReader {
 
 // Generic read functions; does not copy data if the input supports zero copy reads
 
-/// Read record batch from file given metadata and schema
+/// \brief Read record batch from file given metadata and schema
 ///
 /// \param(in) metadata a Message containing the record batch metadata
 /// \param(in) schema the record batch schema
 /// \param(in) file a random access file
 /// \param(out) out the read record batch
-Status ARROW_EXPORT ReadRecordBatch(const Message& metadata,
+Status ARROW_EXPORT ReadRecordBatch(const Buffer& metadata,
     const std::shared_ptr<Schema>& schema, io::RandomAccessFile* file,
     std::shared_ptr<RecordBatch>* out);
 
+/// \brief Read record batch from fully encapsulated Message
+///
+/// \param[in] message a message instance containing metadata and body
+/// \param[in] schema the record batch schema
+/// \param[out] out the resulting RecordBatch
+/// \return Status
+Status ARROW_EXPORT ReadRecordBatch(const Message& message,
+    const std::shared_ptr<Schema>& schema, std::shared_ptr<RecordBatch>* out);
+
 /// Read record batch from file given metadata and schema
 ///
 /// \param(in) metadata a Message containing the record batch metadata
@@ -150,7 +167,7 @@ Status ARROW_EXPORT ReadRecordBatch(const Message& metadata,
 /// \param(in) file a random access file
 /// \param(in) max_recursion_depth the maximum permitted nesting depth
 /// \param(out) out the read record batch
-Status ARROW_EXPORT ReadRecordBatch(const Message& metadata,
+Status ARROW_EXPORT ReadRecordBatch(const Buffer& metadata,
     const std::shared_ptr<Schema>& schema, int max_recursion_depth,
     io::RandomAccessFile* file, std::shared_ptr<RecordBatch>* out);
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/bb0a7588/cpp/src/arrow/ipc/stream-to-file.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/stream-to-file.cc b/cpp/src/arrow/ipc/stream-to-file.cc
index b942054..de65883 100644
--- a/cpp/src/arrow/ipc/stream-to-file.cc
+++ b/cpp/src/arrow/ipc/stream-to-file.cc
@@ -40,7 +40,7 @@ Status ConvertToFile() {
 
   std::shared_ptr<RecordBatch> batch;
   while (true) {
-    RETURN_NOT_OK(reader->GetNextRecordBatch(&batch));
+    RETURN_NOT_OK(reader->ReadNextRecordBatch(&batch));
     if (batch == nullptr) break;
     RETURN_NOT_OK(writer->WriteRecordBatch(*batch));
   }

http://git-wip-us.apache.org/repos/asf/arrow/blob/bb0a7588/cpp/src/arrow/ipc/writer.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/writer.cc b/cpp/src/arrow/ipc/writer.cc
index 7563343..14708a1 100644
--- a/cpp/src/arrow/ipc/writer.cc
+++ b/cpp/src/arrow/ipc/writer.cc
@@ -110,7 +110,7 @@ class RecordBatchSerializer : public ArrayVisitor {
     }
 
     // push back all common elements
-    field_nodes_.emplace_back(arr.length(), arr.null_count(), 0);
+    field_nodes_.push_back({arr.length(), arr.null_count(), 0});
 
     if (arr.null_count() > 0) {
       std::shared_ptr<Buffer> bitmap;
@@ -680,7 +680,7 @@ class RecordBatchStreamWriter::RecordBatchStreamWriterImpl {
 
   Status WriteRecordBatch(const RecordBatch& batch, bool allow_64bit) {
     // Push an empty FileBlock. Can be written in the footer later
-    record_batches_.emplace_back(0, 0, 0);
+    record_batches_.push_back({0, 0, 0});
     return WriteRecordBatch(
         batch, allow_64bit, &record_batches_[record_batches_.size() - 1]);
   }

http://git-wip-us.apache.org/repos/asf/arrow/blob/bb0a7588/cpp/src/arrow/python/builtin_convert.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/python/builtin_convert.cc b/cpp/src/arrow/python/builtin_convert.cc
index f10dac7..816f95a 100644
--- a/cpp/src/arrow/python/builtin_convert.cc
+++ b/cpp/src/arrow/python/builtin_convert.cc
@@ -477,9 +477,8 @@ class FixedWidthBytesConverter
   inline Status AppendItem(const OwnedRef& item) {
     PyObject* bytes_obj;
     OwnedRef tmp;
-    Py_ssize_t expected_length =
-        std::dynamic_pointer_cast<FixedSizeBinaryType>(typed_builder_->type())
-            ->byte_width();
+    Py_ssize_t expected_length = std::dynamic_pointer_cast<FixedSizeBinaryType>(
+        typed_builder_->type())->byte_width();
     if (item.obj() == Py_None) {
       RETURN_NOT_OK(typed_builder_->AppendNull());
       return Status::OK();

http://git-wip-us.apache.org/repos/asf/arrow/blob/bb0a7588/python/doc/source/api.rst
----------------------------------------------------------------------
diff --git a/python/doc/source/api.rst b/python/doc/source/api.rst
index 4810a31..400614d 100644
--- a/python/doc/source/api.rst
+++ b/python/doc/source/api.rst
@@ -137,7 +137,6 @@ Tables and Record Batches
    Column
    RecordBatch
    Table
-   get_record_batch_size
 
 .. _api.tensor:
 
@@ -148,9 +147,6 @@ Tensor type and Functions
    :toctree: generated/
 
    Tensor
-   write_tensor
-   get_tensor_size
-   read_tensor
 
 .. _api.io:
 
@@ -177,12 +173,20 @@ Interprocess Communication and Messaging
 .. autosummary::
    :toctree: generated/
 
+   Message
+   MessageReader
    RecordBatchFileReader
    RecordBatchFileWriter
    RecordBatchStreamReader
    RecordBatchStreamWriter
    open_file
    open_stream
+   read_message
+   read_record_batch
+   get_record_batch_size
+   read_tensor
+   write_tensor
+   get_tensor_size
 
 .. _api.memory_pool:
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/bb0a7588/python/pyarrow/__init__.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/__init__.py b/python/pyarrow/__init__.py
index 434722c..f7cddd0 100644
--- a/python/pyarrow/__init__.py
+++ b/python/pyarrow/__init__.py
@@ -69,9 +69,8 @@ from pyarrow.lib import (null, bool_,
 from pyarrow.lib import (HdfsFile, NativeFile, PythonFile,
                          Buffer, BufferReader, BufferOutputStream,
                          OSFile, MemoryMappedFile, memory_map,
-                         frombuffer, read_tensor, write_tensor,
+                         frombuffer,
                          memory_map, create_memory_map,
-                         get_record_batch_size, get_tensor_size,
                          have_libhdfs, have_libhdfs3, MockOutputStream)
 
 from pyarrow.lib import (MemoryPool, total_allocated_bytes,
@@ -89,8 +88,12 @@ from pyarrow.lib import (ArrowException,
 
 from pyarrow.filesystem import Filesystem, HdfsClient, LocalFilesystem
 
-from pyarrow.ipc import (RecordBatchFileReader, RecordBatchFileWriter,
+from pyarrow.ipc import (Message, MessageReader,
+                         RecordBatchFileReader, RecordBatchFileWriter,
                          RecordBatchStreamReader, RecordBatchStreamWriter,
+                         read_message, read_record_batch, read_tensor,
+                         write_tensor,
+                         get_record_batch_size, get_tensor_size,
                          open_stream,
                          open_file,
                          serialize_pandas, deserialize_pandas)

http://git-wip-us.apache.org/repos/asf/arrow/blob/bb0a7588/python/pyarrow/feather.pxi
----------------------------------------------------------------------
diff --git a/python/pyarrow/feather.pxi b/python/pyarrow/feather.pxi
new file mode 100644
index 0000000..2e7cf6c
--- /dev/null
+++ b/python/pyarrow/feather.pxi
@@ -0,0 +1,109 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+#----------------------------------------------------------------------
+# Implement legacy Feather file format
+
+
+class FeatherError(Exception):
+    pass
+
+
+cdef class FeatherWriter:
+    cdef:
+        unique_ptr[CFeatherWriter] writer
+
+    cdef public:
+        int64_t num_rows
+
+    def __cinit__(self):
+        self.num_rows = -1
+
+    def open(self, object dest):
+        cdef shared_ptr[OutputStream] sink
+        get_writer(dest, &sink)
+
+        with nogil:
+            check_status(CFeatherWriter.Open(sink, &self.writer))
+
+    def close(self):
+        if self.num_rows < 0:
+            self.num_rows = 0
+        self.writer.get().SetNumRows(self.num_rows)
+        check_status(self.writer.get().Finalize())
+
+    def write_array(self, object name, object col, object mask=None):
+        cdef Array arr
+
+        if self.num_rows >= 0:
+            if len(col) != self.num_rows:
+                raise ValueError('prior column had a different number of rows')
+        else:
+            self.num_rows = len(col)
+
+        if isinstance(col, Array):
+            arr = col
+        else:
+            arr = Array.from_pandas(col, mask=mask)
+
+        cdef c_string c_name = tobytes(name)
+
+        with nogil:
+            check_status(
+                self.writer.get().Append(c_name, deref(arr.sp_array)))
+
+
+cdef class FeatherReader:
+    cdef:
+        unique_ptr[CFeatherReader] reader
+
+    def __cinit__(self):
+        pass
+
+    def open(self, source):
+        cdef shared_ptr[RandomAccessFile] reader
+        get_reader(source, &reader)
+
+        with nogil:
+            check_status(CFeatherReader.Open(reader, &self.reader))
+
+    property num_rows:
+
+        def __get__(self):
+            return self.reader.get().num_rows()
+
+    property num_columns:
+
+        def __get__(self):
+            return self.reader.get().num_columns()
+
+    def get_column_name(self, int i):
+        cdef c_string name = self.reader.get().GetColumnName(i)
+        return frombytes(name)
+
+    def get_column(self, int i):
+        if i < 0 or i >= self.num_columns:
+            raise IndexError(i)
+
+        cdef shared_ptr[CColumn] sp_column
+        with nogil:
+            check_status(self.reader.get()
+                         .GetColumn(i, &sp_column))
+
+        cdef Column col = Column()
+        col.init(sp_column)
+        return col

http://git-wip-us.apache.org/repos/asf/arrow/blob/bb0a7588/python/pyarrow/includes/libarrow.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd
index 9fad824..dd791cd 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -546,41 +546,41 @@ cdef extern from "arrow/io/memory.h" namespace "arrow::io" nogil:
         int64_t GetExtentBytesWritten()
 
 
-cdef extern from "arrow/ipc/metadata.h" namespace "arrow::ipc" nogil:
-    cdef cppclass SchemaMessage:
-        int num_fields()
-        CStatus GetField(int i, shared_ptr[CField]* out)
-        CStatus GetSchema(shared_ptr[CSchema]* out)
-
-    cdef cppclass FieldMetadata:
-        pass
-
-    cdef cppclass BufferMetadata:
-        pass
-
-    cdef cppclass RecordBatchMessage:
-        pass
-
-    cdef cppclass DictionaryBatchMessage:
-        pass
-
+cdef extern from "arrow/ipc/api.h" namespace "arrow::ipc" nogil:
     enum MessageType" arrow::ipc::Message::Type":
         MessageType_SCHEMA" arrow::ipc::Message::SCHEMA"
         MessageType_RECORD_BATCH" arrow::ipc::Message::RECORD_BATCH"
         MessageType_DICTIONARY_BATCH" arrow::ipc::Message::DICTIONARY_BATCH"
 
-    cdef cppclass Message:
-        CStatus Open(const shared_ptr[CBuffer]& buf,
-                     shared_ptr[Message]* out)
-        int64_t body_length()
+    enum MetadataVersion" arrow::ipc::MetadataVersion":
+        MessageType_V1" arrow::ipc::MetadataVersion::V1"
+        MessageType_V2" arrow::ipc::MetadataVersion::V2"
+        MessageType_V3" arrow::ipc::MetadataVersion::V3"
+
+    cdef cppclass CMessage" arrow::ipc::Message":
+        CStatus Open(const shared_ptr[CBuffer]& metadata,
+                     const shared_ptr[CBuffer]& body,
+                     unique_ptr[CMessage]* out)
+
+        shared_ptr[CBuffer] body()
+
+        c_bool Equals(const CMessage& other)
+
+        shared_ptr[CBuffer] metadata()
+        MetadataVersion metadata_version()
         MessageType type()
 
-        shared_ptr[SchemaMessage] GetSchema()
-        shared_ptr[RecordBatchMessage] GetRecordBatch()
-        shared_ptr[DictionaryBatchMessage] GetDictionaryBatch()
+        CStatus SerializeTo(OutputStream* stream, int64_t* output_length)
 
+    c_string FormatMessageType(MessageType type)
 
-cdef extern from "arrow/ipc/api.h" namespace "arrow::ipc" nogil:
+    cdef cppclass CMessageReader \
+        " arrow::ipc::MessageReader":
+        CStatus ReadNextMessage(unique_ptr[CMessage]* out)
+
+    cdef cppclass CInputStreamMessageReader \
+        " arrow::ipc::InputStreamMessageReader":
+        CInputStreamMessageReader(const shared_ptr[InputStream]& stream)
 
     cdef cppclass CRecordBatchWriter \
         " arrow::ipc::RecordBatchWriter":
@@ -590,7 +590,7 @@ cdef extern from "arrow/ipc/api.h" namespace "arrow::ipc" nogil:
     cdef cppclass CRecordBatchReader \
         " arrow::ipc::RecordBatchReader":
         shared_ptr[CSchema] schema()
-        CStatus GetNextRecordBatch(shared_ptr[CRecordBatch]* batch)
+        CStatus ReadNextRecordBatch(shared_ptr[CRecordBatch]* batch)
 
     cdef cppclass CRecordBatchStreamReader \
         " arrow::ipc::RecordBatchStreamReader"(CRecordBatchReader):
@@ -598,6 +598,10 @@ cdef extern from "arrow/ipc/api.h" namespace "arrow::ipc" nogil:
         CStatus Open(const shared_ptr[InputStream]& stream,
                      shared_ptr[CRecordBatchStreamReader]* out)
 
+        @staticmethod
+        CStatus Open2" Open"(unique_ptr[CMessageReader] message_reader,
+                     shared_ptr[CRecordBatchStreamReader]* out)
+
     cdef cppclass CRecordBatchStreamWriter \
         " arrow::ipc::RecordBatchStreamWriter"(CRecordBatchWriter):
         @staticmethod
@@ -625,7 +629,9 @@ cdef extern from "arrow/ipc/api.h" namespace "arrow::ipc" nogil:
 
         int num_record_batches()
 
-        CStatus GetRecordBatch(int i, shared_ptr[CRecordBatch]* batch)
+        CStatus ReadRecordBatch(int i, shared_ptr[CRecordBatch]* batch)
+
+    CStatus ReadMessage(InputStream* stream, unique_ptr[CMessage]* message)
 
     CStatus GetRecordBatchSize(const CRecordBatch& batch, int64_t* size)
     CStatus GetTensorSize(const CTensor& tensor, int64_t* size)
@@ -637,6 +643,10 @@ cdef extern from "arrow/ipc/api.h" namespace "arrow::ipc" nogil:
     CStatus ReadTensor(int64_t offset, RandomAccessFile* file,
                        shared_ptr[CTensor]* out)
 
+    CStatus ReadRecordBatch(const CMessage& message,
+                            const shared_ptr[CSchema]& schema,
+                            shared_ptr[CRecordBatch]* out)
+
 
 cdef extern from "arrow/ipc/feather.h" namespace "arrow::ipc::feather" nogil:
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/bb0a7588/python/pyarrow/io.pxi
----------------------------------------------------------------------
diff --git a/python/pyarrow/io.pxi b/python/pyarrow/io.pxi
index 3221185..8b213a3 100644
--- a/python/pyarrow/io.pxi
+++ b/python/pyarrow/io.pxi
@@ -956,356 +956,3 @@ cdef class HdfsFile(NativeFile):
 
     def __dealloc__(self):
         self.parent = None
-
-# ----------------------------------------------------------------------
-# File and stream readers and writers
-
-cdef class _RecordBatchWriter:
-    cdef:
-        shared_ptr[CRecordBatchWriter] writer
-        shared_ptr[OutputStream] sink
-        bint closed
-
-    def __cinit__(self):
-        self.closed = True
-
-    def __dealloc__(self):
-        if not self.closed:
-            self.close()
-
-    def _open(self, sink, Schema schema):
-        cdef:
-            shared_ptr[CRecordBatchStreamWriter] writer
-
-        get_writer(sink, &self.sink)
-
-        with nogil:
-            check_status(
-                CRecordBatchStreamWriter.Open(self.sink.get(),
-                                              schema.sp_schema,
-                                              &writer))
-
-        self.writer = <shared_ptr[CRecordBatchWriter]> writer
-        self.closed = False
-
-    def write_batch(self, RecordBatch batch):
-        with nogil:
-            check_status(self.writer.get()
-                         .WriteRecordBatch(deref(batch.batch)))
-
-    def close(self):
-        with nogil:
-            check_status(self.writer.get().Close())
-        self.closed = True
-
-
-cdef class _RecordBatchReader:
-    cdef:
-        shared_ptr[CRecordBatchReader] reader
-
-    cdef readonly:
-        Schema schema
-
-    def __cinit__(self):
-        pass
-
-    def _open(self, source):
-        cdef:
-            shared_ptr[RandomAccessFile] file_handle
-            shared_ptr[InputStream] in_stream
-            shared_ptr[CRecordBatchStreamReader] reader
-
-        get_reader(source, &file_handle)
-        in_stream = <shared_ptr[InputStream]> file_handle
-
-        with nogil:
-            check_status(CRecordBatchStreamReader.Open(in_stream, &reader))
-
-        self.reader = <shared_ptr[CRecordBatchReader]> reader
-        self.schema = pyarrow_wrap_schema(self.reader.get().schema())
-
-    def get_next_batch(self):
-        """
-        Read next RecordBatch from the stream. Raises StopIteration at end of
-        stream
-        """
-        cdef shared_ptr[CRecordBatch] batch
-
-        with nogil:
-            check_status(self.reader.get().GetNextRecordBatch(&batch))
-
-        if batch.get() == NULL:
-            raise StopIteration
-
-        return pyarrow_wrap_batch(batch)
-
-    def read_all(self):
-        """
-        Read all record batches as a pyarrow.Table
-        """
-        cdef:
-            vector[shared_ptr[CRecordBatch]] batches
-            shared_ptr[CRecordBatch] batch
-            shared_ptr[CTable] table
-
-        with nogil:
-            while True:
-                check_status(self.reader.get().GetNextRecordBatch(&batch))
-                if batch.get() == NULL:
-                    break
-                batches.push_back(batch)
-
-            check_status(CTable.FromRecordBatches(batches, &table))
-
-        return pyarrow_wrap_table(table)
-
-
-cdef class _RecordBatchFileWriter(_RecordBatchWriter):
-
-    def _open(self, sink, Schema schema):
-        cdef shared_ptr[CRecordBatchFileWriter] writer
-        get_writer(sink, &self.sink)
-
-        with nogil:
-            check_status(
-                CRecordBatchFileWriter.Open(self.sink.get(), schema.sp_schema,
-                                      &writer))
-
-        # Cast to base class, because has same interface
-        self.writer = <shared_ptr[CRecordBatchWriter]> writer
-        self.closed = False
-
-
-cdef class _RecordBatchFileReader:
-    cdef:
-        shared_ptr[CRecordBatchFileReader] reader
-
-    cdef readonly:
-        Schema schema
-
-    def __cinit__(self):
-        pass
-
-    def _open(self, source, footer_offset=None):
-        cdef shared_ptr[RandomAccessFile] reader
-        get_reader(source, &reader)
-
-        cdef int64_t offset = 0
-        if footer_offset is not None:
-            offset = footer_offset
-
-        with nogil:
-            if offset != 0:
-                check_status(CRecordBatchFileReader.Open2(
-                    reader, offset, &self.reader))
-            else:
-                check_status(CRecordBatchFileReader.Open(reader, &self.reader))
-
-        self.schema = pyarrow_wrap_schema(self.reader.get().schema())
-
-    property num_record_batches:
-
-        def __get__(self):
-            return self.reader.get().num_record_batches()
-
-    def get_batch(self, int i):
-        cdef shared_ptr[CRecordBatch] batch
-
-        if i < 0 or i >= self.num_record_batches:
-            raise ValueError('Batch number {0} out of range'.format(i))
-
-        with nogil:
-            check_status(self.reader.get().GetRecordBatch(i, &batch))
-
-        return pyarrow_wrap_batch(batch)
-
-    # TODO(wesm): ARROW-503: Function was renamed. Remove after a period of
-    # time has passed
-    get_record_batch = get_batch
-
-    def read_all(self):
-        """
-        Read all record batches as a pyarrow.Table
-        """
-        cdef:
-            vector[shared_ptr[CRecordBatch]] batches
-            shared_ptr[CTable] table
-            int i, nbatches
-
-        nbatches = self.num_record_batches
-
-        batches.resize(nbatches)
-        with nogil:
-            for i in range(nbatches):
-                check_status(self.reader.get().GetRecordBatch(i, &batches[i]))
-            check_status(CTable.FromRecordBatches(batches, &table))
-
-        return pyarrow_wrap_table(table)
-
-
-#----------------------------------------------------------------------
-# Implement legacy Feather file format
-
-
-class FeatherError(Exception):
-    pass
-
-
-cdef class FeatherWriter:
-    cdef:
-        unique_ptr[CFeatherWriter] writer
-
-    cdef public:
-        int64_t num_rows
-
-    def __cinit__(self):
-        self.num_rows = -1
-
-    def open(self, object dest):
-        cdef shared_ptr[OutputStream] sink
-        get_writer(dest, &sink)
-
-        with nogil:
-            check_status(CFeatherWriter.Open(sink, &self.writer))
-
-    def close(self):
-        if self.num_rows < 0:
-            self.num_rows = 0
-        self.writer.get().SetNumRows(self.num_rows)
-        check_status(self.writer.get().Finalize())
-
-    def write_array(self, object name, object col, object mask=None):
-        cdef Array arr
-
-        if self.num_rows >= 0:
-            if len(col) != self.num_rows:
-                raise ValueError('prior column had a different number of rows')
-        else:
-            self.num_rows = len(col)
-
-        if isinstance(col, Array):
-            arr = col
-        else:
-            arr = Array.from_pandas(col, mask=mask)
-
-        cdef c_string c_name = tobytes(name)
-
-        with nogil:
-            check_status(
-                self.writer.get().Append(c_name, deref(arr.sp_array)))
-
-
-cdef class FeatherReader:
-    cdef:
-        unique_ptr[CFeatherReader] reader
-
-    def __cinit__(self):
-        pass
-
-    def open(self, source):
-        cdef shared_ptr[RandomAccessFile] reader
-        get_reader(source, &reader)
-
-        with nogil:
-            check_status(CFeatherReader.Open(reader, &self.reader))
-
-    property num_rows:
-
-        def __get__(self):
-            return self.reader.get().num_rows()
-
-    property num_columns:
-
-        def __get__(self):
-            return self.reader.get().num_columns()
-
-    def get_column_name(self, int i):
-        cdef c_string name = self.reader.get().GetColumnName(i)
-        return frombytes(name)
-
-    def get_column(self, int i):
-        if i < 0 or i >= self.num_columns:
-            raise IndexError(i)
-
-        cdef shared_ptr[CColumn] sp_column
-        with nogil:
-            check_status(self.reader.get()
-                         .GetColumn(i, &sp_column))
-
-        cdef Column col = Column()
-        col.init(sp_column)
-        return col
-
-
-def get_tensor_size(Tensor tensor):
-    """
-    Return total size of serialized Tensor including metadata and padding
-    """
-    cdef int64_t size
-    with nogil:
-        check_status(GetTensorSize(deref(tensor.tp), &size))
-    return size
-
-
-def get_record_batch_size(RecordBatch batch):
-    """
-    Return total size of serialized RecordBatch including metadata and padding
-    """
-    cdef int64_t size
-    with nogil:
-        check_status(GetRecordBatchSize(deref(batch.batch), &size))
-    return size
-
-
-def write_tensor(Tensor tensor, NativeFile dest):
-    """
-    Write pyarrow.Tensor to pyarrow.NativeFile object its current position
-
-    Parameters
-    ----------
-    tensor : pyarrow.Tensor
-    dest : pyarrow.NativeFile
-
-    Returns
-    -------
-    bytes_written : int
-        Total number of bytes written to the file
-    """
-    cdef:
-        int32_t metadata_length
-        int64_t body_length
-
-    dest._assert_writeable()
-
-    with nogil:
-        check_status(
-            WriteTensor(deref(tensor.tp), dest.wr_file.get(),
-                        &metadata_length, &body_length))
-
-    return metadata_length + body_length
-
-
-def read_tensor(NativeFile source):
-    """
-    Read pyarrow.Tensor from pyarrow.NativeFile object from current
-    position. If the file source supports zero copy (e.g. a memory map), then
-    this operation does not allocate any memory
-
-    Parameters
-    ----------
-    source : pyarrow.NativeFile
-
-    Returns
-    -------
-    tensor : Tensor
-    """
-    cdef:
-        shared_ptr[CTensor] sp_tensor
-
-    source._assert_readable()
-
-    cdef int64_t offset = source.tell()
-    with nogil:
-        check_status(ReadTensor(offset, source.rd_file.get(), &sp_tensor))
-
-    return pyarrow_wrap_tensor(sp_tensor)