You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2017/01/21 16:11:13 UTC

arrow git commit: ARROW-495: [C++] Implement streaming binary format, refactoring

Repository: arrow
Updated Branches:
  refs/heads/master 8ca7033fc -> 5888e10cf


ARROW-495: [C++] Implement streaming binary format, refactoring

cc @nongli

Author: Wes McKinney <we...@twosigma.com>

Closes #293 from wesm/ARROW-495 and squashes the following commits:

279583b [Wes McKinney] FileBlock is a struct
c88e61a [Wes McKinney] Fix Python bindings after API changes
645a329 [Wes McKinney] Install stream.h
21378b4 [Wes McKinney] Collapse BaseStreamWriter and StreamWriter
b6c4578 [Wes McKinney] clang-format
12eb2cb [Wes McKinney] Add unit tests for streaming format, fix EOS, metadata length padding issues
3200b17 [Wes McKinney] Implement StreamReader
69fe82e [Wes McKinney] Implement rough draft of StreamWriter, share code with FileWriter


Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/5888e10c
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/5888e10c
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/5888e10c

Branch: refs/heads/master
Commit: 5888e10cffac222e359d1b440b4684d16c061085
Parents: 8ca7033
Author: Wes McKinney <we...@twosigma.com>
Authored: Sat Jan 21 11:11:06 2017 -0500
Committer: Wes McKinney <we...@twosigma.com>
Committed: Sat Jan 21 11:11:06 2017 -0500

----------------------------------------------------------------------
 cpp/CMakeLists.txt                         |   1 -
 cpp/src/arrow/io/memory.cc                 |   4 +-
 cpp/src/arrow/ipc/CMakeLists.txt           |   2 +
 cpp/src/arrow/ipc/adapter.cc               |  44 ++---
 cpp/src/arrow/ipc/adapter.h                |  11 +-
 cpp/src/arrow/ipc/file.cc                  | 167 +++++++++++++------
 cpp/src/arrow/ipc/file.h                   |  54 ++++---
 cpp/src/arrow/ipc/ipc-adapter-test.cc      |  16 +-
 cpp/src/arrow/ipc/ipc-file-test.cc         | 188 +++++++++++++++++----
 cpp/src/arrow/ipc/ipc-json-test.cc         |   5 +-
 cpp/src/arrow/ipc/ipc-metadata-test.cc     |  83 +---------
 cpp/src/arrow/ipc/json-integration-test.cc |   4 +-
 cpp/src/arrow/ipc/json.cc                  |  19 +--
 cpp/src/arrow/ipc/json.h                   |   3 +-
 cpp/src/arrow/ipc/metadata-internal.cc     |   8 +-
 cpp/src/arrow/ipc/metadata-internal.h      |   4 +-
 cpp/src/arrow/ipc/metadata.cc              | 121 +-------------
 cpp/src/arrow/ipc/metadata.h               |  32 +---
 cpp/src/arrow/ipc/stream.cc                | 206 ++++++++++++++++++++++++
 cpp/src/arrow/ipc/stream.h                 | 112 +++++++++++++
 cpp/src/arrow/ipc/test-common.h            |   9 ++
 python/pyarrow/includes/libarrow_ipc.pxd   |   3 +-
 python/pyarrow/ipc.pyx                     |   5 +-
 python/src/pyarrow/adapters/pandas.cc      |  34 ++--
 24 files changed, 718 insertions(+), 417 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/arrow/blob/5888e10c/cpp/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt
index 885ab19..9039ffb 100644
--- a/cpp/CMakeLists.txt
+++ b/cpp/CMakeLists.txt
@@ -90,7 +90,6 @@ if("${CMAKE_SOURCE_DIR}" STREQUAL "${CMAKE_CURRENT_SOURCE_DIR}")
   option(ARROW_ALTIVEC
     "Build Arrow with Altivec"
     ON)
-
 endif()
 
 if(NOT ARROW_BUILD_TESTS)

