You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2016/11/29 02:29:28 UTC

[2/2] arrow git commit: ARROW-363: [Java/C++] integration testing harness, initial integration tests

ARROW-363: [Java/C++] integration testing harness, initial integration tests

This also includes format reconciliation as discussed in ARROW-384.

Author: Wes McKinney <we...@twosigma.com>

Closes #211 from wesm/ARROW-363 and squashes the following commits:

6982c3c [Wes McKinney] Permit end of buffer IPC reads if length is 0
4d46c8b [Wes McKinney] Fix logical error with offsets array in JsonFileWriter. Add broken string test case to simple.json
36ab5d6 [Wes McKinney] Increment MetadataVersion in flatbuffer
844257e [Wes McKinney] cpplint
a2711f2 [Wes McKinney] Address other format incompatibilities, write vectorLayout to Arrow metadata
13608ef [Wes McKinney] Relax 64 byte padding. Do not write RecordBatch embedded in Message for now
6a66fc8 [Wes McKinney] Write record batch size prefix in Java
72ea42c [Wes McKinney] Note that padding is 64-bytes at start of file (for now)
c2ffde4 [Wes McKinney] More notes about the file format
aef4382 [Wes McKinney] cpplint
85128f7 [Wes McKinney] Refactor IPC/File record batch read/write structure to reflect discussion in ARROW-384
dbd6ed6 [Wes McKinney] Do not embed metadata length in WriteDataHeader
c529d63 [Wes McKinney] Fix JSON integration test example to make it further
d806aa6 [Wes McKinney] Exclude JSON files from Apache RAT checks
a7e2d4b [Wes McKinney] Draft testing harness


Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/e3c167bd
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/e3c167bd
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/e3c167bd

Branch: refs/heads/master
Commit: e3c167bd101734f92c3a2be2eb7f56f1fba91e67
Parents: 86f56a6
Author: Wes McKinney <we...@twosigma.com>
Authored: Mon Nov 28 21:29:19 2016 -0500
Committer: Wes McKinney <we...@twosigma.com>
Committed: Mon Nov 28 21:29:19 2016 -0500

----------------------------------------------------------------------
 .gitignore                                      |  26 ++
 cpp/CMakeLists.txt                              |   1 -
 cpp/src/arrow/io/io-file-test.cc                |   2 +-
 cpp/src/arrow/io/memory.cc                      |  25 +-
 cpp/src/arrow/io/memory.h                       |   8 +-
 cpp/src/arrow/ipc/adapter.cc                    | 251 ++++++++++---------
 cpp/src/arrow/ipc/adapter.h                     |  65 ++---
 cpp/src/arrow/ipc/file.cc                       |  31 ++-
 cpp/src/arrow/ipc/ipc-adapter-test.cc           |  85 ++++---
 cpp/src/arrow/ipc/ipc-file-test.cc              |   2 +-
 cpp/src/arrow/ipc/ipc-json-test.cc              |  20 +-
 cpp/src/arrow/ipc/ipc-metadata-test.cc          |  12 +-
 cpp/src/arrow/ipc/json-integration-test.cc      |  30 ++-
 cpp/src/arrow/ipc/json-internal.cc              | 110 +++-----
 cpp/src/arrow/ipc/metadata-internal.cc          | 100 +++++---
 cpp/src/arrow/ipc/metadata-internal.h           |   6 +-
 cpp/src/arrow/ipc/metadata.cc                   | 115 +++++----
 cpp/src/arrow/ipc/metadata.h                    |  50 ++--
 cpp/src/arrow/ipc/test-common.h                 |  15 +-
 cpp/src/arrow/ipc/util.h                        |   6 +-
 cpp/src/arrow/test-util.h                       |   8 +-
 cpp/src/arrow/type.cc                           |  46 +++-
 cpp/src/arrow/type.h                            |  73 ++++--
 cpp/src/arrow/types/primitive.cc                |   2 +-
 cpp/src/arrow/util/bit-util.h                   |   4 +
 dev/release/run-rat.sh                          |   3 +-
 format/IPC.md                                   | 106 ++++++++
 format/Message.fbs                              |   3 +-
 integration/data/simple.json                    |  66 +++++
 integration/integration_test.py                 | 177 +++++++++++++
 java/pom.xml                                    |   6 +-
 java/tools/pom.xml                              |   6 +
 .../org/apache/arrow/tools/Integration.java     |   1 +
 .../org/apache/arrow/vector/VectorLoader.java   |   4 +-
 .../apache/arrow/vector/file/ArrowReader.java   |   6 +-
 .../apache/arrow/vector/file/ArrowWriter.java   |  23 +-
 .../arrow/vector/file/json/JsonFileReader.java  |   9 +-
 .../arrow/vector/file/json/JsonFileWriter.java  |   2 +-
 python/.gitignore                               |  10 -
 39 files changed, 1024 insertions(+), 491 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/arrow/blob/e3c167bd/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..a00cbba
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,26 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Compiled source
+*.a
+*.dll
+*.o
+*.py[ocd]
+*.so
+*.dylib
+.build_cache_dir
+MANIFEST