http://git-wip-us.apache.org/repos/asf/arrow/blob/5888e10c/cpp/src/arrow/io/memory.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/memory.cc b/cpp/src/arrow/io/memory.cc
index 0f5a0dc..1339a99 100644
--- a/cpp/src/arrow/io/memory.cc
+++ b/cpp/src/arrow/io/memory.cc
@@ -116,13 +116,13 @@ Status BufferReader::Read(int64_t nbytes, int64_t* bytes_read, uint8_t* buffer)
 Status BufferReader::Read(int64_t nbytes, std::shared_ptr<Buffer>* out) {
   int64_t size = std::min(nbytes, size_ - position_);
 
-  if (buffer_ != nullptr) {
+  if (size > 0 && buffer_ != nullptr) {
     *out = SliceBuffer(buffer_, position_, size);
   } else {
     *out = std::make_shared<Buffer>(data_ + position_, size);
   }
 
-  position_ += nbytes;
+  position_ += size;
   return Status::OK();
 }
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/5888e10c/cpp/src/arrow/ipc/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/CMakeLists.txt b/cpp/src/arrow/ipc/CMakeLists.txt
index b7ac5f0..c047f53 100644
--- a/cpp/src/arrow/ipc/CMakeLists.txt
+++ b/cpp/src/arrow/ipc/CMakeLists.txt
@@ -46,6 +46,7 @@ set(ARROW_IPC_SRCS
   json-internal.cc
   metadata.cc
   metadata-internal.cc
+  stream.cc
 )
 
 if(NOT APPLE)
@@ -151,6 +152,7 @@ install(FILES
   file.h
   json.h
   metadata.h
+  stream.h
   DESTINATION include/arrow/ipc)
 
 # pkg-config support

http://git-wip-us.apache.org/repos/asf/arrow/blob/5888e10c/cpp/src/arrow/ipc/adapter.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/adapter.cc b/cpp/src/arrow/ipc/adapter.cc
index 7b4d18c..9da7b39 100644
--- a/cpp/src/arrow/ipc/adapter.cc
+++ b/cpp/src/arrow/ipc/adapter.cc
@@ -49,10 +49,9 @@ namespace ipc {
 
 class RecordBatchWriter : public ArrayVisitor {
  public:
-  RecordBatchWriter(const std::vector<std::shared_ptr<Array>>& columns, int32_t num_rows,
-      int64_t buffer_start_offset, int max_recursion_depth)
-      : columns_(columns),
-        num_rows_(num_rows),
+  RecordBatchWriter(
+      const RecordBatch& batch, int64_t buffer_start_offset, int max_recursion_depth)
+      : batch_(batch),
         max_recursion_depth_(max_recursion_depth),
         buffer_start_offset_(buffer_start_offset) {}
 
@@ -79,8 +78,8 @@ class RecordBatchWriter : public ArrayVisitor {
     }
 
     // Perform depth-first traversal of the row-batch
-    for (size_t i = 0; i < columns_.size(); ++i) {
-      RETURN_NOT_OK(VisitArray(*columns_[i].get()));
+    for (int i = 0; i < batch_.num_columns(); ++i) {
+      RETURN_NOT_OK(VisitArray(*batch_.column(i)));
     }
 
     // The position for the start of a buffer relative to the passed frame of
@@ -126,18 +125,23 @@ class RecordBatchWriter : public ArrayVisitor {
     // itself as an int32_t.
     std::shared_ptr<Buffer> metadata_fb;
     RETURN_NOT_OK(WriteRecordBatchMetadata(
-        num_rows_, body_length, field_nodes_, buffer_meta_, &metadata_fb));
+        batch_.num_rows(), body_length, field_nodes_, buffer_meta_, &metadata_fb));
 
     // Need to write 4 bytes (metadata size), the metadata, plus padding to
-    // fall on an 8-byte offset
-    int64_t padded_metadata_length = BitUtil::CeilByte(metadata_fb->size() + 4);
+    // end on an 8-byte offset
+    int64_t start_offset;
+    RETURN_NOT_OK(dst->Tell(&start_offset));
+
+    int64_t padded_metadata_length = metadata_fb->size() + 4;
+    const int remainder = (padded_metadata_length + start_offset) % 8;
+    if (remainder != 0) { padded_metadata_length += 8 - remainder; }
 
     // The returned metadata size includes the length prefix, the flatbuffer,
     // plus padding
     *metadata_length = static_cast<int32_t>(padded_metadata_length);
 
-    // Write the flatbuffer size prefix
-    int32_t flatbuffer_size = metadata_fb->size();
+    // Write the flatbuffer size prefix including padding
+    int32_t flatbuffer_size = padded_metadata_length - 4;
     RETURN_NOT_OK(
         dst->Write(reinterpret_cast<const uint8_t*>(&flatbuffer_size), sizeof(int32_t)));
 
@@ -294,9 +298,7 @@ class RecordBatchWriter : public ArrayVisitor {
     return Status::OK();
   }
 
-  // Do not copy this vector. Ownership must be retained elsewhere
-  const std::vector<std::shared_ptr<Array>>& columns_;
-  int32_t num_rows_;
+  const RecordBatch& batch_;
 
   std::vector<flatbuf::FieldNode> field_nodes_;
   std::vector<flatbuf::Buffer> buffer_meta_;
@@ -306,18 +308,16 @@ class RecordBatchWriter : public ArrayVisitor {
   int64_t buffer_start_offset_;
 };
 
-Status WriteRecordBatch(const std::vector<std::shared_ptr<Array>>& columns,
-    int32_t num_rows, int64_t buffer_start_offset, io::OutputStream* dst,
-    int32_t* metadata_length, int64_t* body_length, int max_recursion_depth) {
+Status WriteRecordBatch(const RecordBatch& batch, int64_t buffer_start_offset,
+    io::OutputStream* dst, int32_t* metadata_length, int64_t* body_length,
+    int max_recursion_depth) {
   DCHECK_GT(max_recursion_depth, 0);
-  RecordBatchWriter serializer(
-      columns, num_rows, buffer_start_offset, max_recursion_depth);
+  RecordBatchWriter serializer(batch, buffer_start_offset, max_recursion_depth);
   return serializer.Write(dst, metadata_length, body_length);
 }
 
-Status GetRecordBatchSize(const RecordBatch* batch, int64_t* size) {
-  RecordBatchWriter serializer(
-      batch->columns(), batch->num_rows(), 0, kMaxIpcRecursionDepth);
+Status GetRecordBatchSize(const RecordBatch& batch, int64_t* size) {
+  RecordBatchWriter serializer(batch, 0, kMaxIpcRecursionDepth);
   RETURN_NOT_OK(serializer.GetTotalSize(size));
   return Status::OK();
 }

http://git-wip-us.apache.org/repos/asf/arrow/blob/5888e10c/cpp/src/arrow/ipc/adapter.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/adapter.h b/cpp/src/arrow/ipc/adapter.h
index 963b9ee..f9ef7d9 100644
--- a/cpp/src/arrow/ipc/adapter.h
+++ b/cpp/src/arrow/ipc/adapter.h
@@ -71,17 +71,14 @@ constexpr int kMaxIpcRecursionDepth = 64;
 //
 // @param(out) body_length: the size of the contiguous buffer block plus
 // padding bytes
-ARROW_EXPORT Status WriteRecordBatch(const std::vector<std::shared_ptr<Array>>& columns,
-    int32_t num_rows, int64_t buffer_start_offset, io::OutputStream* dst,
-    int32_t* metadata_length, int64_t* body_length,
-    int max_recursion_depth = kMaxIpcRecursionDepth);
-
-// int64_t GetRecordBatchMetadata(const RecordBatch* batch);
+ARROW_EXPORT Status WriteRecordBatch(const RecordBatch& batch,
+    int64_t buffer_start_offset, io::OutputStream* dst, int32_t* metadata_length,
+    int64_t* body_length, int max_recursion_depth = kMaxIpcRecursionDepth);
 
 // Compute the precise number of bytes needed in a contiguous memory segment to
 // write the record batch. This involves generating the complete serialized
 // Flatbuffers metadata.
-ARROW_EXPORT Status GetRecordBatchSize(const RecordBatch* batch, int64_t* size);
+ARROW_EXPORT Status GetRecordBatchSize(const RecordBatch& batch, int64_t* size);
 
 // ----------------------------------------------------------------------
 // "Read" path; does not copy data if the input supports zero copy reads

http://git-wip-us.apache.org/repos/asf/arrow/blob/5888e10c/cpp/src/arrow/ipc/file.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/file.cc b/cpp/src/arrow/ipc/file.cc
index d7d2e61..bc086e3 100644
--- a/cpp/src/arrow/ipc/file.cc
+++ b/cpp/src/arrow/ipc/file.cc
@@ -26,6 +26,7 @@
 #include "arrow/io/interfaces.h"
 #include "arrow/io/memory.h"
 #include "arrow/ipc/adapter.h"
+#include "arrow/ipc/metadata-internal.h"
 #include "arrow/ipc/metadata.h"
 #include "arrow/ipc/util.h"
 #include "arrow/status.h"
@@ -35,82 +36,154 @@ namespace arrow {
 namespace ipc {
 
 static constexpr const char* kArrowMagicBytes = "ARROW1";
-
 // ----------------------------------------------------------------------
-// Writer implementation
+// File footer
+
+static flatbuffers::Offset<flatbuffers::Vector<const flatbuf::Block*>>
+FileBlocksToFlatbuffer(FBB& fbb, const std::vector<FileBlock>& blocks) {
+  std::vector<flatbuf::Block> fb_blocks;
 
-FileWriter::FileWriter(io::OutputStream* sink, const std::shared_ptr<Schema>& schema)
-    : sink_(sink), schema_(schema), position_(-1), started_(false) {}
+  for (const FileBlock& block : blocks) {
+    fb_blocks.emplace_back(block.offset, block.metadata_length, block.body_length);
+  }
 
-Status FileWriter::UpdatePosition() {
-  return sink_->Tell(&position_);
+  return fbb.CreateVectorOfStructs(fb_blocks);
 }
 
-Status FileWriter::Open(io::OutputStream* sink, const std::shared_ptr<Schema>& schema,
-    std::shared_ptr<FileWriter>* out) {
-  *out = std::shared_ptr<FileWriter>(new FileWriter(sink, schema));  // ctor is private
-  RETURN_NOT_OK((*out)->UpdatePosition());
-  return Status::OK();
+Status WriteFileFooter(const Schema& schema, const std::vector<FileBlock>& dictionaries,
+    const std::vector<FileBlock>& record_batches, io::OutputStream* out) {
+  FBB fbb;
+
+  flatbuffers::Offset<flatbuf::Schema> fb_schema;
+  RETURN_NOT_OK(SchemaToFlatbuffer(fbb, schema, &fb_schema));
+
+  auto fb_dictionaries = FileBlocksToFlatbuffer(fbb, dictionaries);
+  auto fb_record_batches = FileBlocksToFlatbuffer(fbb, record_batches);
+
+  auto footer = flatbuf::CreateFooter(
+      fbb, kMetadataVersion, fb_schema, fb_dictionaries, fb_record_batches);
+
+  fbb.Finish(footer);
+
+  int32_t size = fbb.GetSize();
+
+  return out->Write(fbb.GetBufferPointer(), size);
 }
 
-Status FileWriter::Write(const uint8_t* data, int64_t nbytes) {
-  RETURN_NOT_OK(sink_->Write(data, nbytes));
-  position_ += nbytes;
-  return Status::OK();
+static inline FileBlock FileBlockFromFlatbuffer(const flatbuf::Block* block) {
+  return FileBlock(block->offset(), block->metaDataLength(), block->bodyLength());
 }
 
-Status FileWriter::Align() {
-  int64_t remainder = PaddedLength(position_) - position_;
-  if (remainder > 0) { return Write(kPaddingBytes, remainder); }
+class FileFooter::FileFooterImpl {
+ public:
+  FileFooterImpl(const std::shared_ptr<Buffer>& buffer, const flatbuf::Footer* footer)
+      : buffer_(buffer), footer_(footer) {}
+
+  int num_dictionaries() const { return footer_->dictionaries()->size(); }
+
+  int num_record_batches() const { return footer_->recordBatches()->size(); }
+
+  MetadataVersion::type version() const {
+    switch (footer_->version()) {
+      case flatbuf::MetadataVersion_V1:
+        return MetadataVersion::V1;
+      case flatbuf::MetadataVersion_V2:
+        return MetadataVersion::V2;
+      // Add cases as other versions become available
+      default:
+        return MetadataVersion::V2;
+    }
+  }
+
+  FileBlock record_batch(int i) const {
+    return FileBlockFromFlatbuffer(footer_->recordBatches()->Get(i));
+  }
+
+  FileBlock dictionary(int i) const {
+    return FileBlockFromFlatbuffer(footer_->dictionaries()->Get(i));
+  }
+
+  Status GetSchema(std::shared_ptr<Schema>* out) const {
+    auto schema_msg = std::make_shared<SchemaMetadata>(nullptr, footer_->schema());
+    return schema_msg->GetSchema(out);
+  }
+
+ private:
+  // Retain reference to memory
+  std::shared_ptr<Buffer> buffer_;
+
+  const flatbuf::Footer* footer_;
+};
+
+FileFooter::FileFooter() {}
+
+FileFooter::~FileFooter() {}
+
+Status FileFooter::Open(
+    const std::shared_ptr<Buffer>& buffer, std::unique_ptr<FileFooter>* out) {
+  const flatbuf::Footer* footer = flatbuf::GetFooter(buffer->data());
+
+  *out = std::unique_ptr<FileFooter>(new FileFooter());
+
+  // TODO(wesm): Verify the footer
+  (*out)->impl_.reset(new FileFooterImpl(buffer, footer));
+
   return Status::OK();
 }
 
-Status FileWriter::WriteAligned(const uint8_t* data, int64_t nbytes) {
-  RETURN_NOT_OK(Write(data, nbytes));
-  return Align();
+int FileFooter::num_dictionaries() const {
+  return impl_->num_dictionaries();
 }
 
-Status FileWriter::Start() {
-  RETURN_NOT_OK(WriteAligned(
-      reinterpret_cast<const uint8_t*>(kArrowMagicBytes), strlen(kArrowMagicBytes)));
-  started_ = true;
-  return Status::OK();
+int FileFooter::num_record_batches() const {
+  return impl_->num_record_batches();
 }
 
-Status FileWriter::CheckStarted() {
-  if (!started_) { return Start(); }
-  return Status::OK();
+MetadataVersion::type FileFooter::version() const {
+  return impl_->version();
 }
 
-Status FileWriter::WriteRecordBatch(
-    const std::vector<std::shared_ptr<Array>>& columns, int32_t num_rows) {
-  RETURN_NOT_OK(CheckStarted());
-
-  int64_t offset = position_;
+FileBlock FileFooter::record_batch(int i) const {
+  return impl_->record_batch(i);
+}
 
-  // There may be padding ever the end of the metadata, so we cannot rely on
-  // position_
-  int32_t metadata_length;
-  int64_t body_length;
+FileBlock FileFooter::dictionary(int i) const {
+  return impl_->dictionary(i);
+}
 
-  // Frame of reference in file format is 0, see ARROW-384
-  const int64_t buffer_start_offset = 0;
-  RETURN_NOT_OK(arrow::ipc::WriteRecordBatch(
-      columns, num_rows, buffer_start_offset, sink_, &metadata_length, &body_length));
-  RETURN_NOT_OK(UpdatePosition());
+Status FileFooter::GetSchema(std::shared_ptr<Schema>* out) const {
+  return impl_->GetSchema(out);
+}
 
-  DCHECK(position_ % 8 == 0) << "ipc::WriteRecordBatch did not perform aligned writes";
+// ----------------------------------------------------------------------
+// File writer implementation
 
-  // Append metadata, to be written in the footer later
-  record_batches_.emplace_back(offset, metadata_length, body_length);
+Status FileWriter::Open(io::OutputStream* sink, const std::shared_ptr<Schema>& schema,
+    std::shared_ptr<FileWriter>* out) {
+  *out = std::shared_ptr<FileWriter>(new FileWriter(sink, schema));  // ctor is private
+  RETURN_NOT_OK((*out)->UpdatePosition());
+  return Status::OK();
+}
 
+Status FileWriter::Start() {
+  RETURN_NOT_OK(WriteAligned(
+      reinterpret_cast<const uint8_t*>(kArrowMagicBytes), strlen(kArrowMagicBytes)));
+  started_ = true;
   return Status::OK();
 }
 
+Status FileWriter::WriteRecordBatch(const RecordBatch& batch) {
+  // Push an empty FileBlock
+  // Append metadata, to be written in the footer later
+  record_batches_.emplace_back(0, 0, 0);
+  return StreamWriter::WriteRecordBatch(
+      batch, &record_batches_[record_batches_.size() - 1]);
+}
+
 Status FileWriter::Close() {
   // Write metadata
   int64_t initial_position = position_;
-  RETURN_NOT_OK(WriteFileFooter(schema_.get(), dictionaries_, record_batches_, sink_));
+  RETURN_NOT_OK(WriteFileFooter(*schema_, dictionaries_, record_batches_, sink_));
   RETURN_NOT_OK(UpdatePosition());
 
   // Write footer length

http://git-wip-us.apache.org/repos/asf/arrow/blob/5888e10c/cpp/src/arrow/ipc/file.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/file.h b/cpp/src/arrow/ipc/file.h
index 4f35c37..7696954 100644
--- a/cpp/src/arrow/ipc/file.h
+++ b/cpp/src/arrow/ipc/file.h
@@ -25,13 +25,12 @@
 #include <vector>
 
 #include "arrow/ipc/metadata.h"
+#include "arrow/ipc/stream.h"
 #include "arrow/util/visibility.h"
 
 namespace arrow {
 
-class Array;
 class Buffer;
-struct Field;
 class RecordBatch;
 class Schema;
 class Status;
@@ -45,40 +44,43 @@ class ReadableFileInterface;
 
 namespace ipc {
 
-class ARROW_EXPORT FileWriter {
- public:
-  static Status Open(io::OutputStream* sink, const std::shared_ptr<Schema>& schema,
-      std::shared_ptr<FileWriter>* out);
+Status WriteFileFooter(const Schema& schema, const std::vector<FileBlock>& dictionaries,
+    const std::vector<FileBlock>& record_batches, io::OutputStream* out);
 
-  // TODO(wesm): Write dictionaries
+class ARROW_EXPORT FileFooter {
+ public:
+  ~FileFooter();
 
-  Status WriteRecordBatch(
-      const std::vector<std::shared_ptr<Array>>& columns, int32_t num_rows);
+  static Status Open(
+      const std::shared_ptr<Buffer>& buffer, std::unique_ptr<FileFooter>* out);
 
-  Status Close();
+  int num_dictionaries() const;
+  int num_record_batches() const;
+  MetadataVersion::type version() const;
 
- private:
-  FileWriter(io::OutputStream* sink, const std::shared_ptr<Schema>& schema);
+  FileBlock record_batch(int i) const;
+  FileBlock dictionary(int i) const;
 
-  Status CheckStarted();
-  Status Start();
+  Status GetSchema(std::shared_ptr<Schema>* out) const;
 
-  Status UpdatePosition();
+ private:
+  FileFooter();
+  class FileFooterImpl;
+  std::unique_ptr<FileFooterImpl> impl_;
+};
 
-  // Adds padding bytes if necessary to ensure all memory blocks are written on
-  // 8-byte boundaries.
-  Status Align();
+class ARROW_EXPORT FileWriter : public StreamWriter {
+ public:
+  static Status Open(io::OutputStream* sink, const std::shared_ptr<Schema>& schema,
+      std::shared_ptr<FileWriter>* out);
 
-  // Write data and update position
-  Status Write(const uint8_t* data, int64_t nbytes);
+  Status WriteRecordBatch(const RecordBatch& batch) override;
+  Status Close() override;
 
-  // Write and align
-  Status WriteAligned(const uint8_t* data, int64_t nbytes);
+ private:
+  using StreamWriter::StreamWriter;
 
-  io::OutputStream* sink_;
-  std::shared_ptr<Schema> schema_;
-  int64_t position_;
-  bool started_;
+  Status Start() override;
 
   std::vector<FileBlock> dictionaries_;
   std::vector<FileBlock> record_batches_;

http://git-wip-us.apache.org/repos/asf/arrow/blob/5888e10c/cpp/src/arrow/ipc/ipc-adapter-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/ipc-adapter-test.cc b/cpp/src/arrow/ipc/ipc-adapter-test.cc
index 6ba0a6e..17868f8 100644
--- a/cpp/src/arrow/ipc/ipc-adapter-test.cc
+++ b/cpp/src/arrow/ipc/ipc-adapter-test.cc
@@ -55,8 +55,8 @@ class TestWriteRecordBatch : public ::testing::TestWithParam<MakeRecordBatch*>,
 
     const int64_t buffer_offset = 0;
 
-    RETURN_NOT_OK(WriteRecordBatch(batch.columns(), batch.num_rows(), buffer_offset,
-        mmap_.get(), &metadata_length, &body_length));
+    RETURN_NOT_OK(WriteRecordBatch(
+        batch, buffer_offset, mmap_.get(), &metadata_length, &body_length));
 
     std::shared_ptr<RecordBatchMetadata> metadata;
     RETURN_NOT_OK(ReadRecordBatchMetadata(0, metadata_length, mmap_.get(), &metadata));
@@ -102,9 +102,8 @@ void TestGetRecordBatchSize(std::shared_ptr<RecordBatch> batch) {
   int32_t mock_metadata_length = -1;
   int64_t mock_body_length = -1;
   int64_t size = -1;
-  ASSERT_OK(WriteRecordBatch(batch->columns(), batch->num_rows(), 0, &mock,
-      &mock_metadata_length, &mock_body_length));
-  ASSERT_OK(GetRecordBatchSize(batch.get(), &size));
+  ASSERT_OK(WriteRecordBatch(*batch, 0, &mock, &mock_metadata_length, &mock_body_length));
+  ASSERT_OK(GetRecordBatchSize(*batch, &size));
   ASSERT_EQ(mock.GetExtentBytesWritten(), size);
 }
 
@@ -157,11 +156,10 @@ class RecursionLimits : public ::testing::Test, public io::MemoryMapFixture {
     io::MemoryMapFixture::InitMemoryMap(memory_map_size, path, &mmap_);
 
     if (override_level) {
-      return WriteRecordBatch(batch->columns(), batch->num_rows(), 0, mmap_.get(),
-          metadata_length, body_length, recursion_level + 1);
+      return WriteRecordBatch(
+          *batch, 0, mmap_.get(), metadata_length, body_length, recursion_level + 1);
     } else {
-      return WriteRecordBatch(batch->columns(), batch->num_rows(), 0, mmap_.get(),
-          metadata_length, body_length);
+      return WriteRecordBatch(*batch, 0, mmap_.get(), metadata_length, body_length);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/5888e10c/cpp/src/arrow/ipc/ipc-file-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/ipc-file-test.cc b/cpp/src/arrow/ipc/ipc-file-test.cc
index 0a9f677..15ceb80 100644
--- a/cpp/src/arrow/ipc/ipc-file-test.cc
+++ b/cpp/src/arrow/ipc/ipc-file-test.cc
@@ -29,6 +29,7 @@
 #include "arrow/io/test-common.h"
 #include "arrow/ipc/adapter.h"
 #include "arrow/ipc/file.h"
+#include "arrow/ipc/stream.h"
 #include "arrow/ipc/test-common.h"
 #include "arrow/ipc/util.h"
 
@@ -41,6 +42,19 @@
 namespace arrow {
 namespace ipc {
 
+void CompareBatch(const RecordBatch& left, const RecordBatch& right) {
+  ASSERT_TRUE(left.schema()->Equals(right.schema()));
+  ASSERT_EQ(left.num_columns(), right.num_columns())
+      << left.schema()->ToString() << " result: " << right.schema()->ToString();
+  EXPECT_EQ(left.num_rows(), right.num_rows());
+  for (int i = 0; i < left.num_columns(); ++i) {
+    EXPECT_TRUE(left.column(i)->Equals(right.column(i)))
+        << "Idx: " << i << " Name: " << left.column_name(i);
+  }
+}
+
+using BatchVector = std::vector<std::shared_ptr<RecordBatch>>;
+
 class TestFileFormat : public ::testing::TestWithParam<MakeRecordBatch*> {
  public:
   void SetUp() {
@@ -50,43 +64,94 @@ class TestFileFormat : public ::testing::TestWithParam<MakeRecordBatch*> {
   }
   void TearDown() {}
 
-  Status RoundTripHelper(
-      const RecordBatch& batch, std::vector<std::shared_ptr<RecordBatch>>* out_batches) {
+  Status RoundTripHelper(const BatchVector& in_batches, BatchVector* out_batches) {
     // Write the file
-    RETURN_NOT_OK(FileWriter::Open(sink_.get(), batch.schema(), &file_writer_));
-    int num_batches = 3;
-    for (int i = 0; i < num_batches; ++i) {
-      RETURN_NOT_OK(file_writer_->WriteRecordBatch(batch.columns(), batch.num_rows()));
+    std::shared_ptr<FileWriter> writer;
+    RETURN_NOT_OK(FileWriter::Open(sink_.get(), in_batches[0]->schema(), &writer));
+
+    const int num_batches = static_cast<int>(in_batches.size());
+
+    for (const auto& batch : in_batches) {
+      RETURN_NOT_OK(writer->WriteRecordBatch(*batch));
     }
-    RETURN_NOT_OK(file_writer_->Close());
+    RETURN_NOT_OK(writer->Close());
 
     // Current offset into stream is the end of the file
     int64_t footer_offset;
     RETURN_NOT_OK(sink_->Tell(&footer_offset));
 
     // Open the file
-    auto reader = std::make_shared<io::BufferReader>(buffer_);
-    RETURN_NOT_OK(FileReader::Open(reader, footer_offset, &file_reader_));
+    auto buf_reader = std::make_shared<io::BufferReader>(buffer_);
+    std::shared_ptr<FileReader> reader;
+    RETURN_NOT_OK(FileReader::Open(buf_reader, footer_offset, &reader));
 
-    EXPECT_EQ(num_batches, file_reader_->num_record_batches());
-
-    out_batches->resize(num_batches);
+    EXPECT_EQ(num_batches, reader->num_record_batches());
     for (int i = 0; i < num_batches; ++i) {
-      RETURN_NOT_OK(file_reader_->GetRecordBatch(i, &(*out_batches)[i]));
+      std::shared_ptr<RecordBatch> chunk;
+      RETURN_NOT_OK(reader->GetRecordBatch(i, &chunk));
+      out_batches->emplace_back(chunk);
     }
 
     return Status::OK();
   }
 
-  void CompareBatch(const RecordBatch* left, const RecordBatch* right) {
-    ASSERT_TRUE(left->schema()->Equals(right->schema()));
-    ASSERT_EQ(left->num_columns(), right->num_columns())
-        << left->schema()->ToString() << " result: " << right->schema()->ToString();
-    EXPECT_EQ(left->num_rows(), right->num_rows());
-    for (int i = 0; i < left->num_columns(); ++i) {
-      EXPECT_TRUE(left->column(i)->Equals(right->column(i)))
-          << "Idx: " << i << " Name: " << left->column_name(i);
+ protected:
+  MemoryPool* pool_;
+
+  std::unique_ptr<io::BufferOutputStream> sink_;
+  std::shared_ptr<PoolBuffer> buffer_;
+};
+
+TEST_P(TestFileFormat, RoundTrip) {
+  std::shared_ptr<RecordBatch> batch1;
+  std::shared_ptr<RecordBatch> batch2;
+  ASSERT_OK((*GetParam())(&batch1));  // NOLINT clang-tidy gtest issue
+  ASSERT_OK((*GetParam())(&batch2));  // NOLINT clang-tidy gtest issue
+
+  std::vector<std::shared_ptr<RecordBatch>> in_batches = {batch1, batch2};
+  std::vector<std::shared_ptr<RecordBatch>> out_batches;
+
+  ASSERT_OK(RoundTripHelper(in_batches, &out_batches));
+
+  // Compare batches
+  for (size_t i = 0; i < in_batches.size(); ++i) {
+    CompareBatch(*in_batches[i], *out_batches[i]);
+  }
+}
+
+class TestStreamFormat : public ::testing::TestWithParam<MakeRecordBatch*> {
+ public:
+  void SetUp() {
+    pool_ = default_memory_pool();
+    buffer_ = std::make_shared<PoolBuffer>(pool_);
+    sink_.reset(new io::BufferOutputStream(buffer_));
+  }
+  void TearDown() {}
+
+  Status RoundTripHelper(
+      const RecordBatch& batch, std::vector<std::shared_ptr<RecordBatch>>* out_batches) {
+    // Write the file
+    std::shared_ptr<StreamWriter> writer;
+    RETURN_NOT_OK(StreamWriter::Open(sink_.get(), batch.schema(), &writer));
+    int num_batches = 5;
+    for (int i = 0; i < num_batches; ++i) {
+      RETURN_NOT_OK(writer->WriteRecordBatch(batch));
+    }
+    RETURN_NOT_OK(writer->Close());
+
+    // Open the file
+    auto buf_reader = std::make_shared<io::BufferReader>(buffer_);
+
+    std::shared_ptr<StreamReader> reader;
+    RETURN_NOT_OK(StreamReader::Open(buf_reader, &reader));
+
+    std::shared_ptr<RecordBatch> chunk;
+    while (true) {
+      RETURN_NOT_OK(reader->GetNextRecordBatch(&chunk));
+      if (chunk == nullptr) { break; }
+      out_batches->emplace_back(chunk);
     }
+    return Status::OK();
   }
 
  protected:
@@ -94,12 +159,9 @@ class TestFileFormat : public ::testing::TestWithParam<MakeRecordBatch*> {
 
   std::unique_ptr<io::BufferOutputStream> sink_;
   std::shared_ptr<PoolBuffer> buffer_;
-
-  std::shared_ptr<FileWriter> file_writer_;
-  std::shared_ptr<FileReader> file_reader_;
 };
 
-TEST_P(TestFileFormat, RoundTrip) {
+TEST_P(TestStreamFormat, RoundTrip) {
   std::shared_ptr<RecordBatch> batch;
   ASSERT_OK((*GetParam())(&batch));  // NOLINT clang-tidy gtest issue
 
@@ -109,14 +171,80 @@ TEST_P(TestFileFormat, RoundTrip) {
 
   // Compare batches. Same
   for (size_t i = 0; i < out_batches.size(); ++i) {
-    CompareBatch(batch.get(), out_batches[i].get());
+    CompareBatch(*batch, *out_batches[i]);
   }
 }
 
-INSTANTIATE_TEST_CASE_P(RoundTripTests, TestFileFormat,
-    ::testing::Values(&MakeIntRecordBatch, &MakeListRecordBatch, &MakeNonNullRecordBatch,
-                            &MakeZeroLengthRecordBatch, &MakeDeeplyNestedList,
-                            &MakeStringTypesRecordBatch, &MakeStruct));
+#define BATCH_CASES()                                                                   \
+  ::testing::Values(&MakeIntRecordBatch, &MakeListRecordBatch, &MakeNonNullRecordBatch, \
+      &MakeZeroLengthRecordBatch, &MakeDeeplyNestedList, &MakeStringTypesRecordBatch,   \
+      &MakeStruct);
+
+INSTANTIATE_TEST_CASE_P(FileRoundTripTests, TestFileFormat, BATCH_CASES());
+INSTANTIATE_TEST_CASE_P(StreamRoundTripTests, TestStreamFormat, BATCH_CASES());
+
+class TestFileFooter : public ::testing::Test {
+ public:
+  void SetUp() {}
+
+  void CheckRoundtrip(const Schema& schema, const std::vector<FileBlock>& dictionaries,
+      const std::vector<FileBlock>& record_batches) {
+    auto buffer = std::make_shared<PoolBuffer>();
+    io::BufferOutputStream stream(buffer);
+
+    ASSERT_OK(WriteFileFooter(schema, dictionaries, record_batches, &stream));
+
+    std::unique_ptr<FileFooter> footer;
+    ASSERT_OK(FileFooter::Open(buffer, &footer));
+
+    ASSERT_EQ(MetadataVersion::V2, footer->version());
+
+    // Check schema
+    std::shared_ptr<Schema> schema2;
+    ASSERT_OK(footer->GetSchema(&schema2));
+    AssertSchemaEqual(schema, *schema2);
+
+    // Check blocks
+    ASSERT_EQ(dictionaries.size(), footer->num_dictionaries());
+    ASSERT_EQ(record_batches.size(), footer->num_record_batches());
+
+    for (int i = 0; i < footer->num_dictionaries(); ++i) {
+      CheckBlocks(dictionaries[i], footer->dictionary(i));
+    }
+
+    for (int i = 0; i < footer->num_record_batches(); ++i) {
+      CheckBlocks(record_batches[i], footer->record_batch(i));
+    }
+  }
+
+  void CheckBlocks(const FileBlock& left, const FileBlock& right) {
+    ASSERT_EQ(left.offset, right.offset);
+    ASSERT_EQ(left.metadata_length, right.metadata_length);
+    ASSERT_EQ(left.body_length, right.body_length);
+  }
+
+ private:
+  std::shared_ptr<Schema> example_schema_;
+};
+
+TEST_F(TestFileFooter, Basics) {
+  auto f0 = std::make_shared<Field>("f0", std::make_shared<Int8Type>());
+  auto f1 = std::make_shared<Field>("f1", std::make_shared<Int16Type>());
+  Schema schema({f0, f1});
+
+  std::vector<FileBlock> dictionaries;
+  dictionaries.emplace_back(8, 92, 900);
+  dictionaries.emplace_back(1000, 100, 1900);
+  dictionaries.emplace_back(3000, 100, 2900);
+
+  std::vector<FileBlock> record_batches;
+  record_batches.emplace_back(6000, 100, 900);
+  record_batches.emplace_back(7000, 100, 1900);
+  record_batches.emplace_back(9000, 100, 2900);
+  record_batches.emplace_back(12000, 100, 3900);
+
+  CheckRoundtrip(schema, dictionaries, record_batches);
+}
 
 }  // namespace ipc
 }  // namespace arrow

http://git-wip-us.apache.org/repos/asf/arrow/blob/5888e10c/cpp/src/arrow/ipc/ipc-json-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/ipc-json-test.cc b/cpp/src/arrow/ipc/ipc-json-test.cc
index 0750989..30f968c 100644
--- a/cpp/src/arrow/ipc/ipc-json-test.cc
+++ b/cpp/src/arrow/ipc/ipc-json-test.cc
@@ -245,8 +245,9 @@ TEST(TestJsonFileReadWrite, BasicRoundTrip) {
     std::vector<std::shared_ptr<Array>> arrays;
 
     MakeBatchArrays(schema, num_rows, &arrays);
-    batches.emplace_back(std::make_shared<RecordBatch>(schema, num_rows, arrays));
-    ASSERT_OK(writer->WriteRecordBatch(arrays, num_rows));
+    auto batch = std::make_shared<RecordBatch>(schema, num_rows, arrays);
+    batches.push_back(batch);
+    ASSERT_OK(writer->WriteRecordBatch(*batch));
   }
 
   std::string result;

http://git-wip-us.apache.org/repos/asf/arrow/blob/5888e10c/cpp/src/arrow/ipc/ipc-metadata-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/ipc-metadata-test.cc b/cpp/src/arrow/ipc/ipc-metadata-test.cc
index 7c5744a..098f996 100644
--- a/cpp/src/arrow/ipc/ipc-metadata-test.cc
+++ b/cpp/src/arrow/ipc/ipc-metadata-test.cc
@@ -23,6 +23,7 @@
 
 #include "arrow/io/memory.h"
 #include "arrow/ipc/metadata.h"
+#include "arrow/ipc/test-common.h"
 #include "arrow/schema.h"
 #include "arrow/status.h"
 #include "arrow/test-util.h"
@@ -34,20 +35,11 @@ class Buffer;
 
 namespace ipc {
 
-static inline void assert_schema_equal(const Schema* lhs, const Schema* rhs) {
-  if (!lhs->Equals(*rhs)) {
-    std::stringstream ss;
-    ss << "left schema: " << lhs->ToString() << std::endl
-       << "right schema: " << rhs->ToString() << std::endl;
-    FAIL() << ss.str();
-  }
-}
-
 class TestSchemaMetadata : public ::testing::Test {
  public:
   void SetUp() {}
 
-  void CheckRoundtrip(const Schema* schema) {
+  void CheckRoundtrip(const Schema& schema) {
     std::shared_ptr<Buffer> buffer;
     ASSERT_OK(WriteSchema(schema, &buffer));
 
@@ -57,12 +49,12 @@ class TestSchemaMetadata : public ::testing::Test {
     ASSERT_EQ(Message::SCHEMA, message->type());
 
     auto schema_msg = std::make_shared<SchemaMetadata>(message);
-    ASSERT_EQ(schema->num_fields(), schema_msg->num_fields());
+    ASSERT_EQ(schema.num_fields(), schema_msg->num_fields());
 
     std::shared_ptr<Schema> schema2;
     ASSERT_OK(schema_msg->GetSchema(&schema2));
 
-    assert_schema_equal(schema, schema2.get());
+    AssertSchemaEqual(schema, *schema2);
   }
 };
 
@@ -82,7 +74,7 @@ TEST_F(TestSchemaMetadata, PrimitiveFields) {
   auto f10 = std::make_shared<Field>("f10", std::make_shared<BooleanType>());
 
   Schema schema({f0, f1, f2, f3, f4, f5, f6, f7, f8, f9, f10});
-  CheckRoundtrip(&schema);
+  CheckRoundtrip(schema);
 }
 
 TEST_F(TestSchemaMetadata, NestedFields) {
@@ -94,70 +86,7 @@ TEST_F(TestSchemaMetadata, NestedFields) {
   auto f1 = std::make_shared<Field>("f1", type2);
 
   Schema schema({f0, f1});
-  CheckRoundtrip(&schema);
-}
-
-class TestFileFooter : public ::testing::Test {
- public:
-  void SetUp() {}
-
-  void CheckRoundtrip(const Schema* schema, const std::vector<FileBlock>& dictionaries,
-      const std::vector<FileBlock>& record_batches) {
-    auto buffer = std::make_shared<PoolBuffer>();
-    io::BufferOutputStream stream(buffer);
-
-    ASSERT_OK(WriteFileFooter(schema, dictionaries, record_batches, &stream));
-
-    std::unique_ptr<FileFooter> footer;
-    ASSERT_OK(FileFooter::Open(buffer, &footer));
-
-    ASSERT_EQ(MetadataVersion::V2, footer->version());
-
-    // Check schema
-    std::shared_ptr<Schema> schema2;
-    ASSERT_OK(footer->GetSchema(&schema2));
-    assert_schema_equal(schema, schema2.get());
-
-    // Check blocks
-    ASSERT_EQ(dictionaries.size(), footer->num_dictionaries());
-    ASSERT_EQ(record_batches.size(), footer->num_record_batches());
-
-    for (int i = 0; i < footer->num_dictionaries(); ++i) {
-      CheckBlocks(dictionaries[i], footer->dictionary(i));
-    }
-
-    for (int i = 0; i < footer->num_record_batches(); ++i) {
-      CheckBlocks(record_batches[i], footer->record_batch(i));
-    }
-  }
-
-  void CheckBlocks(const FileBlock& left, const FileBlock& right) {
-    ASSERT_EQ(left.offset, right.offset);
-    ASSERT_EQ(left.metadata_length, right.metadata_length);
-    ASSERT_EQ(left.body_length, right.body_length);
-  }
-
- private:
-  std::shared_ptr<Schema> example_schema_;
-};
-
-TEST_F(TestFileFooter, Basics) {
-  auto f0 = std::make_shared<Field>("f0", std::make_shared<Int8Type>());
-  auto f1 = std::make_shared<Field>("f1", std::make_shared<Int16Type>());
-  Schema schema({f0, f1});
-
-  std::vector<FileBlock> dictionaries;
-  dictionaries.emplace_back(8, 92, 900);
-  dictionaries.emplace_back(1000, 100, 1900);
-  dictionaries.emplace_back(3000, 100, 2900);
-
-  std::vector<FileBlock> record_batches;
-  record_batches.emplace_back(6000, 100, 900);
-  record_batches.emplace_back(7000, 100, 1900);
-  record_batches.emplace_back(9000, 100, 2900);
-  record_batches.emplace_back(12000, 100, 3900);
-
-  CheckRoundtrip(&schema, dictionaries, record_batches);
+  CheckRoundtrip(schema);
 }
 
 }  // namespace ipc

http://git-wip-us.apache.org/repos/asf/arrow/blob/5888e10c/cpp/src/arrow/ipc/json-integration-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/json-integration-test.cc b/cpp/src/arrow/ipc/json-integration-test.cc
index 757e6c0..95bc742 100644
--- a/cpp/src/arrow/ipc/json-integration-test.cc
+++ b/cpp/src/arrow/ipc/json-integration-test.cc
@@ -81,7 +81,7 @@ static Status ConvertJsonToArrow(
   for (int i = 0; i < reader->num_record_batches(); ++i) {
     std::shared_ptr<RecordBatch> batch;
     RETURN_NOT_OK(reader->GetRecordBatch(i, &batch));
-    RETURN_NOT_OK(writer->WriteRecordBatch(batch->columns(), batch->num_rows()));
+    RETURN_NOT_OK(writer->WriteRecordBatch(*batch));
   }
   return writer->Close();
 }
@@ -108,7 +108,7 @@ static Status ConvertArrowToJson(
   for (int i = 0; i < reader->num_record_batches(); ++i) {
     std::shared_ptr<RecordBatch> batch;
     RETURN_NOT_OK(reader->GetRecordBatch(i, &batch));
-    RETURN_NOT_OK(writer->WriteRecordBatch(batch->columns(), batch->num_rows()));
+    RETURN_NOT_OK(writer->WriteRecordBatch(*batch));
   }
 
   std::string result;

http://git-wip-us.apache.org/repos/asf/arrow/blob/5888e10c/cpp/src/arrow/ipc/json.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/json.cc b/cpp/src/arrow/ipc/json.cc
index 6e3a993..773fb74 100644
--- a/cpp/src/arrow/ipc/json.cc
+++ b/cpp/src/arrow/ipc/json.cc
@@ -64,25 +64,23 @@ class JsonWriter::JsonWriterImpl {
     return Status::OK();
   }
 
-  Status WriteRecordBatch(
-      const std::vector<std::shared_ptr<Array>>& columns, int32_t num_rows) {
-    DCHECK_EQ(static_cast<int>(columns.size()), schema_->num_fields());
+  Status WriteRecordBatch(const RecordBatch& batch) {
+    DCHECK_EQ(batch.num_columns(), schema_->num_fields());
 
     writer_->StartObject();
     writer_->Key("count");
-    writer_->Int(num_rows);
+    writer_->Int(batch.num_rows());
 
     writer_->Key("columns");
     writer_->StartArray();
 
     for (int i = 0; i < schema_->num_fields(); ++i) {
-      const std::shared_ptr<Array>& column = columns[i];
+      const std::shared_ptr<Array>& column = batch.column(i);
 
-      DCHECK_EQ(num_rows, column->length())
+      DCHECK_EQ(batch.num_rows(), column->length())
           << "Array length did not match record batch length";
 
-      RETURN_NOT_OK(
-          WriteJsonArray(schema_->field(i)->name, *column.get(), writer_.get()));
+      RETURN_NOT_OK(WriteJsonArray(schema_->field(i)->name, *column, writer_.get()));
     }
 
     writer_->EndArray();
@@ -113,9 +111,8 @@ Status JsonWriter::Finish(std::string* result) {
   return impl_->Finish(result);
 }
 
-Status JsonWriter::WriteRecordBatch(
-    const std::vector<std::shared_ptr<Array>>& columns, int32_t num_rows) {
-  return impl_->WriteRecordBatch(columns, num_rows);
+Status JsonWriter::WriteRecordBatch(const RecordBatch& batch) {
+  return impl_->WriteRecordBatch(batch);
 }
 
 // ----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/arrow/blob/5888e10c/cpp/src/arrow/ipc/json.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/json.h b/cpp/src/arrow/ipc/json.h
index 7395be4..88afdfa 100644
--- a/cpp/src/arrow/ipc/json.h
+++ b/cpp/src/arrow/ipc/json.h
@@ -46,8 +46,7 @@ class ARROW_EXPORT JsonWriter {
 
   // TODO(wesm): Write dictionaries
 
-  Status WriteRecordBatch(
-      const std::vector<std::shared_ptr<Array>>& columns, int32_t num_rows);
+  Status WriteRecordBatch(const RecordBatch& batch);
 
   Status Finish(std::string* result);
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/5888e10c/cpp/src/arrow/ipc/metadata-internal.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/metadata-internal.cc b/cpp/src/arrow/ipc/metadata-internal.cc
index cc160c4..cd77220 100644
--- a/cpp/src/arrow/ipc/metadata-internal.cc
+++ b/cpp/src/arrow/ipc/metadata-internal.cc
@@ -282,10 +282,10 @@ flatbuf::Endianness endianness() {
 }
 
 Status SchemaToFlatbuffer(
-    FBB& fbb, const Schema* schema, flatbuffers::Offset<flatbuf::Schema>* out) {
+    FBB& fbb, const Schema& schema, flatbuffers::Offset<flatbuf::Schema>* out) {
   std::vector<FieldOffset> field_offsets;
-  for (int i = 0; i < schema->num_fields(); ++i) {
-    std::shared_ptr<Field> field = schema->field(i);
+  for (int i = 0; i < schema.num_fields(); ++i) {
+    std::shared_ptr<Field> field = schema.field(i);
     FieldOffset offset;
     RETURN_NOT_OK(FieldToFlatbuffer(fbb, field, &offset));
     field_offsets.push_back(offset);
@@ -295,7 +295,7 @@ Status SchemaToFlatbuffer(
   return Status::OK();
 }
 
-Status MessageBuilder::SetSchema(const Schema* schema) {
+Status MessageBuilder::SetSchema(const Schema& schema) {
   flatbuffers::Offset<flatbuf::Schema> fb_schema;
   RETURN_NOT_OK(SchemaToFlatbuffer(fbb_, schema, &fb_schema));
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/5888e10c/cpp/src/arrow/ipc/metadata-internal.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/metadata-internal.h b/cpp/src/arrow/ipc/metadata-internal.h
index 4826ebe..d94a8ab 100644
--- a/cpp/src/arrow/ipc/metadata-internal.h
+++ b/cpp/src/arrow/ipc/metadata-internal.h
@@ -49,11 +49,11 @@ static constexpr flatbuf::MetadataVersion kMetadataVersion = flatbuf::MetadataVe
 Status FieldFromFlatbuffer(const flatbuf::Field* field, std::shared_ptr<Field>* out);
 
 Status SchemaToFlatbuffer(
-    FBB& fbb, const Schema* schema, flatbuffers::Offset<flatbuf::Schema>* out);
+    FBB& fbb, const Schema& schema, flatbuffers::Offset<flatbuf::Schema>* out);
 
 class MessageBuilder {
  public:
-  Status SetSchema(const Schema* schema);
+  Status SetSchema(const Schema& schema);
 
   Status SetRecordBatch(int32_t length, int64_t body_length,
       const std::vector<flatbuf::FieldNode>& nodes,

http://git-wip-us.apache.org/repos/asf/arrow/blob/5888e10c/cpp/src/arrow/ipc/metadata.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/metadata.cc b/cpp/src/arrow/ipc/metadata.cc
index f0674ff..a97965c 100644
--- a/cpp/src/arrow/ipc/metadata.cc
+++ b/cpp/src/arrow/ipc/metadata.cc
@@ -38,7 +38,7 @@ namespace flatbuf = org::apache::arrow::flatbuf;
 
 namespace ipc {
 
-Status WriteSchema(const Schema* schema, std::shared_ptr<Buffer>* out) {
+Status WriteSchema(const Schema& schema, std::shared_ptr<Buffer>* out) {
   MessageBuilder message;
   RETURN_NOT_OK(message.SetSchema(schema));
   RETURN_NOT_OK(message.Finish());
@@ -232,124 +232,5 @@ int RecordBatchMetadata::num_fields() const {
   return impl_->num_fields();
 }
 
-// ----------------------------------------------------------------------
-// File footer
-
-static flatbuffers::Offset<flatbuffers::Vector<const flatbuf::Block*>>
-FileBlocksToFlatbuffer(FBB& fbb, const std::vector<FileBlock>& blocks) {
-  std::vector<flatbuf::Block> fb_blocks;
-
-  for (const FileBlock& block : blocks) {
-    fb_blocks.emplace_back(block.offset, block.metadata_length, block.body_length);
-  }
-
-  return fbb.CreateVectorOfStructs(fb_blocks);
-}
-
-Status WriteFileFooter(const Schema* schema, const std::vector<FileBlock>& dictionaries,
-    const std::vector<FileBlock>& record_batches, io::OutputStream* out) {
-  FBB fbb;
-
-  flatbuffers::Offset<flatbuf::Schema> fb_schema;
-  RETURN_NOT_OK(SchemaToFlatbuffer(fbb, schema, &fb_schema));
-
-  auto fb_dictionaries = FileBlocksToFlatbuffer(fbb, dictionaries);
-  auto fb_record_batches = FileBlocksToFlatbuffer(fbb, record_batches);
-
-  auto footer = flatbuf::CreateFooter(
-      fbb, kMetadataVersion, fb_schema, fb_dictionaries, fb_record_batches);
-
-  fbb.Finish(footer);
-
-  int32_t size = fbb.GetSize();
-
-  return out->Write(fbb.GetBufferPointer(), size);
-}
-
-static inline FileBlock FileBlockFromFlatbuffer(const flatbuf::Block* block) {
-  return FileBlock(block->offset(), block->metaDataLength(), block->bodyLength());
-}
-
-class FileFooter::FileFooterImpl {
- public:
-  FileFooterImpl(const std::shared_ptr<Buffer>& buffer, const flatbuf::Footer* footer)
-      : buffer_(buffer), footer_(footer) {}
-
-  int num_dictionaries() const { return footer_->dictionaries()->size(); }
-
-  int num_record_batches() const { return footer_->recordBatches()->size(); }
-
-  MetadataVersion::type version() const {
-    switch (footer_->version()) {
-      case flatbuf::MetadataVersion_V1:
-        return MetadataVersion::V1;
-      case flatbuf::MetadataVersion_V2:
-        return MetadataVersion::V2;
-      // Add cases as other versions become available
-      default:
-        return MetadataVersion::V2;
-    }
-  }
-
-  FileBlock record_batch(int i) const {
-    return FileBlockFromFlatbuffer(footer_->recordBatches()->Get(i));
-  }
-
-  FileBlock dictionary(int i) const {
-    return FileBlockFromFlatbuffer(footer_->dictionaries()->Get(i));
-  }
-
-  Status GetSchema(std::shared_ptr<Schema>* out) const {
-    auto schema_msg = std::make_shared<SchemaMetadata>(nullptr, footer_->schema());
-    return schema_msg->GetSchema(out);
-  }
-
- private:
-  // Retain reference to memory
-  std::shared_ptr<Buffer> buffer_;
-
-  const flatbuf::Footer* footer_;
-};
-
-FileFooter::FileFooter() {}
-
-FileFooter::~FileFooter() {}
-
-Status FileFooter::Open(
-    const std::shared_ptr<Buffer>& buffer, std::unique_ptr<FileFooter>* out) {
-  const flatbuf::Footer* footer = flatbuf::GetFooter(buffer->data());
-
-  *out = std::unique_ptr<FileFooter>(new FileFooter());
-
-  // TODO(wesm): Verify the footer
-  (*out)->impl_.reset(new FileFooterImpl(buffer, footer));
-
-  return Status::OK();
-}
-
-int FileFooter::num_dictionaries() const {
-  return impl_->num_dictionaries();
-}
-
-int FileFooter::num_record_batches() const {
-  return impl_->num_record_batches();
-}
-
-MetadataVersion::type FileFooter::version() const {
-  return impl_->version();
-}
-
-FileBlock FileFooter::record_batch(int i) const {
-  return impl_->record_batch(i);
-}
-
-FileBlock FileFooter::dictionary(int i) const {
-  return impl_->dictionary(i);
-}
-
-Status FileFooter::GetSchema(std::shared_ptr<Schema>* out) const {
-  return impl_->GetSchema(out);
-}
-
 }  // namespace ipc
 }  // namespace arrow

http://git-wip-us.apache.org/repos/asf/arrow/blob/5888e10c/cpp/src/arrow/ipc/metadata.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/metadata.h b/cpp/src/arrow/ipc/metadata.h
index 1c4ef64..6e15ef3 100644
--- a/cpp/src/arrow/ipc/metadata.h
+++ b/cpp/src/arrow/ipc/metadata.h
@@ -49,7 +49,7 @@ struct MetadataVersion {
 
 // Serialize arrow::Schema as a Flatbuffer
 ARROW_EXPORT
-Status WriteSchema(const Schema* schema, std::shared_ptr<Buffer>* out);
+Status WriteSchema(const Schema& schema, std::shared_ptr<Buffer>* out);
 
 // Read interface classes. We do not fully deserialize the flatbuffers so that
 // individual fields metadata can be retrieved from very large schema without
@@ -149,10 +149,8 @@ class ARROW_EXPORT Message {
   std::unique_ptr<MessageImpl> impl_;
 };
 
-// ----------------------------------------------------------------------
-// File footer for file-like representation
-
 struct FileBlock {
+  FileBlock() {}
   FileBlock(int64_t offset, int32_t metadata_length, int64_t body_length)
       : offset(offset), metadata_length(metadata_length), body_length(body_length) {}
 
@@ -161,32 +159,6 @@ struct FileBlock {
   int64_t body_length;
 };
 
-ARROW_EXPORT
-Status WriteFileFooter(const Schema* schema, const std::vector<FileBlock>& dictionaries,
-    const std::vector<FileBlock>& record_batches, io::OutputStream* out);
-
-class ARROW_EXPORT FileFooter {
- public:
-  ~FileFooter();
-
-  static Status Open(
-      const std::shared_ptr<Buffer>& buffer, std::unique_ptr<FileFooter>* out);
-
-  int num_dictionaries() const;
-  int num_record_batches() const;
-  MetadataVersion::type version() const;
-
-  FileBlock record_batch(int i) const;
-  FileBlock dictionary(int i) const;
-
-  Status GetSchema(std::shared_ptr<Schema>* out) const;
-
- private:
-  FileFooter();
-  class FileFooterImpl;
-  std::unique_ptr<FileFooterImpl> impl_;
-};
-
 }  // namespace ipc
 }  // namespace arrow
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/5888e10c/cpp/src/arrow/ipc/stream.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/stream.cc b/cpp/src/arrow/ipc/stream.cc
new file mode 100644
index 0000000..a2ca672
--- /dev/null
+++ b/cpp/src/arrow/ipc/stream.cc
@@ -0,0 +1,206 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/ipc/stream.h"
+
+#include <cstdint>
+#include <cstring>
+#include <sstream>
+#include <vector>
+
+#include "arrow/buffer.h"
+#include "arrow/io/interfaces.h"
+#include "arrow/io/memory.h"
+#include "arrow/ipc/adapter.h"
+#include "arrow/ipc/metadata.h"
+#include "arrow/ipc/util.h"
+#include "arrow/schema.h"
+#include "arrow/status.h"
+#include "arrow/util/logging.h"
+
+namespace arrow {
+namespace ipc {
+
+// ----------------------------------------------------------------------
+// Stream writer implementation
+
+StreamWriter::~StreamWriter() {}  // Empty body: destruction does not call Close() on sink_
+
+// Binds the writer to `sink` and `schema`. position_ starts at -1 until
+// UpdatePosition() queries the sink; started_ stays false until Start() has
+// run.
+StreamWriter::StreamWriter(io::OutputStream* sink, const std::shared_ptr<Schema>& schema)
+    : sink_(sink),
+      schema_(schema),
+      position_(-1),
+      started_(false) {}
+
+Status StreamWriter::UpdatePosition() {  // Re-syncs position_ with the position reported by the sink
+  return sink_->Tell(&position_);
+}
+
+// Forwards `nbytes` bytes starting at `data` to the sink. position_ advances
+// by `nbytes` only when the sink reports success; on failure the sink's
+// status is returned and position_ is left untouched.
+Status StreamWriter::Write(const uint8_t* data, int64_t nbytes) {
+  Status write_status = sink_->Write(data, nbytes);
+  if (write_status.ok()) { position_ += nbytes; }
+  return write_status;
+}
+
+// Writes padding taken from kPaddingBytes so that position_ advances to
+// PaddedLength(position_). Does nothing when no padding is required.
+Status StreamWriter::Align() {
+  const int64_t padding = PaddedLength(position_) - position_;
+  if (padding <= 0) { return Status::OK(); }
+  return Write(kPaddingBytes, padding);
+}
+
+Status StreamWriter::WriteAligned(const uint8_t* data, int64_t nbytes) {  // Write(), then Align()
+  RETURN_NOT_OK(Write(data, nbytes));
+  return Align();  // Pads position_ up to PaddedLength(position_)
+}
+
+// Lazily emits the stream preamble: invokes Start() unless it has already
+// completed, in which case this is a no-op returning OK.
+Status StreamWriter::CheckStarted() {
+  if (started_) { return Status::OK(); }
+  return Start();
+}
+
+// Writes one record batch to the sink. On return `block` holds the stream
+// position at which the batch begins plus the metadata and body lengths
+// reported by arrow::ipc::WriteRecordBatch.
+Status StreamWriter::WriteRecordBatch(const RecordBatch& batch, FileBlock* block) {
+  // Start() (and with it the schema) must have run before the first batch
+  RETURN_NOT_OK(CheckStarted());
+
+  // Frame of reference in file format is 0, see ARROW-384
+  const int64_t frame_of_reference = 0;
+
+  block->offset = position_;
+  RETURN_NOT_OK(arrow::ipc::WriteRecordBatch(batch, frame_of_reference, sink_,
+      &block->metadata_length, &block->body_length));
+
+  // The call above wrote to sink_ without going through Write(), so
+  // position_ has to be refreshed from the sink
+  RETURN_NOT_OK(UpdatePosition());
+  DCHECK(position_ % 8 == 0) << "WriteRecordBatch did not perform aligned writes";
+
+  return Status::OK();
+}
+
+// ----------------------------------------------------------------------
+// StreamWriter implementation
+
+// Factory: constructs a writer over `sink` for `schema`, stores it in `out`
+// and records the sink's current position. The returned status is that of
+// the position query.
+Status StreamWriter::Open(io::OutputStream* sink, const std::shared_ptr<Schema>& schema,
+    std::shared_ptr<StreamWriter>* out) {
+  // The constructor is not public, so std::make_shared cannot be used
+  out->reset(new StreamWriter(sink, schema));
+  return (*out)->UpdatePosition();
+}
+
+// Writes the stream preamble: the serialized schema flatbuffer, preceded by
+// its size as the raw bytes of an int32_t (host byte order). Marks the
+// writer as started once both writes succeed.
+Status StreamWriter::Start() {
+  std::shared_ptr<Buffer> serialized_schema;
+  RETURN_NOT_OK(WriteSchema(*schema_, &serialized_schema));
+
+  // Length prefix first ...
+  int32_t schema_size = serialized_schema->size();
+  const uint8_t* size_bytes = reinterpret_cast<const uint8_t*>(&schema_size);
+  RETURN_NOT_OK(Write(size_bytes, sizeof(int32_t)));
+
+  // ... then the flatbuffer itself
+  RETURN_NOT_OK(Write(serialized_schema->data(), schema_size));
+
+  started_ = true;
+  return Status::OK();
+}
+
+// Public entry point: writes `batch` via the two-argument overload and
+// discards the FileBlock bookkeeping that overload fills in.
+Status StreamWriter::WriteRecordBatch(const RecordBatch& batch) {
+  FileBlock unused_block;
+  return WriteRecordBatch(batch, &unused_block);
+}
+
+Status StreamWriter::Close() {
+  // Make sure Start() has run (so the schema is written even if no batch was), then close the sink
+  RETURN_NOT_OK(CheckStarted());
+  return sink_->Close();
+}
+
+// ----------------------------------------------------------------------
+// StreamReader implementation
+
+StreamReader::StreamReader(const std::shared_ptr<io::InputStream>& stream)
+    : stream_(stream), schema_(nullptr) {}  // schema_ stays null until ReadSchema() fills it in
+
+StreamReader::~StreamReader() {}  // Empty body: nothing is closed or released explicitly here
+
+// Factory: wraps `stream` in a new reader stored in `reader` and immediately
+// reads the leading schema message. The returned status is that of
+// ReadSchema().
+Status StreamReader::Open(const std::shared_ptr<io::InputStream>& stream,
+    std::shared_ptr<StreamReader>* reader) {
+  // The constructor is private, so std::make_shared cannot be used
+  reader->reset(new StreamReader(stream));
+  return (*reader)->ReadSchema();
+}
+
+// Reads the first message of the stream, which must be the schema, and
+// materializes it into schema_.
+//
+// Returns IOError if the stream ends before any message could be read or if
+// the first message is not of schema type; otherwise returns the status of
+// SchemaMetadata::GetSchema.
+Status StreamReader::ReadSchema() {
+  std::shared_ptr<Message> message;
+  RETURN_NOT_OK(ReadNextMessage(&message));
+
+  // ReadNextMessage signals end-of-stream with an OK status and a null
+  // message, so an empty stream must be rejected before dereferencing
+  if (message == nullptr) {
+    return Status::IOError("Stream ended before schema message was read");
+  }
+
+  if (message->type() != Message::SCHEMA) {
+    return Status::IOError("First message was not schema type");
+  }
+
+  SchemaMetadata schema_meta(message);
+
+  // TODO(wesm): If the schema contains dictionaries, we must read all the
+  // dictionaries from the stream before constructing the final Schema
+  return schema_meta.GetSchema(&schema_);
+}
+
+// Reads the next length-prefixed metadata message from the stream.
+//
+// On success `*message` is the parsed message, or nullptr (with an OK
+// status) when a complete int32 length prefix could not be read, which is
+// how end-of-stream is reported to callers.
+//
+// Returns IOError if the length prefix is negative or if the stream ends
+// before the announced number of bytes could be read.
+Status StreamReader::ReadNextMessage(std::shared_ptr<Message>* message) {
+  std::shared_ptr<Buffer> buffer;
+  RETURN_NOT_OK(stream_->Read(sizeof(int32_t), &buffer));
+
+  if (buffer->size() != static_cast<int64_t>(sizeof(int32_t))) {
+    *message = nullptr;
+    return Status::OK();
+  }
+
+  // Copy the prefix out instead of casting the pointer so that this does not
+  // rely on buffer->data() being suitably aligned for int32_t
+  int32_t message_length = 0;
+  std::memcpy(&message_length, buffer->data(), sizeof(int32_t));
+
+  // A negative length cannot describe a message; refuse it rather than
+  // passing it on to Read()
+  if (message_length < 0) {
+    return Status::IOError("Invalid negative message length in stream");
+  }
+
+  RETURN_NOT_OK(stream_->Read(message_length, &buffer));
+  if (buffer->size() != message_length) {
+    return Status::IOError("Unexpected end of stream trying to read message");
+  }
+  return Message::Open(buffer, 0, message);
+}
+
+std::shared_ptr<Schema> StreamReader::schema() const {  // Schema stored by ReadSchema(); null before that
+  return schema_;
+}
+
+// Reads the next record batch from the stream into `batch`.
+//
+// When the stream has no further messages, `*batch` is set to nullptr and OK
+// is returned. Returns IOError if the next message is not a record batch or
+// if the stream ends before the whole message body could be read.
+Status StreamReader::GetNextRecordBatch(std::shared_ptr<RecordBatch>* batch) {
+  std::shared_ptr<Message> message;
+  RETURN_NOT_OK(ReadNextMessage(&message));
+
+  // End of stream
+  if (message == nullptr) {
+    *batch = nullptr;
+    return Status::OK();
+  }
+
+  if (message->type() != Message::RECORD_BATCH) {
+    return Status::IOError("Metadata not record batch");
+  }
+
+  auto metadata = std::make_shared<RecordBatchMetadata>(message);
+
+  // The body is read from the stream right after its metadata message
+  const auto body_length = message->body_length();
+  std::shared_ptr<Buffer> body;
+  RETURN_NOT_OK(stream_->Read(body_length, &body));
+  if (body->size() < body_length) {
+    return Status::IOError("Unexpected EOS when reading message body");
+  }
+
+  // Expose the in-memory body through a reader for ReadRecordBatch
+  io::BufferReader body_reader(body);
+  return ReadRecordBatch(metadata, schema_, &body_reader, batch);
+}
+
+}  // namespace ipc
+}  // namespace arrow

http://git-wip-us.apache.org/repos/asf/arrow/blob/5888e10c/cpp/src/arrow/ipc/stream.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/stream.h b/cpp/src/arrow/ipc/stream.h
new file mode 100644
index 0000000..0b0e62f
--- /dev/null
+++ b/cpp/src/arrow/ipc/stream.h
@@ -0,0 +1,112 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Implement Arrow streaming binary format
+
+#ifndef ARROW_IPC_STREAM_H
+#define ARROW_IPC_STREAM_H
+
+#include <cstdint>
+#include <memory>
+
+#include "arrow/util/visibility.h"
+
+namespace arrow {
+
+class Array;
+class Buffer;
+struct Field;
+class RecordBatch;
+class Schema;
+class Status;
+
+namespace io {
+
+class InputStream;
+class OutputStream;
+
+}  // namespace io
+
+namespace ipc {
+
+struct FileBlock;
+class Message;
+
+class ARROW_EXPORT StreamWriter {  // Writes a schema followed by record batches to an io::OutputStream
+ public:
+  virtual ~StreamWriter();
+
+  static Status Open(io::OutputStream* sink, const std::shared_ptr<Schema>& schema,
+      std::shared_ptr<StreamWriter>* out);  // Factory; the constructor is not public
+
+  virtual Status WriteRecordBatch(const RecordBatch& batch);  // Starts the stream first if needed
+  virtual Status Close();  // Starts the stream if needed, then closes the sink
+
+ protected:
+  StreamWriter(io::OutputStream* sink, const std::shared_ptr<Schema>& schema);
+
+  virtual Status Start();  // Writes the int32 size prefix and the serialized schema
+
+  Status CheckStarted();  // Calls Start() unless it has already run
+  Status UpdatePosition();  // Refreshes position_ from sink_->Tell()
+
+  Status WriteRecordBatch(const RecordBatch& batch, FileBlock* block);  // Also fills `block` with offset and lengths
+
+  // Adds padding bytes if necessary to ensure all memory blocks are written on
+  // 8-byte boundaries.
+  Status Align();
+
+  // Write data and update position
+  Status Write(const uint8_t* data, int64_t nbytes);
+
+  // Write and align
+  Status WriteAligned(const uint8_t* data, int64_t nbytes);
+
+  io::OutputStream* sink_;  // Raw pointer; presumably must outlive the writer -- confirm with callers
+  std::shared_ptr<Schema> schema_;
+  int64_t position_;  // Tracked write position; -1 until first refreshed from the sink
+  bool started_;  // True once Start() has succeeded
+};
+
+class ARROW_EXPORT StreamReader {  // Reads a schema and then record batches from an io::InputStream
+ public:
+  ~StreamReader();
+
+  // Open a stream; the schema message is read before this returns.
+  static Status Open(const std::shared_ptr<io::InputStream>& stream,
+      std::shared_ptr<StreamReader>* reader);
+
+  std::shared_ptr<Schema> schema() const;  // Schema read while opening the stream
+
+  // Returned batch is nullptr when end of stream reached
+  Status GetNextRecordBatch(std::shared_ptr<RecordBatch>* batch);
+
+ private:
+  explicit StreamReader(const std::shared_ptr<io::InputStream>& stream);
+
+  Status ReadSchema();  // Reads the first message and stores its schema in schema_
+
+  Status ReadNextMessage(std::shared_ptr<Message>* message);  // Sets *message to nullptr at end of stream
+
+  std::shared_ptr<io::InputStream> stream_;
+  std::shared_ptr<Schema> schema_;
+};
+
+}  // namespace ipc
+}  // namespace arrow
+
+#endif  // ARROW_IPC_STREAM_H

http://git-wip-us.apache.org/repos/asf/arrow/blob/5888e10c/cpp/src/arrow/ipc/test-common.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/test-common.h b/cpp/src/arrow/ipc/test-common.h
index 3faeebf..ca790de 100644
--- a/cpp/src/arrow/ipc/test-common.h
+++ b/cpp/src/arrow/ipc/test-common.h
@@ -36,6 +36,15 @@
 namespace arrow {
 namespace ipc {
 
+// Fails the current test with a message showing both schemas when `lhs` and
+// `rhs` do not compare equal; does nothing otherwise.
+static inline void AssertSchemaEqual(const Schema& lhs, const Schema& rhs) {
+  if (lhs.Equals(rhs)) { return; }
+
+  std::stringstream description;
+  description << "left schema: " << lhs.ToString() << std::endl
+              << "right schema: " << rhs.ToString() << std::endl;
+  FAIL() << description.str();
+}
+
 const auto kListInt32 = list(int32());
 const auto kListListInt32 = list(kListInt32);
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/5888e10c/python/pyarrow/includes/libarrow_ipc.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/includes/libarrow_ipc.pxd b/python/pyarrow/includes/libarrow_ipc.pxd
index b3185b1..8295760 100644
--- a/python/pyarrow/includes/libarrow_ipc.pxd
+++ b/python/pyarrow/includes/libarrow_ipc.pxd
@@ -29,8 +29,7 @@ cdef extern from "arrow/ipc/file.h" namespace "arrow::ipc" nogil:
         CStatus Open(OutputStream* sink, const shared_ptr[CSchema]& schema,
                      shared_ptr[CFileWriter]* out)
 
-        CStatus WriteRecordBatch(const vector[shared_ptr[CArray]]& columns,
-                                 int32_t num_rows)
+        CStatus WriteRecordBatch(const CRecordBatch& batch)
 
         CStatus Close()
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/5888e10c/python/pyarrow/ipc.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/ipc.pyx b/python/pyarrow/ipc.pyx
index abc5e1b..22069a7 100644
--- a/python/pyarrow/ipc.pyx
+++ b/python/pyarrow/ipc.pyx
@@ -21,6 +21,8 @@
 # distutils: language = c++
 # cython: embedsignature = True
 
+from cython.operator cimport dereference as deref
+
 from pyarrow.includes.libarrow cimport *
 from pyarrow.includes.libarrow_io cimport *
 from pyarrow.includes.libarrow_ipc cimport *
@@ -58,10 +60,9 @@ cdef class ArrowFileWriter:
             self.close()
 
     def write_record_batch(self, RecordBatch batch):
-        cdef CRecordBatch* bptr = batch.batch
         with nogil:
             check_status(self.writer.get()
-                         .WriteRecordBatch(bptr.columns(), bptr.num_rows()))
+                         .WriteRecordBatch(deref(batch.batch)))
 
     def close(self):
         with nogil:

http://git-wip-us.apache.org/repos/asf/arrow/blob/5888e10c/python/src/pyarrow/adapters/pandas.cc
----------------------------------------------------------------------
diff --git a/python/src/pyarrow/adapters/pandas.cc b/python/src/pyarrow/adapters/pandas.cc
index 6623e23..feafa3d 100644
--- a/python/src/pyarrow/adapters/pandas.cc
+++ b/python/src/pyarrow/adapters/pandas.cc
@@ -254,16 +254,16 @@ struct arrow_traits<Type::BOOL> {
   static constexpr bool is_numeric_nullable = false;
 };
 
-#define INT_DECL(TYPE)                                      \
-  template <>                                               \
-  struct arrow_traits<Type::TYPE> {                         \
-    static constexpr int npy_type = NPY_##TYPE;             \
-    static constexpr bool supports_nulls = false;           \
-    static constexpr double na_value = NAN;                 \
-    static constexpr bool is_boolean = false;               \
-    static constexpr bool is_numeric_not_nullable = true;   \
-    static constexpr bool is_numeric_nullable = false;      \
-    typedef typename npy_traits<NPY_##TYPE>::value_type T;  \
+#define INT_DECL(TYPE)                                     \
+  template <>                                              \
+  struct arrow_traits<Type::TYPE> {                        \
+    static constexpr int npy_type = NPY_##TYPE;            \
+    static constexpr bool supports_nulls = false;          \
+    static constexpr double na_value = NAN;                \
+    static constexpr bool is_boolean = false;              \
+    static constexpr bool is_numeric_not_nullable = true;  \
+    static constexpr bool is_numeric_nullable = false;     \
+    typedef typename npy_traits<NPY_##TYPE>::value_type T; \
   };
 
 INT_DECL(INT8);
@@ -1803,7 +1803,7 @@ class ArrowDeserializer {
   // types
 
   Status Convert(PyObject** out) {
-#define CONVERT_CASE(TYPE)                             \
+#define CONVERT_CASE(TYPE)                      \
   case Type::TYPE: {                            \
     RETURN_NOT_OK(ConvertValues<Type::TYPE>()); \
   } break;
@@ -1857,8 +1857,7 @@ class ArrowDeserializer {
   }
 
   template <int TYPE>
-  inline typename std::enable_if<TYPE == Type::DATE, Status>::type
-  ConvertValues() {
+  inline typename std::enable_if<TYPE == Type::DATE, Status>::type ConvertValues() {
     typedef typename arrow_traits<TYPE>::T T;
 
     RETURN_NOT_OK(AllocateOutput(arrow_traits<TYPE>::npy_type));
@@ -1910,24 +1909,21 @@ class ArrowDeserializer {
 
   // UTF8 strings
   template <int TYPE>
-  inline typename std::enable_if<TYPE == Type::STRING, Status>::type
-  ConvertValues() {
+  inline typename std::enable_if<TYPE == Type::STRING, Status>::type ConvertValues() {
     RETURN_NOT_OK(AllocateOutput(NPY_OBJECT));
     auto out_values = reinterpret_cast<PyObject**>(PyArray_DATA(arr_));
     return ConvertBinaryLike<arrow::StringArray>(data_, out_values);
   }
 
   template <int T2>
-  inline typename std::enable_if<T2 == Type::BINARY, Status>::type
-  ConvertValues() {
+  inline typename std::enable_if<T2 == Type::BINARY, Status>::type ConvertValues() {
     RETURN_NOT_OK(AllocateOutput(NPY_OBJECT));
     auto out_values = reinterpret_cast<PyObject**>(PyArray_DATA(arr_));
     return ConvertBinaryLike<arrow::BinaryArray>(data_, out_values);
   }
 
   template <int TYPE>
-  inline typename std::enable_if<TYPE == Type::DICTIONARY, Status>::type
-  ConvertValues() {
+  inline typename std::enable_if<TYPE == Type::DICTIONARY, Status>::type ConvertValues() {
     std::shared_ptr<PandasBlock> block;
     RETURN_NOT_OK(MakeCategoricalBlock(col_->type(), col_->length(), &block));
     RETURN_NOT_OK(block->Write(col_, 0, 0));