http://git-wip-us.apache.org/repos/asf/arrow/blob/e3c167bd/cpp/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt
index 0edb8ce..1a97008 100644
--- a/cpp/CMakeLists.txt
+++ b/cpp/CMakeLists.txt
@@ -528,7 +528,6 @@ if(ARROW_BUILD_TESTS)
     ExternalProject_Add(gflags_ep
       GIT_REPOSITORY https://github.com/gflags/gflags.git
       GIT_TAG cce68f0c9c5d054017425e6e6fd54f696d36e8ee
-      # URL "https://github.com/gflags/gflags/archive/v${GFLAGS_VERSION}.tar.gz"
       BUILD_IN_SOURCE 1
       CMAKE_ARGS -DCMAKE_BUILD_TYPE=${CMAKE_BUILD_TYPE}
                  -DCMAKE_INSTALL_PREFIX=${GFLAGS_PREFIX}

http://git-wip-us.apache.org/repos/asf/arrow/blob/e3c167bd/cpp/src/arrow/io/io-file-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/io-file-test.cc b/cpp/src/arrow/io/io-file-test.cc
index 54c21d2..fad49ce 100644
--- a/cpp/src/arrow/io/io-file-test.cc
+++ b/cpp/src/arrow/io/io-file-test.cc
@@ -19,7 +19,7 @@
 #include <cstdio>
 #include <cstring>
 #ifndef _MSC_VER
-# include <fcntl.h>
+#include <fcntl.h>
 #endif
 #include <fstream>
 #include <memory>

http://git-wip-us.apache.org/repos/asf/arrow/blob/e3c167bd/cpp/src/arrow/io/memory.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/memory.cc b/cpp/src/arrow/io/memory.cc
index 71b0f1e..af495e2 100644
--- a/cpp/src/arrow/io/memory.cc
+++ b/cpp/src/arrow/io/memory.cc
@@ -258,8 +258,11 @@ Status BufferOutputStream::Reserve(int64_t nbytes) {
 // ----------------------------------------------------------------------
 // In-memory buffer reader
 
-BufferReader::BufferReader(const uint8_t* buffer, int buffer_size)
-    : buffer_(buffer), buffer_size_(buffer_size), position_(0) {}
+BufferReader::BufferReader(const std::shared_ptr<Buffer>& buffer)
+    : buffer_(buffer), data_(buffer->data()), size_(buffer->size()), position_(0) {}
+
+BufferReader::BufferReader(const uint8_t* data, int64_t size)
+    : buffer_(nullptr), data_(data), size_(size), position_(0) {}
 
 BufferReader::~BufferReader() {}
 
@@ -278,26 +281,32 @@ bool BufferReader::supports_zero_copy() const {
 }
 
 Status BufferReader::Read(int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) {
-  memcpy(buffer, buffer_ + position_, nbytes);
-  *bytes_read = std::min(nbytes, buffer_size_ - position_);
+  memcpy(buffer, data_ + position_, nbytes);
+  *bytes_read = std::min(nbytes, size_ - position_);
   position_ += *bytes_read;
   return Status::OK();
 }
 
 Status BufferReader::Read(int64_t nbytes, std::shared_ptr<Buffer>* out) {
-  int64_t size = std::min(nbytes, buffer_size_ - position_);
-  *out = std::make_shared<Buffer>(buffer_ + position_, size);
+  int64_t size = std::min(nbytes, size_ - position_);
+
+  if (buffer_ != nullptr) {
+    *out = SliceBuffer(buffer_, position_, size);
+  } else {
+    *out = std::make_shared<Buffer>(data_ + position_, size);
+  }
+
   position_ += nbytes;
   return Status::OK();
 }
 
 Status BufferReader::GetSize(int64_t* size) {
-  *size = buffer_size_;
+  *size = size_;
   return Status::OK();
 }
 
 Status BufferReader::Seek(int64_t position) {
-  if (position < 0 || position >= buffer_size_) {
+  if (position < 0 || position >= size_) {
     return Status::IOError("position out of bounds");
   }
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/e3c167bd/cpp/src/arrow/io/memory.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/memory.h b/cpp/src/arrow/io/memory.h
index df2fe8d..b72f93b 100644
--- a/cpp/src/arrow/io/memory.h
+++ b/cpp/src/arrow/io/memory.h
@@ -99,7 +99,8 @@ class ARROW_EXPORT MemoryMappedFile : public ReadWriteFileInterface {
 
 class ARROW_EXPORT BufferReader : public ReadableFileInterface {
  public:
-  BufferReader(const uint8_t* buffer, int buffer_size);
+  explicit BufferReader(const std::shared_ptr<Buffer>& buffer);
+  BufferReader(const uint8_t* data, int64_t size);
   ~BufferReader();
 
   Status Close() override;
@@ -116,8 +117,9 @@ class ARROW_EXPORT BufferReader : public ReadableFileInterface {
   bool supports_zero_copy() const override;
 
  private:
-  const uint8_t* buffer_;
-  int buffer_size_;
+  std::shared_ptr<Buffer> buffer_;
+  const uint8_t* data_;
+  int64_t size_;
   int64_t position_;
 };
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/e3c167bd/cpp/src/arrow/ipc/adapter.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/adapter.cc b/cpp/src/arrow/ipc/adapter.cc
index da718c0..edf716f 100644
--- a/cpp/src/arrow/ipc/adapter.cc
+++ b/cpp/src/arrow/ipc/adapter.cc
@@ -48,15 +48,6 @@ namespace flatbuf = org::apache::arrow::flatbuf;
 
 namespace ipc {
 
-namespace {
-Status CheckMultipleOf64(int64_t size) {
-  if (BitUtil::IsMultipleOf64(size)) { return Status::OK(); }
-  return Status::Invalid(
-      "Attempted to write a buffer that "
-      "wasn't a multiple of 64 bytes");
-}
-}
-
 static bool IsPrimitive(const DataType* type) {
   DCHECK(type != nullptr);
   switch (type->type) {
@@ -124,30 +115,30 @@ Status VisitArray(const Array* arr, std::vector<flatbuf::FieldNode>* field_nodes
 class RecordBatchWriter {
  public:
   RecordBatchWriter(const std::vector<std::shared_ptr<Array>>& columns, int32_t num_rows,
-      int max_recursion_depth)
+      int64_t buffer_start_offset, int max_recursion_depth)
       : columns_(&columns),
         num_rows_(num_rows),
+        buffer_start_offset_(buffer_start_offset),
         max_recursion_depth_(max_recursion_depth) {}
 
-  Status AssemblePayload() {
+  Status AssemblePayload(int64_t* body_length) {
+    if (field_nodes_.size() > 0) {
+      field_nodes_.clear();
+      buffer_meta_.clear();
+      buffers_.clear();
+    }
+
     // Perform depth-first traversal of the row-batch
     for (size_t i = 0; i < columns_->size(); ++i) {
       const Array* arr = (*columns_)[i].get();
       RETURN_NOT_OK(VisitArray(arr, &field_nodes_, &buffers_, max_recursion_depth_));
     }
-    return Status::OK();
-  }
 
-  Status Write(
-      io::OutputStream* dst, int64_t* body_end_offset, int64_t* header_end_offset) {
-    // Get the starting position
-    int64_t start_position;
-    RETURN_NOT_OK(dst->Tell(&start_position));
-
-    // Keep track of the current position so we can determine the size of the
-    // message body
-    int64_t position = start_position;
+    // The position for the start of a buffer relative to the passed frame of
+    // reference. May be 0 or some other position in an address space
+    int64_t offset = buffer_start_offset_;
 
+    // Construct the buffer metadata for the record batch header
     for (size_t i = 0; i < buffers_.size(); ++i) {
       const Buffer* buffer = buffers_[i].get();
       int64_t size = 0;
@@ -161,65 +152,103 @@ class RecordBatchWriter {
 
       // TODO(wesm): We currently have no notion of shared memory page id's,
       // but we've included it in the metadata IDL for when we have it in the
-      // future. Use page=0 for now
+      // future. Use page = -1 for now
       //
       // Note that page ids are a bespoke notion for Arrow and not a feature we
       // are using from any OS-level shared memory. The thought is that systems
       // may (in the future) associate integer page id's with physical memory
       // pages (according to whatever is the desired shared memory mechanism)
-      buffer_meta_.push_back(flatbuf::Buffer(0, position, size + padding));
-
-      if (size > 0) {
-        RETURN_NOT_OK(dst->Write(buffer->data(), size));
-        position += size;
-      }
-
-      if (padding > 0) {
-        RETURN_NOT_OK(dst->Write(kPaddingBytes, padding));
-        position += padding;
-      }
+      buffer_meta_.push_back(flatbuf::Buffer(-1, offset, size + padding));
+      offset += size + padding;
     }
 
-    *body_end_offset = position;
+    *body_length = offset - buffer_start_offset_;
+    DCHECK(BitUtil::IsMultipleOf64(*body_length));
+
+    return Status::OK();
+  }
 
+  Status WriteMetadata(
+      int64_t body_length, io::OutputStream* dst, int32_t* metadata_length) {
     // Now that we have computed the locations of all of the buffers in shared
     // memory, the data header can be converted to a flatbuffer and written out
     //
     // Note: The memory written here is prefixed by the size of the flatbuffer
-    // itself as an int32_t. On reading from a input, you will have to
-    // determine the data header size then request a buffer such that you can
-    // construct the flatbuffer data accessor object (see arrow::ipc::Message)
-    std::shared_ptr<Buffer> data_header;
-    RETURN_NOT_OK(WriteDataHeader(
-        num_rows_, position - start_position, field_nodes_, buffer_meta_, &data_header));
+    // itself as an int32_t.
+    std::shared_ptr<Buffer> metadata_fb;
+    RETURN_NOT_OK(WriteRecordBatchMetadata(
+        num_rows_, body_length, field_nodes_, buffer_meta_, &metadata_fb));
+
+    // Need to write 4 bytes (metadata size), the metadata, plus padding to
+    // fall on a 64-byte offset
+    int64_t padded_metadata_length =
+        BitUtil::RoundUpToMultipleOf64(metadata_fb->size() + 4);
+
+    // The returned metadata size includes the length prefix, the flatbuffer,
+    // plus padding
+    *metadata_length = padded_metadata_length;
 
-    // Write the data header at the end
-    RETURN_NOT_OK(dst->Write(data_header->data(), data_header->size()));
+    // Write the flatbuffer size prefix
+    int32_t flatbuffer_size = metadata_fb->size();
+    RETURN_NOT_OK(
+        dst->Write(reinterpret_cast<const uint8_t*>(&flatbuffer_size), sizeof(int32_t)));
 
-    position += data_header->size();
-    *header_end_offset = position;
+    // Write the flatbuffer
+    RETURN_NOT_OK(dst->Write(metadata_fb->data(), metadata_fb->size()));
 
-    return Align(dst, &position);
+    // Write any padding
+    int64_t padding = padded_metadata_length - metadata_fb->size() - 4;
+    if (padding > 0) { RETURN_NOT_OK(dst->Write(kPaddingBytes, padding)); }
+
+    return Status::OK();
   }
 
-  Status Align(io::OutputStream* dst, int64_t* position) {
-    // Write all buffers here on word boundaries
-    // TODO(wesm): Is there benefit to 64-byte padding in IPC?
-    int64_t remainder = PaddedLength(*position) - *position;
-    if (remainder > 0) {
-      RETURN_NOT_OK(dst->Write(kPaddingBytes, remainder));
-      *position += remainder;
+  Status Write(io::OutputStream* dst, int32_t* metadata_length, int64_t* body_length) {
+    RETURN_NOT_OK(AssemblePayload(body_length));
+
+#ifndef NDEBUG
+    int64_t start_position, current_position;
+    RETURN_NOT_OK(dst->Tell(&start_position));
+#endif
+
+    RETURN_NOT_OK(WriteMetadata(*body_length, dst, metadata_length));
+
+#ifndef NDEBUG
+    RETURN_NOT_OK(dst->Tell(&current_position));
+    DCHECK(BitUtil::IsMultipleOf8(current_position));
+#endif
+
+    // Now write the buffers
+    for (size_t i = 0; i < buffers_.size(); ++i) {
+      const Buffer* buffer = buffers_[i].get();
+      int64_t size = 0;
+      int64_t padding = 0;
+
+      // The buffer might be null if we are handling zero row lengths.
+      if (buffer) {
+        size = buffer->size();
+        padding = BitUtil::RoundUpToMultipleOf64(size) - size;
+      }
+
+      if (size > 0) { RETURN_NOT_OK(dst->Write(buffer->data(), size)); }
+
+      if (padding > 0) { RETURN_NOT_OK(dst->Write(kPaddingBytes, padding)); }
     }
+
+#ifndef NDEBUG
+    RETURN_NOT_OK(dst->Tell(&current_position));
+    DCHECK(BitUtil::IsMultipleOf8(current_position));
+#endif
+
     return Status::OK();
   }
 
-  // This must be called after invoking AssemblePayload
   Status GetTotalSize(int64_t* size) {
     // emulates the behavior of Write without actually writing
-    int64_t body_offset;
-    int64_t data_header_offset;
+    int32_t metadata_length;
+    int64_t body_length;
     MockOutputStream dst;
-    RETURN_NOT_OK(Write(&dst, &body_offset, &data_header_offset));
+    RETURN_NOT_OK(Write(&dst, &metadata_length, &body_length));
     *size = dst.GetExtentBytesWritten();
     return Status::OK();
   }
@@ -228,6 +257,7 @@ class RecordBatchWriter {
   // Do not copy this vector. Ownership must be retained elsewhere
   const std::vector<std::shared_ptr<Array>>* columns_;
   int32_t num_rows_;
+  int64_t buffer_start_offset_;
 
   std::vector<flatbuf::FieldNode> field_nodes_;
   std::vector<flatbuf::Buffer> buffer_meta_;
@@ -236,18 +266,17 @@ class RecordBatchWriter {
 };
 
 Status WriteRecordBatch(const std::vector<std::shared_ptr<Array>>& columns,
-    int32_t num_rows, io::OutputStream* dst, int64_t* body_end_offset,
-    int64_t* header_end_offset, int max_recursion_depth) {
+    int32_t num_rows, int64_t buffer_start_offset, io::OutputStream* dst,
+    int32_t* metadata_length, int64_t* body_length, int max_recursion_depth) {
   DCHECK_GT(max_recursion_depth, 0);
-  RecordBatchWriter serializer(columns, num_rows, max_recursion_depth);
-  RETURN_NOT_OK(serializer.AssemblePayload());
-  return serializer.Write(dst, body_end_offset, header_end_offset);
+  RecordBatchWriter serializer(
+      columns, num_rows, buffer_start_offset, max_recursion_depth);
+  return serializer.Write(dst, metadata_length, body_length);
 }
 
 Status GetRecordBatchSize(const RecordBatch* batch, int64_t* size) {
   RecordBatchWriter serializer(
-      batch->columns(), batch->num_rows(), kMaxIpcRecursionDepth);
-  RETURN_NOT_OK(serializer.AssemblePayload());
+      batch->columns(), batch->num_rows(), 0, kMaxIpcRecursionDepth);
   RETURN_NOT_OK(serializer.GetTotalSize(size));
   return Status::OK();
 }
@@ -255,30 +284,33 @@ Status GetRecordBatchSize(const RecordBatch* batch, int64_t* size) {
 // ----------------------------------------------------------------------
 // Record batch read path
 
-class RecordBatchReader::RecordBatchReaderImpl {
+class RecordBatchReader {
  public:
-  RecordBatchReaderImpl(io::ReadableFileInterface* file,
-      const std::shared_ptr<RecordBatchMessage>& metadata, int max_recursion_depth)
-      : file_(file), metadata_(metadata), max_recursion_depth_(max_recursion_depth) {
+  RecordBatchReader(const std::shared_ptr<RecordBatchMetadata>& metadata,
+      const std::shared_ptr<Schema>& schema, int max_recursion_depth,
+      io::ReadableFileInterface* file)
+      : metadata_(metadata),
+        schema_(schema),
+        max_recursion_depth_(max_recursion_depth),
+        file_(file) {
     num_buffers_ = metadata->num_buffers();
     num_flattened_fields_ = metadata->num_fields();
   }
 
-  Status AssembleBatch(
-      const std::shared_ptr<Schema>& schema, std::shared_ptr<RecordBatch>* out) {
-    std::vector<std::shared_ptr<Array>> arrays(schema->num_fields());
+  Status Read(std::shared_ptr<RecordBatch>* out) {
+    std::vector<std::shared_ptr<Array>> arrays(schema_->num_fields());
 
     // The field_index and buffer_index are incremented in NextArray based on
     // how much of the batch is "consumed" (through nested data reconstruction,
     // for example)
     field_index_ = 0;
     buffer_index_ = 0;
-    for (int i = 0; i < schema->num_fields(); ++i) {
-      const Field* field = schema->field(i).get();
+    for (int i = 0; i < schema_->num_fields(); ++i) {
+      const Field* field = schema_->field(i).get();
       RETURN_NOT_OK(NextArray(field, max_recursion_depth_, &arrays[i]));
     }
 
-    *out = std::make_shared<RecordBatch>(schema, metadata_->length(), arrays);
+    *out = std::make_shared<RecordBatch>(schema_, metadata_->length(), arrays);
     return Status::OK();
   }
 
@@ -370,67 +402,56 @@ class RecordBatchReader::RecordBatchReaderImpl {
 
   Status GetBuffer(int buffer_index, std::shared_ptr<Buffer>* out) {
     BufferMetadata metadata = metadata_->buffer(buffer_index);
-    RETURN_NOT_OK(CheckMultipleOf64(metadata.length));
-    return file_->ReadAt(metadata.offset, metadata.length, out);
+
+    if (metadata.length == 0) {
+      *out = std::make_shared<Buffer>(nullptr, 0);
+      return Status::OK();
+    } else {
+      return file_->ReadAt(metadata.offset, metadata.length, out);
+    }
   }
 
  private:
+  std::shared_ptr<RecordBatchMetadata> metadata_;
+  std::shared_ptr<Schema> schema_;
+  int max_recursion_depth_;
   io::ReadableFileInterface* file_;
-  std::shared_ptr<RecordBatchMessage> metadata_;
 
   int field_index_;
   int buffer_index_;
-  int max_recursion_depth_;
   int num_buffers_;
   int num_flattened_fields_;
 };
 
-Status RecordBatchReader::Open(io::ReadableFileInterface* file, int64_t offset,
-    std::shared_ptr<RecordBatchReader>* out) {
-  return Open(file, offset, kMaxIpcRecursionDepth, out);
-}
-
-Status RecordBatchReader::Open(io::ReadableFileInterface* file, int64_t offset,
-    int max_recursion_depth, std::shared_ptr<RecordBatchReader>* out) {
+Status ReadRecordBatchMetadata(int64_t offset, int32_t metadata_length,
+    io::ReadableFileInterface* file, std::shared_ptr<RecordBatchMetadata>* metadata) {
   std::shared_ptr<Buffer> buffer;
-  RETURN_NOT_OK(file->ReadAt(offset - sizeof(int32_t), sizeof(int32_t), &buffer));
-
-  int32_t metadata_size = *reinterpret_cast<const int32_t*>(buffer->data());
+  RETURN_NOT_OK(file->ReadAt(offset, metadata_length, &buffer));
 
-  if (metadata_size + static_cast<int>(sizeof(int32_t)) > offset) {
-    return Status::Invalid("metadata size invalid");
-  }
-
-  // Read the metadata
-  RETURN_NOT_OK(
-      file->ReadAt(offset - metadata_size - sizeof(int32_t), metadata_size, &buffer));
-
-  // TODO(wesm): buffer slicing here would be better in case ReadAt returns
-  // allocated memory
-
-  std::shared_ptr<Message> message;
-  RETURN_NOT_OK(Message::Open(buffer, &message));
+  int32_t flatbuffer_size = *reinterpret_cast<const int32_t*>(buffer->data());
 
-  if (message->type() != Message::RECORD_BATCH) {
-    return Status::Invalid("Metadata message is not a record batch");
+  if (flatbuffer_size + static_cast<int>(sizeof(int32_t)) > metadata_length) {
+    std::stringstream ss;
+    ss << "flatbuffer size " << metadata_length << " invalid. File offset: " << offset
+       << ", metadata length: " << metadata_length;
+    return Status::Invalid(ss.str());
   }
 
-  std::shared_ptr<RecordBatchMessage> batch_meta = message->GetRecordBatch();
-
-  std::shared_ptr<RecordBatchReader> result(new RecordBatchReader());
-  result->impl_.reset(new RecordBatchReaderImpl(file, batch_meta, max_recursion_depth));
-  *out = result;
-
+  *metadata = std::make_shared<RecordBatchMetadata>(buffer, sizeof(int32_t));
   return Status::OK();
 }
 
-// Here the explicit destructor is required for compilers to be aware of
-// the complete information of RecordBatchReader::RecordBatchReaderImpl class
-RecordBatchReader::~RecordBatchReader() {}
+Status ReadRecordBatch(const std::shared_ptr<RecordBatchMetadata>& metadata,
+    const std::shared_ptr<Schema>& schema, io::ReadableFileInterface* file,
+    std::shared_ptr<RecordBatch>* out) {
+  return ReadRecordBatch(metadata, schema, kMaxIpcRecursionDepth, file, out);
+}
 
-Status RecordBatchReader::GetRecordBatch(
-    const std::shared_ptr<Schema>& schema, std::shared_ptr<RecordBatch>* out) {
-  return impl_->AssembleBatch(schema, out);
+Status ReadRecordBatch(const std::shared_ptr<RecordBatchMetadata>& metadata,
+    const std::shared_ptr<Schema>& schema, int max_recursion_depth,
+    io::ReadableFileInterface* file, std::shared_ptr<RecordBatch>* out) {
+  RecordBatchReader reader(metadata, schema, max_recursion_depth, file);
+  return reader.Read(out);
 }
 
 }  // namespace ipc

http://git-wip-us.apache.org/repos/asf/arrow/blob/e3c167bd/cpp/src/arrow/ipc/adapter.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/adapter.h b/cpp/src/arrow/ipc/adapter.h
index b02de28..963b9ee 100644
--- a/cpp/src/arrow/ipc/adapter.h
+++ b/cpp/src/arrow/ipc/adapter.h
@@ -43,7 +43,7 @@ class OutputStream;
 
 namespace ipc {
 
-class RecordBatchMessage;
+class RecordBatchMetadata;
 
 // ----------------------------------------------------------------------
 // Write path
@@ -51,22 +51,30 @@ class RecordBatchMessage;
 // TODO(emkornfield) investigate this more
 constexpr int kMaxIpcRecursionDepth = 64;
 
-// Write the RecordBatch (collection of equal-length Arrow arrays) to the output
-// stream
+// Write the RecordBatch (collection of equal-length Arrow arrays) to the
+// output stream in a contiguous block. The record batch metadata is written as
+// a flatbuffer (see format/Message.fbs -- the RecordBatch message type)
+// prefixed by its size, followed by each of the memory buffers in the batch
+// written end to end (with appropriate alignment and padding):
 //
-// First, each of the memory buffers are written out end-to-end
-//
-// Then, this function writes the batch metadata as a flatbuffer (see
-// format/Message.fbs -- the RecordBatch message type) like so:
-//
-// <int32: metadata size> <uint8*: metadata>
+// <int32: metadata size> <uint8*: metadata> <buffers>
 //
 // Finally, the absolute offsets (relative to the start of the output stream)
 // to the end of the body and end of the metadata / data header (suffixed by
 // the header size) is returned in out-variables
+//
+// @param(in) buffer_start_offset: the start offset to use in the buffer metadata,
+// default should be 0
+//
+// @param(out) metadata_length: the size of the length-prefixed flatbuffer
+// including padding to a 64-byte boundary
+//
+// @param(out) body_length: the size of the contiguous buffer block plus
+// padding bytes
 ARROW_EXPORT Status WriteRecordBatch(const std::vector<std::shared_ptr<Array>>& columns,
-    int32_t num_rows, io::OutputStream* dst, int64_t* body_end_offset,
-    int64_t* header_end_offset, int max_recursion_depth = kMaxIpcRecursionDepth);
+    int32_t num_rows, int64_t buffer_start_offset, io::OutputStream* dst,
+    int32_t* metadata_length, int64_t* body_length,
+    int max_recursion_depth = kMaxIpcRecursionDepth);
 
 // int64_t GetRecordBatchMetadata(const RecordBatch* batch);
 
@@ -78,27 +86,20 @@ ARROW_EXPORT Status GetRecordBatchSize(const RecordBatch* batch, int64_t* size);
 // ----------------------------------------------------------------------
 // "Read" path; does not copy data if the input supports zero copy reads
 
-class ARROW_EXPORT RecordBatchReader {
- public:
-  // The offset is the absolute position to the *end* of the record batch data
-  // header
-  static Status Open(io::ReadableFileInterface* file, int64_t offset,
-      std::shared_ptr<RecordBatchReader>* out);
-
-  static Status Open(io::ReadableFileInterface* file, int64_t offset,
-      int max_recursion_depth, std::shared_ptr<RecordBatchReader>* out);
-
-  virtual ~RecordBatchReader();
-
-  // Reassemble the record batch. A Schema is required to be able to construct
-  // the right array containers
-  Status GetRecordBatch(
-      const std::shared_ptr<Schema>& schema, std::shared_ptr<RecordBatch>* out);
-
- private:
-  class RecordBatchReaderImpl;
-  std::unique_ptr<RecordBatchReaderImpl> impl_;
-};
+// Read the record batch flatbuffer metadata starting at the indicated file offset
+//
+// The flatbuffer is expected to be length-prefixed, so the metadata_length
+// includes at least the length prefix and the flatbuffer
+Status ARROW_EXPORT ReadRecordBatchMetadata(int64_t offset, int32_t metadata_length,
+    io::ReadableFileInterface* file, std::shared_ptr<RecordBatchMetadata>* metadata);
+
+Status ARROW_EXPORT ReadRecordBatch(const std::shared_ptr<RecordBatchMetadata>& metadata,
+    const std::shared_ptr<Schema>& schema, io::ReadableFileInterface* file,
+    std::shared_ptr<RecordBatch>* out);
+
+Status ARROW_EXPORT ReadRecordBatch(const std::shared_ptr<RecordBatchMetadata>& metadata,
+    const std::shared_ptr<Schema>& schema, int max_recursion_depth,
+    io::ReadableFileInterface* file, std::shared_ptr<RecordBatch>* out);
 
 }  // namespace ipc
 }  // namespace arrow

http://git-wip-us.apache.org/repos/asf/arrow/blob/e3c167bd/cpp/src/arrow/ipc/file.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/file.cc b/cpp/src/arrow/ipc/file.cc
index c68244d..06001cc 100644
--- a/cpp/src/arrow/ipc/file.cc
+++ b/cpp/src/arrow/ipc/file.cc
@@ -23,6 +23,7 @@
 #include <vector>
 
 #include "arrow/io/interfaces.h"
+#include "arrow/io/memory.h"
 #include "arrow/ipc/adapter.h"
 #include "arrow/ipc/metadata.h"
 #include "arrow/ipc/util.h"
@@ -87,19 +88,19 @@ Status FileWriter::WriteRecordBatch(
 
   int64_t offset = position_;
 
-  int64_t body_end_offset;
-  int64_t header_end_offset;
+  // There may be padding ever the end of the metadata, so we cannot rely on
+  // position_
+  int32_t metadata_length;
+  int64_t body_length;
+
+  // Frame of reference in file format is 0, see ARROW-384
+  const int64_t buffer_start_offset = 0;
   RETURN_NOT_OK(arrow::ipc::WriteRecordBatch(
-      columns, num_rows, sink_, &body_end_offset, &header_end_offset));
+      columns, num_rows, buffer_start_offset, sink_, &metadata_length, &body_length));
   RETURN_NOT_OK(UpdatePosition());
 
   DCHECK(position_ % 8 == 0) << "ipc::WriteRecordBatch did not perform aligned writes";
 
-  // There may be padding ever the end of the metadata, so we cannot rely on
-  // position_
-  int32_t metadata_length = header_end_offset - body_end_offset;
-  int32_t body_length = body_end_offset - offset;
-
   // Append metadata, to be written in the footer later
   record_batches_.emplace_back(offset, metadata_length, body_length);
 
@@ -198,12 +199,18 @@ Status FileReader::GetRecordBatch(int i, std::shared_ptr<RecordBatch>* batch) {
   DCHECK_GE(i, 0);
   DCHECK_LT(i, num_record_batches());
   FileBlock block = footer_->record_batch(i);
-  int64_t metadata_end_offset = block.offset + block.body_length + block.metadata_length;
 
-  std::shared_ptr<RecordBatchReader> reader;
-  RETURN_NOT_OK(RecordBatchReader::Open(file_.get(), metadata_end_offset, &reader));
+  std::shared_ptr<RecordBatchMetadata> metadata;
+  RETURN_NOT_OK(ReadRecordBatchMetadata(
+      block.offset, block.metadata_length, file_.get(), &metadata));
+
+  // TODO(wesm): ARROW-388 -- the buffer frame of reference is 0 (see
+  // ARROW-384).
+  std::shared_ptr<Buffer> buffer_block;
+  RETURN_NOT_OK(file_->Read(block.body_length, &buffer_block));
+  io::BufferReader reader(buffer_block);
 
-  return reader->GetRecordBatch(schema_, batch);
+  return ReadRecordBatch(metadata, schema_, &reader, batch);
 }
 
 }  // namespace ipc

http://git-wip-us.apache.org/repos/asf/arrow/blob/e3c167bd/cpp/src/arrow/ipc/ipc-adapter-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/ipc-adapter-test.cc b/cpp/src/arrow/ipc/ipc-adapter-test.cc
index f5611d4..1accfde 100644
--- a/cpp/src/arrow/ipc/ipc-adapter-test.cc
+++ b/cpp/src/arrow/ipc/ipc-adapter-test.cc
@@ -54,17 +54,24 @@ class TestWriteRecordBatch : public ::testing::TestWithParam<MakeRecordBatch*>,
     std::string path = "test-write-row-batch";
     io::MemoryMapFixture::InitMemoryMap(memory_map_size, path, &mmap_);
 
-    int64_t body_end_offset;
-    int64_t header_end_offset;
+    int32_t metadata_length;
+    int64_t body_length;
 
-    RETURN_NOT_OK(WriteRecordBatch(batch.columns(), batch.num_rows(), mmap_.get(),
-        &body_end_offset, &header_end_offset));
+    const int64_t buffer_offset = 0;
 
-    std::shared_ptr<RecordBatchReader> reader;
-    RETURN_NOT_OK(RecordBatchReader::Open(mmap_.get(), header_end_offset, &reader));
+    RETURN_NOT_OK(WriteRecordBatch(batch.columns(), batch.num_rows(), buffer_offset,
+        mmap_.get(), &metadata_length, &body_length));
 
-    RETURN_NOT_OK(reader->GetRecordBatch(batch.schema(), batch_result));
-    return Status::OK();
+    std::shared_ptr<RecordBatchMetadata> metadata;
+    RETURN_NOT_OK(ReadRecordBatchMetadata(0, metadata_length, mmap_.get(), &metadata));
+
+    // The buffer offsets start at 0, so we must construct a
+    // ReadableFileInterface according to that frame of reference
+    std::shared_ptr<Buffer> buffer_payload;
+    RETURN_NOT_OK(mmap_->ReadAt(metadata_length, body_length, &buffer_payload));
+    io::BufferReader buffer_reader(buffer_payload);
+
+    return ReadRecordBatch(metadata, batch.schema(), &buffer_reader, batch_result);
   }
 
  protected:
@@ -96,11 +103,11 @@ INSTANTIATE_TEST_CASE_P(RoundTripTests, TestWriteRecordBatch,
 
 void TestGetRecordBatchSize(std::shared_ptr<RecordBatch> batch) {
   ipc::MockOutputStream mock;
-  int64_t mock_header_offset = -1;
-  int64_t mock_body_offset = -1;
+  int32_t mock_metadata_length = -1;
+  int64_t mock_body_length = -1;
   int64_t size = -1;
-  ASSERT_OK(WriteRecordBatch(batch->columns(), batch->num_rows(), &mock,
-      &mock_body_offset, &mock_header_offset));
+  ASSERT_OK(WriteRecordBatch(batch->columns(), batch->num_rows(), 0, &mock,
+      &mock_metadata_length, &mock_body_length));
   ASSERT_OK(GetRecordBatchSize(batch.get(), &size));
   ASSERT_EQ(mock.GetExtentBytesWritten(), size);
 }
@@ -129,39 +136,36 @@ class RecursionLimits : public ::testing::Test, public io::MemoryMapFixture {
   void SetUp() { pool_ = default_memory_pool(); }
   void TearDown() { io::MemoryMapFixture::TearDown(); }
 
-  Status WriteToMmap(int recursion_level, bool override_level,
-      int64_t* header_out = nullptr, std::shared_ptr<Schema>* schema_out = nullptr) {
+  Status WriteToMmap(int recursion_level, bool override_level, int32_t* metadata_length,
+      int64_t* body_length, std::shared_ptr<Schema>* schema) {
     const int batch_length = 5;
-    TypePtr type = kInt32;
+    TypePtr type = int32();
     ArrayPtr array;
     const bool include_nulls = true;
     RETURN_NOT_OK(MakeRandomInt32Array(1000, include_nulls, pool_, &array));
     for (int i = 0; i < recursion_level; ++i) {
-      type = std::static_pointer_cast<DataType>(std::make_shared<ListType>(type));
+      type = list(type);
       RETURN_NOT_OK(
           MakeRandomListArray(array, batch_length, include_nulls, pool_, &array));
     }
 
-    auto f0 = std::make_shared<Field>("f0", type);
-    std::shared_ptr<Schema> schema(new Schema({f0}));
-    if (schema_out != nullptr) { *schema_out = schema; }
+    auto f0 = field("f0", type);
+
+    *schema = std::shared_ptr<Schema>(new Schema({f0}));
+
     std::vector<ArrayPtr> arrays = {array};
-    auto batch = std::make_shared<RecordBatch>(schema, batch_length, arrays);
+    auto batch = std::make_shared<RecordBatch>(*schema, batch_length, arrays);
 
     std::string path = "test-write-past-max-recursion";
     const int memory_map_size = 1 << 16;
     io::MemoryMapFixture::InitMemoryMap(memory_map_size, path, &mmap_);
 
-    int64_t body_offset;
-    int64_t header_offset;
-
-    int64_t* header_out_param = header_out == nullptr ? &header_offset : header_out;
     if (override_level) {
-      return WriteRecordBatch(batch->columns(), batch->num_rows(), mmap_.get(),
-          &body_offset, header_out_param, recursion_level + 1);
+      return WriteRecordBatch(batch->columns(), batch->num_rows(), 0, mmap_.get(),
+          metadata_length, body_length, recursion_level + 1);
     } else {
-      return WriteRecordBatch(batch->columns(), batch->num_rows(), mmap_.get(),
-          &body_offset, header_out_param);
+      return WriteRecordBatch(batch->columns(), batch->num_rows(), 0, mmap_.get(),
+          metadata_length, body_length);
     }
   }
 
@@ -171,18 +175,29 @@ class RecursionLimits : public ::testing::Test, public io::MemoryMapFixture {
 };
 
 TEST_F(RecursionLimits, WriteLimit) {
-  ASSERT_RAISES(Invalid, WriteToMmap((1 << 8) + 1, false));
+  int32_t metadata_length = -1;
+  int64_t body_length = -1;
+  std::shared_ptr<Schema> schema;
+  ASSERT_RAISES(
+      Invalid, WriteToMmap((1 << 8) + 1, false, &metadata_length, &body_length, &schema));
 }
 
 TEST_F(RecursionLimits, ReadLimit) {
-  int64_t header_offset = -1;
+  int32_t metadata_length = -1;
+  int64_t body_length = -1;
   std::shared_ptr<Schema> schema;
-  ASSERT_OK(WriteToMmap(64, true, &header_offset, &schema));
+  ASSERT_OK(WriteToMmap(64, true, &metadata_length, &body_length, &schema));
 
-  std::shared_ptr<RecordBatchReader> reader;
-  ASSERT_OK(RecordBatchReader::Open(mmap_.get(), header_offset, &reader));
-  std::shared_ptr<RecordBatch> batch_result;
-  ASSERT_RAISES(Invalid, reader->GetRecordBatch(schema, &batch_result));
+  std::shared_ptr<RecordBatchMetadata> metadata;
+  ASSERT_OK(ReadRecordBatchMetadata(0, metadata_length, mmap_.get(), &metadata));
+
+  std::shared_ptr<Buffer> payload;
+  ASSERT_OK(mmap_->ReadAt(metadata_length, body_length, &payload));
+
+  io::BufferReader reader(payload);
+
+  std::shared_ptr<RecordBatch> batch;
+  ASSERT_RAISES(Invalid, ReadRecordBatch(metadata, schema, &reader, &batch));
 }
 
 }  // namespace ipc

http://git-wip-us.apache.org/repos/asf/arrow/blob/e3c167bd/cpp/src/arrow/ipc/ipc-file-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/ipc-file-test.cc b/cpp/src/arrow/ipc/ipc-file-test.cc
index cd424bf..a1feac4 100644
--- a/cpp/src/arrow/ipc/ipc-file-test.cc
+++ b/cpp/src/arrow/ipc/ipc-file-test.cc
@@ -68,7 +68,7 @@ class TestFileFormat : public ::testing::TestWithParam<MakeRecordBatch*> {
     RETURN_NOT_OK(sink_->Tell(&footer_offset));
 
     // Open the file
-    auto reader = std::make_shared<io::BufferReader>(buffer_->data(), buffer_->size());
+    auto reader = std::make_shared<io::BufferReader>(buffer_);
     RETURN_NOT_OK(FileReader::Open(reader, footer_offset, &file_reader_));
 
     EXPECT_EQ(num_batches, file_reader_->num_record_batches());

http://git-wip-us.apache.org/repos/asf/arrow/blob/e3c167bd/cpp/src/arrow/ipc/ipc-json-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/ipc-json-test.cc b/cpp/src/arrow/ipc/ipc-json-test.cc
index a51371c..e5c3a08 100644
--- a/cpp/src/arrow/ipc/ipc-json-test.cc
+++ b/cpp/src/arrow/ipc/ipc-json-test.cc
@@ -284,19 +284,23 @@ TEST(TestJsonFileReadWrite, MinimalFormatExample) {
         "name": "foo",
         "type": {"name": "int", "isSigned": true, "bitWidth": 32},
         "nullable": true, "children": [],
-        "typeLayout": [
-          {"type": "VALIDITY", "typeBitWidth": 1},
-          {"type": "DATA", "typeBitWidth": 32}
-        ]
+        "typeLayout": {
+          "vectors": [
+            {"type": "VALIDITY", "typeBitWidth": 1},
+            {"type": "DATA", "typeBitWidth": 32}
+          ]
+        }
       },
       {
         "name": "bar",
         "type": {"name": "floatingpoint", "precision": "DOUBLE"},
         "nullable": true, "children": [],
-        "typeLayout": [
-          {"type": "VALIDITY", "typeBitWidth": 1},
-          {"type": "DATA", "typeBitWidth": 64}
-        ]
+        "typeLayout": {
+          "vectors": [
+            {"type": "VALIDITY", "typeBitWidth": 1},
+            {"type": "DATA", "typeBitWidth": 64}
+          ]
+        }
       }
     ]
   },

http://git-wip-us.apache.org/repos/asf/arrow/blob/e3c167bd/cpp/src/arrow/ipc/ipc-metadata-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/ipc-metadata-test.cc b/cpp/src/arrow/ipc/ipc-metadata-test.cc
index 1dc3969..d29583f 100644
--- a/cpp/src/arrow/ipc/ipc-metadata-test.cc
+++ b/cpp/src/arrow/ipc/ipc-metadata-test.cc
@@ -43,7 +43,7 @@ static inline void assert_schema_equal(const Schema* lhs, const Schema* rhs) {
   }
 }
 
-class TestSchemaMessage : public ::testing::Test {
+class TestSchemaMetadata : public ::testing::Test {
  public:
   void SetUp() {}
 
@@ -52,11 +52,11 @@ class TestSchemaMessage : public ::testing::Test {
     ASSERT_OK(WriteSchema(schema, &buffer));
 
     std::shared_ptr<Message> message;
-    ASSERT_OK(Message::Open(buffer, &message));
+    ASSERT_OK(Message::Open(buffer, 0, &message));
 
     ASSERT_EQ(Message::SCHEMA, message->type());
 
-    std::shared_ptr<SchemaMessage> schema_msg = message->GetSchema();
+    auto schema_msg = std::make_shared<SchemaMetadata>(message);
     ASSERT_EQ(schema->num_fields(), schema_msg->num_fields());
 
     std::shared_ptr<Schema> schema2;
@@ -68,7 +68,7 @@ class TestSchemaMessage : public ::testing::Test {
 
 const std::shared_ptr<DataType> INT32 = std::make_shared<Int32Type>();
 
-TEST_F(TestSchemaMessage, PrimitiveFields) {
+TEST_F(TestSchemaMetadata, PrimitiveFields) {
   auto f0 = std::make_shared<Field>("f0", std::make_shared<Int8Type>());
   auto f1 = std::make_shared<Field>("f1", std::make_shared<Int16Type>());
   auto f2 = std::make_shared<Field>("f2", std::make_shared<Int32Type>());
@@ -85,7 +85,7 @@ TEST_F(TestSchemaMessage, PrimitiveFields) {
   CheckRoundtrip(&schema);
 }
 
-TEST_F(TestSchemaMessage, NestedFields) {
+TEST_F(TestSchemaMetadata, NestedFields) {
   auto type = std::make_shared<ListType>(std::make_shared<Int32Type>());
   auto f0 = std::make_shared<Field>("f0", type);
 
@@ -111,7 +111,7 @@ class TestFileFooter : public ::testing::Test {
     std::unique_ptr<FileFooter> footer;
     ASSERT_OK(FileFooter::Open(buffer, &footer));
 
-    ASSERT_EQ(MetadataVersion::V1_SNAPSHOT, footer->version());
+    ASSERT_EQ(MetadataVersion::V2, footer->version());
 
     // Check schema
     std::shared_ptr<Schema> schema2;

http://git-wip-us.apache.org/repos/asf/arrow/blob/e3c167bd/cpp/src/arrow/ipc/json-integration-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/json-integration-test.cc b/cpp/src/arrow/ipc/json-integration-test.cc
index 5eff899..7a313f7 100644
--- a/cpp/src/arrow/ipc/json-integration-test.cc
+++ b/cpp/src/arrow/ipc/json-integration-test.cc
@@ -255,19 +255,23 @@ static const char* JSON_EXAMPLE = R"example(
         "name": "foo",
         "type": {"name": "int", "isSigned": true, "bitWidth": 32},
         "nullable": true, "children": [],
-        "typeLayout": [
-          {"type": "VALIDITY", "typeBitWidth": 1},
-          {"type": "DATA", "typeBitWidth": 32}
-        ]
+        "typeLayout": {
+          "vectors": [
+            {"type": "VALIDITY", "typeBitWidth": 1},
+            {"type": "DATA", "typeBitWidth": 32}
+          ]
+        }
       },
       {
         "name": "bar",
         "type": {"name": "floatingpoint", "precision": "DOUBLE"},
         "nullable": true, "children": [],
-        "typeLayout": [
-          {"type": "VALIDITY", "typeBitWidth": 1},
-          {"type": "DATA", "typeBitWidth": 64}
-        ]
+        "typeLayout": {
+          "vectors": [
+            {"type": "VALIDITY", "typeBitWidth": 1},
+            {"type": "DATA", "typeBitWidth": 64}
+          ]
+        }
       }
     ]
   },
@@ -301,10 +305,12 @@ static const char* JSON_EXAMPLE2 = R"example(
         "name": "foo",
         "type": {"name": "int", "isSigned": true, "bitWidth": 32},
         "nullable": true, "children": [],
-        "typeLayout": [
-          {"type": "VALIDITY", "typeBitWidth": 1},
-          {"type": "DATA", "typeBitWidth": 32}
-        ]
+        "typeLayout": {
+          "vectors": [
+            {"type": "VALIDITY", "typeBitWidth": 1},
+            {"type": "DATA", "typeBitWidth": 32}
+          ]
+        }
       }
     ]
   },

http://git-wip-us.apache.org/repos/asf/arrow/blob/e3c167bd/cpp/src/arrow/ipc/json-internal.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/json-internal.cc b/cpp/src/arrow/ipc/json-internal.cc
index 31fe35b..e56bcb3 100644
--- a/cpp/src/arrow/ipc/json-internal.cc
+++ b/cpp/src/arrow/ipc/json-internal.cc
@@ -45,8 +45,6 @@ namespace ipc {
 using RjArray = rj::Value::ConstArray;
 using RjObject = rj::Value::ConstObject;
 
-enum class BufferType : char { DATA, OFFSET, TYPE, VALIDITY };
-
 static std::string GetBufferTypeName(BufferType type) {
   switch (type) {
     case BufferType::DATA:
@@ -93,27 +91,6 @@ static std::string GetTimeUnitName(TimeUnit unit) {
   return "UNKNOWN";
 }
 
-class BufferLayout {
- public:
-  BufferLayout(BufferType type, int bit_width) : type_(type), bit_width_(bit_width) {}
-
-  BufferType type() const { return type_; }
-  int bit_width() const { return bit_width_; }
-
- private:
-  BufferType type_;
-  int bit_width_;
-};
-
-static const BufferLayout kValidityBuffer(BufferType::VALIDITY, 1);
-static const BufferLayout kOffsetBuffer(BufferType::OFFSET, 32);
-static const BufferLayout kTypeBuffer(BufferType::TYPE, 32);
-static const BufferLayout kBooleanBuffer(BufferType::DATA, 1);
-static const BufferLayout kValues64(BufferType::DATA, 64);
-static const BufferLayout kValues32(BufferType::DATA, 32);
-static const BufferLayout kValues16(BufferType::DATA, 16);
-static const BufferLayout kValues8(BufferType::DATA, 8);
-
 class JsonSchemaWriter : public TypeVisitor {
  public:
   explicit JsonSchemaWriter(const Schema& schema, RjWriter* writer)
@@ -154,9 +131,9 @@ class JsonSchemaWriter : public TypeVisitor {
   }
 
   template <typename T>
-  typename std::enable_if<std::is_base_of<NoExtraMeta, T>::value ||
-                              std::is_base_of<BooleanType, T>::value ||
-                              std::is_base_of<NullType, T>::value,
+  typename std::enable_if<
+      std::is_base_of<NoExtraMeta, T>::value || std::is_base_of<BooleanType, T>::value ||
+          std::is_base_of<DateType, T>::value || std::is_base_of<NullType, T>::value,
       void>::type
   WriteTypeMetadata(const T& type) {}
 
@@ -243,11 +220,10 @@ class JsonSchemaWriter : public TypeVisitor {
   }
 
   template <typename T>
-  Status WritePrimitive(const std::string& typeclass, const T& type,
-      const std::vector<BufferLayout>& buffer_layout) {
+  Status WritePrimitive(const std::string& typeclass, const T& type) {
     WriteName(typeclass, type);
     SetNoChildren();
-    WriteBufferLayout(buffer_layout);
+    WriteBufferLayout(type.GetBufferLayout());
     return Status::OK();
   }
 
@@ -255,15 +231,17 @@ class JsonSchemaWriter : public TypeVisitor {
   Status WriteVarBytes(const std::string& typeclass, const T& type) {
     WriteName(typeclass, type);
     SetNoChildren();
-    WriteBufferLayout({kValidityBuffer, kOffsetBuffer, kValues8});
+    WriteBufferLayout(type.GetBufferLayout());
     return Status::OK();
   }
 
-  void WriteBufferLayout(const std::vector<BufferLayout>& buffer_layout) {
+  void WriteBufferLayout(const std::vector<BufferDescr>& buffer_layout) {
     writer_->Key("typeLayout");
+    writer_->StartObject();
+    writer_->Key("vectors");
     writer_->StartArray();
 
-    for (const BufferLayout& buffer : buffer_layout) {
+    for (const BufferDescr& buffer : buffer_layout) {
       writer_->StartObject();
       writer_->Key("type");
       writer_->String(GetBufferTypeName(buffer.type()));
@@ -274,6 +252,7 @@ class JsonSchemaWriter : public TypeVisitor {
       writer_->EndObject();
     }
     writer_->EndArray();
+    writer_->EndObject();
   }
 
   Status WriteChildren(const std::vector<std::shared_ptr<Field>>& children) {
@@ -286,74 +265,52 @@ class JsonSchemaWriter : public TypeVisitor {
     return Status::OK();
   }
 
-  Status Visit(const NullType& type) override { return WritePrimitive("null", type, {}); }
+  Status Visit(const NullType& type) override { return WritePrimitive("null", type); }
 
-  Status Visit(const BooleanType& type) override {
-    return WritePrimitive("bool", type, {kValidityBuffer, kBooleanBuffer});
-  }
+  Status Visit(const BooleanType& type) override { return WritePrimitive("bool", type); }
 
-  Status Visit(const Int8Type& type) override {
-    return WritePrimitive("int", type, {kValidityBuffer, kValues8});
-  }
+  Status Visit(const Int8Type& type) override { return WritePrimitive("int", type); }
 
-  Status Visit(const Int16Type& type) override {
-    return WritePrimitive("int", type, {kValidityBuffer, kValues16});
-  }
+  Status Visit(const Int16Type& type) override { return WritePrimitive("int", type); }
 
-  Status Visit(const Int32Type& type) override {
-    return WritePrimitive("int", type, {kValidityBuffer, kValues32});
-  }
+  Status Visit(const Int32Type& type) override { return WritePrimitive("int", type); }
 
-  Status Visit(const Int64Type& type) override {
-    return WritePrimitive("int", type, {kValidityBuffer, kValues64});
-  }
+  Status Visit(const Int64Type& type) override { return WritePrimitive("int", type); }
 
-  Status Visit(const UInt8Type& type) override {
-    return WritePrimitive("int", type, {kValidityBuffer, kValues8});
-  }
+  Status Visit(const UInt8Type& type) override { return WritePrimitive("int", type); }
 
-  Status Visit(const UInt16Type& type) override {
-    return WritePrimitive("int", type, {kValidityBuffer, kValues16});
-  }
+  Status Visit(const UInt16Type& type) override { return WritePrimitive("int", type); }
 
-  Status Visit(const UInt32Type& type) override {
-    return WritePrimitive("int", type, {kValidityBuffer, kValues32});
-  }
+  Status Visit(const UInt32Type& type) override { return WritePrimitive("int", type); }
 
-  Status Visit(const UInt64Type& type) override {
-    return WritePrimitive("int", type, {kValidityBuffer, kValues64});
-  }
+  Status Visit(const UInt64Type& type) override { return WritePrimitive("int", type); }
 
   Status Visit(const HalfFloatType& type) override {
-    return WritePrimitive("floatingpoint", type, {kValidityBuffer, kValues16});
+    return WritePrimitive("floatingpoint", type);
   }
 
   Status Visit(const FloatType& type) override {
-    return WritePrimitive("floatingpoint", type, {kValidityBuffer, kValues32});
+    return WritePrimitive("floatingpoint", type);
   }
 
   Status Visit(const DoubleType& type) override {
-    return WritePrimitive("floatingpoint", type, {kValidityBuffer, kValues64});
+    return WritePrimitive("floatingpoint", type);
   }
 
   Status Visit(const StringType& type) override { return WriteVarBytes("utf8", type); }
 
   Status Visit(const BinaryType& type) override { return WriteVarBytes("binary", type); }
 
-  Status Visit(const DateType& type) override {
-    return WritePrimitive("date", type, {kValidityBuffer, kValues64});
-  }
+  Status Visit(const DateType& type) override { return WritePrimitive("date", type); }
 
-  Status Visit(const TimeType& type) override {
-    return WritePrimitive("time", type, {kValidityBuffer, kValues64});
-  }
+  Status Visit(const TimeType& type) override { return WritePrimitive("time", type); }
 
   Status Visit(const TimestampType& type) override {
-    return WritePrimitive("timestamp", type, {kValidityBuffer, kValues64});
+    return WritePrimitive("timestamp", type);
   }
 
   Status Visit(const IntervalType& type) override {
-    return WritePrimitive("interval", type, {kValidityBuffer, kValues64});
+    return WritePrimitive("interval", type);
   }
 
   Status Visit(const DecimalType& type) override { return Status::NotImplemented("NYI"); }
@@ -361,26 +318,21 @@ class JsonSchemaWriter : public TypeVisitor {
   Status Visit(const ListType& type) override {
     WriteName("list", type);
     RETURN_NOT_OK(WriteChildren(type.children()));
-    WriteBufferLayout({kValidityBuffer, kOffsetBuffer});
+    WriteBufferLayout(type.GetBufferLayout());
     return Status::OK();
   }
 
   Status Visit(const StructType& type) override {
     WriteName("struct", type);
     WriteChildren(type.children());
-    WriteBufferLayout({kValidityBuffer, kTypeBuffer});
+    WriteBufferLayout(type.GetBufferLayout());
     return Status::OK();
   }
 
   Status Visit(const UnionType& type) override {
     WriteName("union", type);
     WriteChildren(type.children());
-
-    if (type.mode == UnionMode::SPARSE) {
-      WriteBufferLayout({kValidityBuffer, kTypeBuffer});
-    } else {
-      WriteBufferLayout({kValidityBuffer, kTypeBuffer, kOffsetBuffer});
-    }
+    WriteBufferLayout(type.GetBufferLayout());
     return Status::OK();
   }
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/e3c167bd/cpp/src/arrow/ipc/metadata-internal.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/metadata-internal.cc b/cpp/src/arrow/ipc/metadata-internal.cc
index 7102012..b995228 100644
--- a/cpp/src/arrow/ipc/metadata-internal.cc
+++ b/cpp/src/arrow/ipc/metadata-internal.cc
@@ -37,20 +37,6 @@ namespace flatbuf = org::apache::arrow::flatbuf;
 
 namespace ipc {
 
-const std::shared_ptr<DataType> BOOL = std::make_shared<BooleanType>();
-const std::shared_ptr<DataType> INT8 = std::make_shared<Int8Type>();
-const std::shared_ptr<DataType> INT16 = std::make_shared<Int16Type>();
-const std::shared_ptr<DataType> INT32 = std::make_shared<Int32Type>();
-const std::shared_ptr<DataType> INT64 = std::make_shared<Int64Type>();
-const std::shared_ptr<DataType> UINT8 = std::make_shared<UInt8Type>();
-const std::shared_ptr<DataType> UINT16 = std::make_shared<UInt16Type>();
-const std::shared_ptr<DataType> UINT32 = std::make_shared<UInt32Type>();
-const std::shared_ptr<DataType> UINT64 = std::make_shared<UInt64Type>();
-const std::shared_ptr<DataType> FLOAT = std::make_shared<FloatType>();
-const std::shared_ptr<DataType> DOUBLE = std::make_shared<DoubleType>();
-const std::shared_ptr<DataType> STRING = std::make_shared<StringType>();
-const std::shared_ptr<DataType> BINARY = std::make_shared<BinaryType>();
-
 static Status IntFromFlatbuffer(
     const flatbuf::Int* int_data, std::shared_ptr<DataType>* out) {
   if (int_data->bitWidth() > 64) {
@@ -62,16 +48,16 @@ static Status IntFromFlatbuffer(
 
   switch (int_data->bitWidth()) {
     case 8:
-      *out = int_data->is_signed() ? INT8 : UINT8;
+      *out = int_data->is_signed() ? int8() : uint8();
       break;
     case 16:
-      *out = int_data->is_signed() ? INT16 : UINT16;
+      *out = int_data->is_signed() ? int16() : uint16();
       break;
     case 32:
-      *out = int_data->is_signed() ? INT32 : UINT32;
+      *out = int_data->is_signed() ? int32() : uint32();
       break;
     case 64:
-      *out = int_data->is_signed() ? INT64 : UINT64;
+      *out = int_data->is_signed() ? int64() : uint64();
       break;
     default:
       return Status::NotImplemented("Integers not in cstdint are not implemented");
@@ -81,10 +67,12 @@ static Status IntFromFlatbuffer(
 
 static Status FloatFromFlatuffer(
     const flatbuf::FloatingPoint* float_data, std::shared_ptr<DataType>* out) {
-  if (float_data->precision() == flatbuf::Precision_SINGLE) {
-    *out = FLOAT;
+  if (float_data->precision() == flatbuf::Precision_HALF) {
+    *out = float16();
+  } else if (float_data->precision() == flatbuf::Precision_SINGLE) {
+    *out = float32();
   } else {
-    *out = DOUBLE;
+    *out = float64();
   }
   return Status::OK();
 }
@@ -100,13 +88,13 @@ static Status TypeFromFlatbuffer(flatbuf::Type type, const void* type_data,
       return FloatFromFlatuffer(
           static_cast<const flatbuf::FloatingPoint*>(type_data), out);
     case flatbuf::Type_Binary:
-      *out = BINARY;
+      *out = binary();
       return Status::OK();
     case flatbuf::Type_Utf8:
-      *out = STRING;
+      *out = utf8();
       return Status::OK();
     case flatbuf::Type_Bool:
-      *out = BOOL;
+      *out = boolean();
       return Status::OK();
     case flatbuf::Type_Decimal:
     case flatbuf::Type_Timestamp:
@@ -164,7 +152,32 @@ static Status StructToFlatbuffer(FBB& fbb, const std::shared_ptr<DataType>& type
   break;
 
 static Status TypeToFlatbuffer(FBB& fbb, const std::shared_ptr<DataType>& type,
-    std::vector<FieldOffset>* children, flatbuf::Type* out_type, Offset* offset) {
+    std::vector<FieldOffset>* children, std::vector<VectorLayoutOffset>* layout,
+    flatbuf::Type* out_type, Offset* offset) {
+  std::vector<BufferDescr> buffer_layout = type->GetBufferLayout();
+  for (const BufferDescr& descr : buffer_layout) {
+    flatbuf::VectorType vector_type;
+    switch (descr.type()) {
+      case BufferType::OFFSET:
+        vector_type = flatbuf::VectorType_OFFSET;
+        break;
+      case BufferType::DATA:
+        vector_type = flatbuf::VectorType_DATA;
+        break;
+      case BufferType::VALIDITY:
+        vector_type = flatbuf::VectorType_VALIDITY;
+        break;
+      case BufferType::TYPE:
+        vector_type = flatbuf::VectorType_TYPE;
+        break;
+      default:
+        vector_type = flatbuf::VectorType_DATA;
+        break;
+    }
+    auto offset = flatbuf::CreateVectorLayout(fbb, descr.bit_width(), vector_type);
+    layout->push_back(offset);
+  }
+
   switch (type->type) {
     case Type::BOOL:
       *out_type = flatbuf::Type_Bool;
@@ -223,14 +236,18 @@ static Status FieldToFlatbuffer(
 
   flatbuf::Type type_enum;
   Offset type_data;
+  Offset type_layout;
   std::vector<FieldOffset> children;
+  std::vector<VectorLayoutOffset> layout;
 
-  RETURN_NOT_OK(TypeToFlatbuffer(fbb, field->type, &children, &type_enum, &type_data));
+  RETURN_NOT_OK(
+      TypeToFlatbuffer(fbb, field->type, &children, &layout, &type_enum, &type_data));
   auto fb_children = fbb.CreateVector(children);
+  auto fb_layout = fbb.CreateVector(layout);
 
   // TODO: produce the list of VectorTypes
   *offset = flatbuf::CreateField(fbb, fb_name, field->nullable, type_enum, type_data,
-      field->dictionary, fb_children);
+      field->dictionary, fb_children, fb_layout);
 
   return Status::OK();
 }
@@ -300,13 +317,26 @@ Status MessageBuilder::SetRecordBatch(int32_t length, int64_t body_length,
   return Status::OK();
 }
 
-Status WriteDataHeader(int32_t length, int64_t body_length,
+Status WriteRecordBatchMetadata(int32_t length, int64_t body_length,
     const std::vector<flatbuf::FieldNode>& nodes,
     const std::vector<flatbuf::Buffer>& buffers, std::shared_ptr<Buffer>* out) {
-  MessageBuilder message;
-  RETURN_NOT_OK(message.SetRecordBatch(length, body_length, nodes, buffers));
-  RETURN_NOT_OK(message.Finish());
-  return message.GetBuffer(out);
+  flatbuffers::FlatBufferBuilder fbb;
+
+  auto batch = flatbuf::CreateRecordBatch(
+      fbb, length, fbb.CreateVectorOfStructs(nodes), fbb.CreateVectorOfStructs(buffers));
+
+  fbb.Finish(batch);
+
+  int32_t size = fbb.GetSize();
+
+  auto result = std::make_shared<PoolBuffer>();
+  RETURN_NOT_OK(result->Resize(size));
+
+  uint8_t* dst = result->mutable_data();
+  memcpy(dst, fbb.GetBufferPointer(), size);
+
+  *out = result;
+  return Status::OK();
 }
 
 Status MessageBuilder::Finish() {
@@ -317,17 +347,13 @@ Status MessageBuilder::Finish() {
 }
 
 Status MessageBuilder::GetBuffer(std::shared_ptr<Buffer>* out) {
-  // The message buffer is suffixed by the size of the complete flatbuffer as
-  // int32_t
-  // <uint8_t*: flatbuffer data><int32_t: flatbuffer size>
   int32_t size = fbb_.GetSize();
 
   auto result = std::make_shared<PoolBuffer>();
-  RETURN_NOT_OK(result->Resize(size + sizeof(int32_t)));
+  RETURN_NOT_OK(result->Resize(size));
 
   uint8_t* dst = result->mutable_data();
   memcpy(dst, fbb_.GetBufferPointer(), size);
-  memcpy(dst + size, reinterpret_cast<int32_t*>(&size), sizeof(int32_t));
 
   *out = result;
   return Status::OK();

http://git-wip-us.apache.org/repos/asf/arrow/blob/e3c167bd/cpp/src/arrow/ipc/metadata-internal.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/metadata-internal.h b/cpp/src/arrow/ipc/metadata-internal.h
index c404cfd..4826ebe 100644
--- a/cpp/src/arrow/ipc/metadata-internal.h
+++ b/cpp/src/arrow/ipc/metadata-internal.h
@@ -41,10 +41,10 @@ namespace ipc {
 
 using FBB = flatbuffers::FlatBufferBuilder;
 using FieldOffset = flatbuffers::Offset<arrow::flatbuf::Field>;
+using VectorLayoutOffset = flatbuffers::Offset<arrow::flatbuf::VectorLayout>;
 using Offset = flatbuffers::Offset<void>;
 
-static constexpr flatbuf::MetadataVersion kMetadataVersion =
-    flatbuf::MetadataVersion_V1_SNAPSHOT;
+static constexpr flatbuf::MetadataVersion kMetadataVersion = flatbuf::MetadataVersion_V2;
 
 Status FieldFromFlatbuffer(const flatbuf::Field* field, std::shared_ptr<Field>* out);
 
@@ -70,7 +70,7 @@ class MessageBuilder {
   flatbuffers::FlatBufferBuilder fbb_;
 };
 
-Status WriteDataHeader(int32_t length, int64_t body_length,
+Status WriteRecordBatchMetadata(int32_t length, int64_t body_length,
     const std::vector<flatbuf::FieldNode>& nodes,
     const std::vector<flatbuf::Buffer>& buffers, std::shared_ptr<Buffer>* out);
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/e3c167bd/cpp/src/arrow/ipc/metadata.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/metadata.cc b/cpp/src/arrow/ipc/metadata.cc
index 66df8a6..44d3939 100644
--- a/cpp/src/arrow/ipc/metadata.cc
+++ b/cpp/src/arrow/ipc/metadata.cc
@@ -50,9 +50,15 @@ Status WriteSchema(const Schema* schema, std::shared_ptr<Buffer>* out) {
 
 class Message::MessageImpl {
  public:
-  explicit MessageImpl(
-      const std::shared_ptr<Buffer>& buffer, const flatbuf::Message* message)
-      : buffer_(buffer), message_(message) {}
+  explicit MessageImpl(const std::shared_ptr<Buffer>& buffer, int64_t offset)
+      : buffer_(buffer), offset_(offset), message_(nullptr) {}
+
+  Status Open() {
+    message_ = flatbuf::GetMessage(buffer_->data() + offset_);
+
+    // TODO(wesm): verify the message
+    return Status::OK();
+  }
 
   Message::Type type() const {
     switch (message_->header_type()) {
@@ -72,25 +78,23 @@ class Message::MessageImpl {
   int64_t body_length() const { return message_->bodyLength(); }
 
  private:
-  // Owns the memory this message accesses
+  // Retain reference to memory
   std::shared_ptr<Buffer> buffer_;
+  int64_t offset_;
 
   const flatbuf::Message* message_;
 };
 
-Message::Message() {}
-
-Status Message::Open(
-    const std::shared_ptr<Buffer>& buffer, std::shared_ptr<Message>* out) {
-  std::shared_ptr<Message> result(new Message());
-
-  const flatbuf::Message* message = flatbuf::GetMessage(buffer->data());
+Message::Message(const std::shared_ptr<Buffer>& buffer, int64_t offset) {
+  impl_.reset(new MessageImpl(buffer, offset));
+}
 
-  // TODO(wesm): verify message
-  result->impl_.reset(new MessageImpl(buffer, message));
-  *out = result;
+Status Message::Open(const std::shared_ptr<Buffer>& buffer, int64_t offset,
+    std::shared_ptr<Message>* out) {
+  // ctor is private
 
-  return Status::OK();
+  *out = std::shared_ptr<Message>(new Message(buffer, offset));
+  return (*out)->impl_->Open();
 }
 
 Message::Type Message::type() const {
@@ -101,20 +105,12 @@ int64_t Message::body_length() const {
   return impl_->body_length();
 }
 
-std::shared_ptr<Message> Message::get_shared_ptr() {
-  return this->shared_from_this();
-}
-
-std::shared_ptr<SchemaMessage> Message::GetSchema() {
-  return std::make_shared<SchemaMessage>(this->shared_from_this(), impl_->header());
-}
-
 // ----------------------------------------------------------------------
-// SchemaMessage
+// SchemaMetadata
 
-class SchemaMessage::SchemaMessageImpl {
+class SchemaMetadata::SchemaMetadataImpl {
  public:
-  explicit SchemaMessageImpl(const void* schema)
+  explicit SchemaMetadataImpl(const void* schema)
       : schema_(static_cast<const flatbuf::Schema*>(schema)) {}
 
   const flatbuf::Field* field(int i) const { return schema_->fields()->Get(i); }
@@ -125,22 +121,29 @@ class SchemaMessage::SchemaMessageImpl {
   const flatbuf::Schema* schema_;
 };
 
-SchemaMessage::SchemaMessage(
-    const std::shared_ptr<Message>& message, const void* schema) {
+SchemaMetadata::SchemaMetadata(
+    const std::shared_ptr<Message>& message, const void* flatbuf) {
+  message_ = message;
+  impl_.reset(new SchemaMetadataImpl(flatbuf));
+}
+
+SchemaMetadata::SchemaMetadata(const std::shared_ptr<Message>& message) {
   message_ = message;
-  impl_.reset(new SchemaMessageImpl(schema));
+  impl_.reset(new SchemaMetadataImpl(message->impl_->header()));
 }
 
-int SchemaMessage::num_fields() const {
+SchemaMetadata::~SchemaMetadata() {}
+
+int SchemaMetadata::num_fields() const {
   return impl_->num_fields();
 }
 
-Status SchemaMessage::GetField(int i, std::shared_ptr<Field>* out) const {
+Status SchemaMetadata::GetField(int i, std::shared_ptr<Field>* out) const {
   const flatbuf::Field* field = impl_->field(i);
   return FieldFromFlatbuffer(field, out);
 }
 
-Status SchemaMessage::GetSchema(std::shared_ptr<Schema>* out) const {
+Status SchemaMetadata::GetSchema(std::shared_ptr<Schema>* out) const {
   std::vector<std::shared_ptr<Field>> fields(num_fields());
   for (int i = 0; i < this->num_fields(); ++i) {
     RETURN_NOT_OK(GetField(i, &fields[i]));
@@ -150,11 +153,11 @@ Status SchemaMessage::GetSchema(std::shared_ptr<Schema>* out) const {
 }
 
 // ----------------------------------------------------------------------
-// RecordBatchMessage
+// RecordBatchMetadata
 
-class RecordBatchMessage::RecordBatchMessageImpl {
+class RecordBatchMetadata::RecordBatchMetadataImpl {
  public:
-  explicit RecordBatchMessageImpl(const void* batch)
+  explicit RecordBatchMetadataImpl(const void* batch)
       : batch_(static_cast<const flatbuf::RecordBatch*>(batch)) {
     nodes_ = batch_->nodes();
     buffers_ = batch_->buffers();
@@ -176,19 +179,29 @@ class RecordBatchMessage::RecordBatchMessageImpl {
   const flatbuffers::Vector<const flatbuf::Buffer*>* buffers_;
 };
 
-std::shared_ptr<RecordBatchMessage> Message::GetRecordBatch() {
-  return std::make_shared<RecordBatchMessage>(this->shared_from_this(), impl_->header());
+RecordBatchMetadata::RecordBatchMetadata(const std::shared_ptr<Message>& message) {
+  message_ = message;
+  impl_.reset(new RecordBatchMetadataImpl(message->impl_->header()));
 }
 
-RecordBatchMessage::RecordBatchMessage(
-    const std::shared_ptr<Message>& message, const void* batch) {
-  message_ = message;
-  impl_.reset(new RecordBatchMessageImpl(batch));
+RecordBatchMetadata::RecordBatchMetadata(
+    const std::shared_ptr<Buffer>& buffer, int64_t offset) {
+  message_ = nullptr;
+  buffer_ = buffer;
+
+  const flatbuf::RecordBatch* metadata =
+      flatbuffers::GetRoot<flatbuf::RecordBatch>(buffer->data() + offset);
+
+  // TODO(wesm): validate table
+
+  impl_.reset(new RecordBatchMetadataImpl(metadata));
 }
 
+RecordBatchMetadata::~RecordBatchMetadata() {}
+
 // TODO(wesm): Copying the flatbuffer data isn't great, but this will do for
 // now
-FieldMetadata RecordBatchMessage::field(int i) const {
+FieldMetadata RecordBatchMetadata::field(int i) const {
   const flatbuf::FieldNode* node = impl_->field(i);
 
   FieldMetadata result;
@@ -197,7 +210,7 @@ FieldMetadata RecordBatchMessage::field(int i) const {
   return result;
 }
 
-BufferMetadata RecordBatchMessage::buffer(int i) const {
+BufferMetadata RecordBatchMetadata::buffer(int i) const {
   const flatbuf::Buffer* buffer = impl_->buffer(i);
 
   BufferMetadata result;
@@ -207,15 +220,15 @@ BufferMetadata RecordBatchMessage::buffer(int i) const {
   return result;
 }
 
-int32_t RecordBatchMessage::length() const {
+int32_t RecordBatchMetadata::length() const {
   return impl_->length();
 }
 
-int RecordBatchMessage::num_buffers() const {
+int RecordBatchMetadata::num_buffers() const {
   return impl_->num_buffers();
 }
 
-int RecordBatchMessage::num_fields() const {
+int RecordBatchMetadata::num_fields() const {
   return impl_->num_fields();
 }
 
@@ -268,11 +281,13 @@ class FileFooter::FileFooterImpl {
 
   MetadataVersion::type version() const {
     switch (footer_->version()) {
-      case flatbuf::MetadataVersion_V1_SNAPSHOT:
-        return MetadataVersion::V1_SNAPSHOT;
+      case flatbuf::MetadataVersion_V1:
+        return MetadataVersion::V1;
+      case flatbuf::MetadataVersion_V2:
+        return MetadataVersion::V2;
       // Add cases as other versions become available
       default:
-        return MetadataVersion::V1_SNAPSHOT;
+        return MetadataVersion::V2;
     }
   }
 
@@ -285,7 +300,7 @@ class FileFooter::FileFooterImpl {
   }
 
   Status GetSchema(std::shared_ptr<Schema>* out) const {
-    auto schema_msg = std::make_shared<SchemaMessage>(nullptr, footer_->schema());
+    auto schema_msg = std::make_shared<SchemaMetadata>(nullptr, footer_->schema());
     return schema_msg->GetSchema(out);
   }
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/e3c167bd/cpp/src/arrow/ipc/metadata.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/metadata.h b/cpp/src/arrow/ipc/metadata.h
index 2f0e853..1c4ef64 100644
--- a/cpp/src/arrow/ipc/metadata.h
+++ b/cpp/src/arrow/ipc/metadata.h
@@ -42,7 +42,7 @@ class OutputStream;
 namespace ipc {
 
 struct MetadataVersion {
-  enum type { V1_SNAPSHOT };
+  enum type { V1, V2 };
 };
 
 //----------------------------------------------------------------------
@@ -58,10 +58,14 @@ Status WriteSchema(const Schema* schema, std::shared_ptr<Buffer>* out);
 class Message;
 
 // Container for serialized Schema metadata contained in an IPC message
-class ARROW_EXPORT SchemaMessage {
+class ARROW_EXPORT SchemaMetadata {
  public:
+  explicit SchemaMetadata(const std::shared_ptr<Message>& message);
+
   // Accepts an opaque flatbuffer pointer
-  SchemaMessage(const std::shared_ptr<Message>& message, const void* schema);
+  SchemaMetadata(const std::shared_ptr<Message>& message, const void* schema);
+
+  ~SchemaMetadata();
 
   int num_fields() const;
 
@@ -76,8 +80,8 @@ class ARROW_EXPORT SchemaMessage {
   // Parent, owns the flatbuffer data
   std::shared_ptr<Message> message_;
 
-  class SchemaMessageImpl;
-  std::unique_ptr<SchemaMessageImpl> impl_;
+  class SchemaMetadataImpl;
+  std::unique_ptr<SchemaMetadataImpl> impl_;
 };
 
 // Field metadata
@@ -93,10 +97,13 @@ struct BufferMetadata {
 };
 
 // Container for serialized record batch metadata contained in an IPC message
-class ARROW_EXPORT RecordBatchMessage {
+class ARROW_EXPORT RecordBatchMetadata {
  public:
-  // Accepts an opaque flatbuffer pointer
-  RecordBatchMessage(const std::shared_ptr<Message>& message, const void* batch_meta);
+  explicit RecordBatchMetadata(const std::shared_ptr<Message>& message);
+
+  RecordBatchMetadata(const std::shared_ptr<Buffer>& message, int64_t offset);
+
+  ~RecordBatchMetadata();
 
   FieldMetadata field(int i) const;
   BufferMetadata buffer(int i) const;
@@ -108,37 +115,34 @@ class ARROW_EXPORT RecordBatchMessage {
  private:
   // Parent, owns the flatbuffer data
   std::shared_ptr<Message> message_;
+  std::shared_ptr<Buffer> buffer_;
 
-  class RecordBatchMessageImpl;
-  std::unique_ptr<RecordBatchMessageImpl> impl_;
+  class RecordBatchMetadataImpl;
+  std::unique_ptr<RecordBatchMetadataImpl> impl_;
 };
 
-class ARROW_EXPORT DictionaryBatchMessage {
+class ARROW_EXPORT DictionaryBatchMetadata {
  public:
   int64_t id() const;
-  std::unique_ptr<RecordBatchMessage> data() const;
+  std::unique_ptr<RecordBatchMetadata> data() const;
 };
 
-class ARROW_EXPORT Message : public std::enable_shared_from_this<Message> {
+class ARROW_EXPORT Message {
  public:
   enum Type { NONE, SCHEMA, DICTIONARY_BATCH, RECORD_BATCH };
 
-  static Status Open(
-      const std::shared_ptr<Buffer>& buffer, std::shared_ptr<Message>* out);
-
-  std::shared_ptr<Message> get_shared_ptr();
+  static Status Open(const std::shared_ptr<Buffer>& buffer, int64_t offset,
+      std::shared_ptr<Message>* out);
 
   int64_t body_length() const;
 
   Type type() const;
 
-  // These methods only to be invoked if you have checked the message type
-  std::shared_ptr<SchemaMessage> GetSchema();
-  std::shared_ptr<RecordBatchMessage> GetRecordBatch();
-  std::shared_ptr<DictionaryBatchMessage> GetDictionaryBatch();
-
  private:
-  Message();
+  Message(const std::shared_ptr<Buffer>& buffer, int64_t offset);
+
+  friend class RecordBatchMetadata;
+  friend class SchemaMetadata;
 
   // Hide serialization details from user API
   class MessageImpl;

http://git-wip-us.apache.org/repos/asf/arrow/blob/e3c167bd/cpp/src/arrow/ipc/test-common.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/test-common.h b/cpp/src/arrow/ipc/test-common.h
index 9abc20d..65b3782 100644
--- a/cpp/src/arrow/ipc/test-common.h
+++ b/cpp/src/arrow/ipc/test-common.h
@@ -39,8 +39,7 @@
 namespace arrow {
 namespace ipc {
 
-const auto kInt32 = std::make_shared<Int32Type>();
-const auto kListInt32 = list(kInt32);
+const auto kListInt32 = list(int32());
 const auto kListListInt32 = list(kListInt32);
 
 Status MakeRandomInt32Array(
@@ -99,8 +98,8 @@ Status MakeIntRecordBatch(std::shared_ptr<RecordBatch>* out) {
   const int length = 1000;
 
   // Make the schema
-  auto f0 = std::make_shared<Field>("f0", kInt32);
-  auto f1 = std::make_shared<Field>("f1", kInt32);
+  auto f0 = std::make_shared<Field>("f0", int32());
+  auto f1 = std::make_shared<Field>("f1", int32());
   std::shared_ptr<Schema> schema(new Schema({f0, f1}));
 
   // Example data
@@ -161,7 +160,7 @@ Status MakeListRecordBatch(std::shared_ptr<RecordBatch>* out) {
   // Make the schema
   auto f0 = std::make_shared<Field>("f0", kListInt32);
   auto f1 = std::make_shared<Field>("f1", kListListInt32);
-  auto f2 = std::make_shared<Field>("f2", kInt32);
+  auto f2 = std::make_shared<Field>("f2", int32());
   std::shared_ptr<Schema> schema(new Schema({f0, f1, f2}));
 
   // Example data
@@ -184,7 +183,7 @@ Status MakeZeroLengthRecordBatch(std::shared_ptr<RecordBatch>* out) {
   // Make the schema
   auto f0 = std::make_shared<Field>("f0", kListInt32);
   auto f1 = std::make_shared<Field>("f1", kListListInt32);
-  auto f2 = std::make_shared<Field>("f2", kInt32);
+  auto f2 = std::make_shared<Field>("f2", int32());
   std::shared_ptr<Schema> schema(new Schema({f0, f1, f2}));
 
   // Example data
@@ -205,7 +204,7 @@ Status MakeNonNullRecordBatch(std::shared_ptr<RecordBatch>* out) {
   // Make the schema
   auto f0 = std::make_shared<Field>("f0", kListInt32);
   auto f1 = std::make_shared<Field>("f1", kListListInt32);
-  auto f2 = std::make_shared<Field>("f2", kInt32);
+  auto f2 = std::make_shared<Field>("f2", int32());
   std::shared_ptr<Schema> schema(new Schema({f0, f1, f2}));
 
   // Example data
@@ -226,7 +225,7 @@ Status MakeNonNullRecordBatch(std::shared_ptr<RecordBatch>* out) {
 
 Status MakeDeeplyNestedList(std::shared_ptr<RecordBatch>* out) {
   const int batch_length = 5;
-  TypePtr type = kInt32;
+  TypePtr type = int32();
 
   MemoryPool* pool = default_memory_pool();
   ArrayPtr array;

http://git-wip-us.apache.org/repos/asf/arrow/blob/e3c167bd/cpp/src/arrow/ipc/util.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/util.h b/cpp/src/arrow/ipc/util.h
index 9000d1b..242d662 100644
--- a/cpp/src/arrow/ipc/util.h
+++ b/cpp/src/arrow/ipc/util.h
@@ -28,12 +28,10 @@ namespace arrow {
 namespace ipc {
 
 // Align on 8-byte boundaries
-static constexpr int kArrowAlignment = 8;
-
 // Buffers are padded to 64-byte boundaries (for SIMD)
-static constexpr int kArrowBufferAlignment = 64;
+static constexpr int kArrowAlignment = 64;
 
-static constexpr uint8_t kPaddingBytes[kArrowBufferAlignment] = {0};
+static constexpr uint8_t kPaddingBytes[kArrowAlignment] = {0};
 
 static inline int64_t PaddedLength(int64_t nbytes, int64_t alignment = kArrowAlignment) {
   return ((nbytes + alignment - 1) / alignment) * alignment;

http://git-wip-us.apache.org/repos/asf/arrow/blob/e3c167bd/cpp/src/arrow/test-util.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/test-util.h b/cpp/src/arrow/test-util.h
index 93dd5b6..63c2166 100644
--- a/cpp/src/arrow/test-util.h
+++ b/cpp/src/arrow/test-util.h
@@ -61,10 +61,10 @@
 
 // Alias MSVC popcount to GCC name
 #ifdef _MSC_VER
-#  include <intrin.h>
-#  define __builtin_popcount __popcnt
-#  include <nmmintrin.h>
-#  define __builtin_popcountll _mm_popcnt_u64
+#include <intrin.h>
+#define __builtin_popcount __popcnt
+#include <nmmintrin.h>
+#define __builtin_popcountll _mm_popcnt_u64
 #endif
 
 namespace arrow {

http://git-wip-us.apache.org/repos/asf/arrow/blob/e3c167bd/cpp/src/arrow/type.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/type.cc b/cpp/src/arrow/type.cc
index 589bdad..80f295c 100644
--- a/cpp/src/arrow/type.cc
+++ b/cpp/src/arrow/type.cc
@@ -105,10 +105,6 @@ std::string UnionType::ToString() const {
   return s.str();
 }
 
-int NullType::bit_width() const {
-  return 0;
-}
-
 std::string NullType::ToString() const {
   return name();
 }
@@ -187,4 +183,46 @@ std::shared_ptr<Field> field(
   return std::make_shared<Field>(name, type, nullable, dictionary);
 }
 
+static const BufferDescr kValidityBuffer(BufferType::VALIDITY, 1);
+static const BufferDescr kOffsetBuffer(BufferType::OFFSET, 32);
+static const BufferDescr kTypeBuffer(BufferType::TYPE, 32);
+static const BufferDescr kBooleanBuffer(BufferType::DATA, 1);
+static const BufferDescr kValues64(BufferType::DATA, 64);
+static const BufferDescr kValues32(BufferType::DATA, 32);
+static const BufferDescr kValues16(BufferType::DATA, 16);
+static const BufferDescr kValues8(BufferType::DATA, 8);
+
+std::vector<BufferDescr> FixedWidthType::GetBufferLayout() const {
+  return {kValidityBuffer, BufferDescr(BufferType::DATA, bit_width())};
+}
+
+std::vector<BufferDescr> NullType::GetBufferLayout() const {
+  return {};
+}
+
+std::vector<BufferDescr> BinaryType::GetBufferLayout() const {
+  return {kValidityBuffer, kOffsetBuffer, kValues8};
+}
+
+std::vector<BufferDescr> ListType::GetBufferLayout() const {
+  return {kValidityBuffer, kOffsetBuffer};
+}
+
+std::vector<BufferDescr> StructType::GetBufferLayout() const {
+  return {kValidityBuffer, kTypeBuffer};
+}
+
+std::vector<BufferDescr> UnionType::GetBufferLayout() const {
+  if (mode == UnionMode::SPARSE) {
+    return {kValidityBuffer, kTypeBuffer};
+  } else {
+    return {kValidityBuffer, kTypeBuffer, kOffsetBuffer};
+  }
+}
+
+std::vector<BufferDescr> DecimalType::GetBufferLayout() const {
+  // TODO(wesm)
+  return {};
+}
+
 }  // namespace arrow

http://git-wip-us.apache.org/repos/asf/arrow/blob/e3c167bd/cpp/src/arrow/type.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/type.h b/cpp/src/arrow/type.h
index 876d7ea..3077738 100644
--- a/cpp/src/arrow/type.h
+++ b/cpp/src/arrow/type.h
@@ -101,6 +101,20 @@ struct Type {
   };
 };
 
+enum class BufferType : char { DATA, OFFSET, TYPE, VALIDITY };
+
+class BufferDescr {
+ public:
+  BufferDescr(BufferType type, int bit_width) : type_(type), bit_width_(bit_width) {}
+
+  BufferType type() const { return type_; }
+  int bit_width() const { return bit_width_; }
+
+ private:
+  BufferType type_;
+  int bit_width_;
+};
+
 struct ARROW_EXPORT DataType {
   Type::type type;
 
@@ -129,12 +143,18 @@ struct ARROW_EXPORT DataType {
   virtual Status Accept(TypeVisitor* visitor) const = 0;
 
   virtual std::string ToString() const = 0;
+
+  virtual std::vector<BufferDescr> GetBufferLayout() const = 0;
 };
 
 typedef std::shared_ptr<DataType> TypePtr;
 
-struct ARROW_EXPORT FixedWidthMeta {
+struct ARROW_EXPORT FixedWidthType : public DataType {
+  using DataType::DataType;
+
   virtual int bit_width() const = 0;
+
+  std::vector<BufferDescr> GetBufferLayout() const override;
 };
 
 struct ARROW_EXPORT IntegerMeta {
@@ -184,12 +204,12 @@ struct ARROW_EXPORT Field {
 };
 typedef std::shared_ptr<Field> FieldPtr;
 
-struct ARROW_EXPORT PrimitiveCType : public DataType {
-  using DataType::DataType;
+struct ARROW_EXPORT PrimitiveCType : public FixedWidthType {
+  using FixedWidthType::FixedWidthType;
 };
 
 template <typename DERIVED, Type::type TYPE_ID, typename C_TYPE>
-struct ARROW_EXPORT CTypeImpl : public PrimitiveCType, public FixedWidthMeta {
+struct ARROW_EXPORT CTypeImpl : public PrimitiveCType {
   using c_type = C_TYPE;
   static constexpr Type::type type_id = TYPE_ID;
 
@@ -204,16 +224,17 @@ struct ARROW_EXPORT CTypeImpl : public PrimitiveCType, public FixedWidthMeta {
   std::string ToString() const override { return std::string(DERIVED::name()); }
 };
 
-struct ARROW_EXPORT NullType : public DataType, public FixedWidthMeta {
+struct ARROW_EXPORT NullType : public DataType {
   static constexpr Type::type type_id = Type::NA;
 
   NullType() : DataType(Type::NA) {}
 
-  int bit_width() const override;
   Status Accept(TypeVisitor* visitor) const override;
   std::string ToString() const override;
 
   static std::string name() { return "null"; }
+
+  std::vector<BufferDescr> GetBufferLayout() const override;
 };
 
 template <typename DERIVED, Type::type TYPE_ID, typename C_TYPE>
@@ -221,10 +242,10 @@ struct IntegerTypeImpl : public CTypeImpl<DERIVED, TYPE_ID, C_TYPE>, public Inte
   bool is_signed() const override { return std::is_signed<C_TYPE>::value; }
 };
 
-struct ARROW_EXPORT BooleanType : public DataType, FixedWidthMeta {
+struct ARROW_EXPORT BooleanType : public FixedWidthType {
   static constexpr Type::type type_id = Type::BOOL;
 
-  BooleanType() : DataType(Type::BOOL) {}
+  BooleanType() : FixedWidthType(Type::BOOL) {}
 
   Status Accept(TypeVisitor* visitor) const override;
   std::string ToString() const override;
@@ -306,6 +327,8 @@ struct ARROW_EXPORT ListType : public DataType, public NoExtraMeta {
   std::string ToString() const override;
 
   static std::string name() { return "list"; }
+
+  std::vector<BufferDescr> GetBufferLayout() const override;
 };
 
 // BinaryType type is reprsents lists of 1-byte values.
@@ -318,6 +341,8 @@ struct ARROW_EXPORT BinaryType : public DataType, public NoExtraMeta {
   std::string ToString() const override;
   static std::string name() { return "binary"; }
 
+  std::vector<BufferDescr> GetBufferLayout() const override;
+
  protected:
   // Allow subclasses to change the logical type.
   explicit BinaryType(Type::type logical_type) : DataType(logical_type) {}
@@ -345,6 +370,8 @@ struct ARROW_EXPORT StructType : public DataType, public NoExtraMeta {
   Status Accept(TypeVisitor* visitor) const override;
   std::string ToString() const override;
   static std::string name() { return "struct"; }
+
+  std::vector<BufferDescr> GetBufferLayout() const override;
 };
 
 struct ARROW_EXPORT DecimalType : public DataType {
@@ -358,6 +385,8 @@ struct ARROW_EXPORT DecimalType : public DataType {
   Status Accept(TypeVisitor* visitor) const override;
   std::string ToString() const override;
   static std::string name() { return "decimal"; }
+
+  std::vector<BufferDescr> GetBufferLayout() const override;
 };
 
 enum class UnionMode : char { SPARSE, DENSE };
@@ -375,14 +404,20 @@ struct ARROW_EXPORT UnionType : public DataType {
   static std::string name() { return "union"; }
   Status Accept(TypeVisitor* visitor) const override;
 
+  std::vector<BufferDescr> GetBufferLayout() const override;
+
   UnionMode mode;
   std::vector<uint8_t> type_ids;
 };
 
-struct ARROW_EXPORT DateType : public DataType, public NoExtraMeta {
+struct ARROW_EXPORT DateType : public FixedWidthType {
   static constexpr Type::type type_id = Type::DATE;
 
-  DateType() : DataType(Type::DATE) {}
+  using c_type = int32_t;
+
+  DateType() : FixedWidthType(Type::DATE) {}
+
+  int bit_width() const override { return sizeof(c_type) * 8; }
 
   Status Accept(TypeVisitor* visitor) const override;
   std::string ToString() const override { return name(); }
@@ -391,13 +426,17 @@ struct ARROW_EXPORT DateType : public DataType, public NoExtraMeta {
 
 enum class TimeUnit : char { SECOND = 0, MILLI = 1, MICRO = 2, NANO = 3 };
 
-struct ARROW_EXPORT TimeType : public DataType {
+struct ARROW_EXPORT TimeType : public FixedWidthType {
   static constexpr Type::type type_id = Type::TIME;
   using Unit = TimeUnit;
+  using c_type = int64_t;
 
   TimeUnit unit;
 
-  explicit TimeType(TimeUnit unit = TimeUnit::MILLI) : DataType(Type::TIME), unit(unit) {}
+  int bit_width() const override { return sizeof(c_type) * 8; }
+
+  explicit TimeType(TimeUnit unit = TimeUnit::MILLI)
+      : FixedWidthType(Type::TIME), unit(unit) {}
   TimeType(const TimeType& other) : TimeType(other.unit) {}
 
   Status Accept(TypeVisitor* visitor) const override;
@@ -405,7 +444,7 @@ struct ARROW_EXPORT TimeType : public DataType {
   static std::string name() { return "time"; }
 };
 
-struct ARROW_EXPORT TimestampType : public DataType, public FixedWidthMeta {
+struct ARROW_EXPORT TimestampType : public FixedWidthType {
   using Unit = TimeUnit;
 
   typedef int64_t c_type;
@@ -416,7 +455,7 @@ struct ARROW_EXPORT TimestampType : public DataType, public FixedWidthMeta {
   TimeUnit unit;
 
   explicit TimestampType(TimeUnit unit = TimeUnit::MILLI)
-      : DataType(Type::TIMESTAMP), unit(unit) {}
+      : FixedWidthType(Type::TIMESTAMP), unit(unit) {}
 
   TimestampType(const TimestampType& other) : TimestampType(other.unit) {}
 
@@ -425,10 +464,10 @@ struct ARROW_EXPORT TimestampType : public DataType, public FixedWidthMeta {
   static std::string name() { return "timestamp"; }
 };
 
-struct ARROW_EXPORT IntervalType : public DataType, public FixedWidthMeta {
+struct ARROW_EXPORT IntervalType : public FixedWidthType {
   enum class Unit : char { YEAR_MONTH = 0, DAY_TIME = 1 };
 
-  typedef int64_t c_type;
+  using c_type = int64_t;
   static constexpr Type::type type_id = Type::INTERVAL;
 
   int bit_width() const override { return sizeof(int64_t) * 8; }
@@ -436,7 +475,7 @@ struct ARROW_EXPORT IntervalType : public DataType, public FixedWidthMeta {
   Unit unit;
 
   explicit IntervalType(Unit unit = Unit::YEAR_MONTH)
-      : DataType(Type::INTERVAL), unit(unit) {}
+      : FixedWidthType(Type::INTERVAL), unit(unit) {}
 
   IntervalType(const IntervalType& other) : IntervalType(other.unit) {}
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/e3c167bd/cpp/src/arrow/types/primitive.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/types/primitive.cc b/cpp/src/arrow/types/primitive.cc
index 14667ee..f42a3ca 100644
--- a/cpp/src/arrow/types/primitive.cc
+++ b/cpp/src/arrow/types/primitive.cc
@@ -49,7 +49,7 @@ bool PrimitiveArray::EqualsExact(const PrimitiveArray& other) const {
     const uint8_t* this_data = raw_data_;
     const uint8_t* other_data = other.raw_data_;
 
-    auto size_meta = dynamic_cast<const FixedWidthMeta*>(type_.get());
+    auto size_meta = dynamic_cast<const FixedWidthType*>(type_.get());
     int value_byte_size = size_meta->bit_width() / 8;
     DCHECK_GT(value_byte_size, 0);
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/e3c167bd/cpp/src/arrow/util/bit-util.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/util/bit-util.h b/cpp/src/arrow/util/bit-util.h
index 13b7e19..5c8055f 100644
--- a/cpp/src/arrow/util/bit-util.h
+++ b/cpp/src/arrow/util/bit-util.h
@@ -78,6 +78,10 @@ static inline bool IsMultipleOf64(int64_t n) {
   return (n & 63) == 0;
 }
 
+static inline bool IsMultipleOf8(int64_t n) {
+  return (n & 7) == 0;
+}
+
 inline int64_t RoundUpToMultipleOf64(int64_t num) {
   // TODO(wesm): is this definitely needed?
   // DCHECK_GE(num, 0);

http://git-wip-us.apache.org/repos/asf/arrow/blob/e3c167bd/dev/release/run-rat.sh
----------------------------------------------------------------------
diff --git a/dev/release/run-rat.sh b/dev/release/run-rat.sh
index d8ec650..e26dd58 100755
--- a/dev/release/run-rat.sh
+++ b/dev/release/run-rat.sh
@@ -28,6 +28,7 @@ $RAT $1 \
   -e ".*" \
   -e mman.h \
   -e "*_generated.h" \
+  -e "*.json" \
   -e random.h \
   -e status.cc \
   -e status.h \
@@ -49,5 +50,3 @@ else
   echo "${UNAPPROVED} unapproved licences. Check rat report: rat.txt"
   exit 1
 fi
-
-