You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2016/11/29 02:29:27 UTC

[1/2] arrow git commit: ARROW-363: [Java/C++] integration testing harness, initial integration tests

Repository: arrow
Updated Branches:
  refs/heads/master 86f56a607 -> e3c167bd1


http://git-wip-us.apache.org/repos/asf/arrow/blob/e3c167bd/format/IPC.md
----------------------------------------------------------------------
diff --git a/format/IPC.md b/format/IPC.md
index 3f78126..a55dcdf 100644
--- a/format/IPC.md
+++ b/format/IPC.md
@@ -15,3 +15,109 @@
 # Interprocess messaging / communication (IPC)
 
 ## File format
+
+We define a self-contained "file format" containing an Arrow schema along with
+one or more record batches defining a dataset. See [format/File.fbs][1] for the
+precise details of the file metadata.
+
+In general, the file looks like:
+
+```
+<magic number "ARROW1">
+<empty padding bytes [to 64 byte boundary]>
+<DICTIONARY 0>
+...
+<DICTIONARY k - 1>
+<RECORD BATCH 0>
+...
+<RECORD BATCH n - 1>
+<METADATA org.apache.arrow.flatbuf.Footer>
+<metadata_size: int32>
+<magic number "ARROW1">
+```
+
+See the File.fbs document for details about the Flatbuffers metadata. The
+record batches have a particular structure, defined next.
+
+### Record batches
+
+The record batch metadata is written as a flatbuffer (see
+[format/Message.fbs][2] -- the RecordBatch message type) prefixed by its size,
+followed by each of the memory buffers in the batch written end to end (with
+appropriate alignment and padding):
+
+```
+<int32: metadata flatbuffer size>
+<metadata: org.apache.arrow.flatbuf.RecordBatch>
+<padding bytes [to 64-byte boundary]>
+<body: buffers end to end>
+```
+
+The `RecordBatch` metadata contains a depth-first (pre-order) flattened set of
+field metadata and physical memory buffers (some comments from [Message.fbs][2]
+have been shortened / removed):
+
+```
+table RecordBatch {
+  length: int;
+  nodes: [FieldNode];
+  buffers: [Buffer];
+}
+
+struct FieldNode {
+  /// The number of value slots in the Arrow array at this level of a nested
+  /// tree
+  length: int;
+
+  /// The number of observed nulls. Fields with null_count == 0 may choose not
+  /// to write their physical validity bitmap out as a materialized buffer,
+  /// instead setting the length of the bitmap buffer to 0.
+  null_count: int;
+}
+
+struct Buffer {
+  /// The shared memory page id where this buffer is located. Currently this is
+  /// not used
+  page: int;
+
+  /// The relative offset into the shared memory page where the bytes for this
+  /// buffer starts
+  offset: long;
+
+  /// The absolute length (in bytes) of the memory buffer. The memory is found
+  /// from offset (inclusive) to offset + length (non-inclusive).
+  length: long;
+}
+```
+
+In the context of a file, the `page` is not used, and the `Buffer` offsets use
+as a frame of reference the start of the segment where they are written in the
+file. So, while in a general IPC setting these offsets may be anyplace in one
+or more shared memory regions, in the file format the offsets start from 0.
+
+The location of a record batch, the size of its metadata block, and the size
+of its body of buffers are stored in the file footer:
+
+```
+struct Block {
+  offset: long;
+  metaDataLength: int;
+  bodyLength: long;
+}
+```
+
+Some notes about this:
+
+* The `Block` offset indicates the starting byte of the record batch.
+* The metadata length includes the int32 size prefix, the record batch
+  metadata flatbuffer, and any padding bytes.
+
+
+### Dictionary batches
+
+Dictionary batches have not yet been implemented, although they are provided
+for in the metadata. For the time being, the `DICTIONARY` segments shown above
+in the file do not appear in any of the file implementations.
+
+[1]: https://github.com/apache/arrow/blob/master/format/File.fbs
+[2]: https://github.com/apache/arrow/blob/master/format/Message.fbs
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/arrow/blob/e3c167bd/format/Message.fbs
----------------------------------------------------------------------
diff --git a/format/Message.fbs b/format/Message.fbs
index 2ec9fd1..d07d066 100644
--- a/format/Message.fbs
+++ b/format/Message.fbs
@@ -18,7 +18,8 @@
 namespace org.apache.arrow.flatbuf;
 
 enum MetadataVersion:short {
-  V1_SNAPSHOT
+  V1,
+  V2
 }
 
 /// ----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/arrow/blob/e3c167bd/integration/data/simple.json
----------------------------------------------------------------------
diff --git a/integration/data/simple.json b/integration/data/simple.json
new file mode 100644
index 0000000..a91b405
--- /dev/null
+++ b/integration/data/simple.json
@@ -0,0 +1,66 @@
+{
+  "schema": {
+    "fields": [
+      {
+        "name": "foo",
+        "type": {"name": "int", "isSigned": true, "bitWidth": 32},
+        "nullable": true, "children": [],
+        "typeLayout": {
+          "vectors": [
+            {"type": "VALIDITY", "typeBitWidth": 1},
+            {"type": "DATA", "typeBitWidth": 32}
+          ]
+        }
+      },
+      {
+        "name": "bar",
+        "type": {"name": "floatingpoint", "precision": "DOUBLE"},
+        "nullable": true, "children": [],
+        "typeLayout": {
+          "vectors": [
+            {"type": "VALIDITY", "typeBitWidth": 1},
+            {"type": "DATA", "typeBitWidth": 64}
+          ]
+        }
+      },
+      {
+        "name": "baz",
+        "type": {"name": "utf8"},
+        "nullable": true, "children": [],
+        "typeLayout": {
+          "vectors": [
+            {"type": "VALIDITY", "typeBitWidth": 1},
+            {"type": "OFFSET", "typeBitWidth": 32},
+            {"type": "DATA", "typeBitWidth": 8}
+          ]
+        }
+      }
+    ]
+  },
+  "batches": [
+    {
+      "count": 5,
+      "columns": [
+        {
+          "name": "foo",
+          "count": 5,
+          "VALIDITY": [1, 0, 1, 1, 1],
+          "DATA": [1, 2, 3, 4, 5]
+        },
+        {
+          "name": "bar",
+          "count": 5,
+          "VALIDITY": [1, 0, 0, 1, 1],
+          "DATA": [1.0, 2.0, 3.0, 4.0, 5.0]
+        },
+        {
+          "name": "baz",
+          "count": 5,
+          "VALIDITY": [1, 0, 0, 1, 1],
+          "OFFSET": [0, 2, 2, 2, 5, 9],
+          "DATA": ["aa", "", "", "bbb", "cccc"]
+        }
+      ]
+    }
+  ]
+}

http://git-wip-us.apache.org/repos/asf/arrow/blob/e3c167bd/integration/integration_test.py
----------------------------------------------------------------------
diff --git a/integration/integration_test.py b/integration/integration_test.py
new file mode 100644
index 0000000..6ea634d
--- /dev/null
+++ b/integration/integration_test.py
@@ -0,0 +1,177 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import argparse
+import glob
+import itertools
+import os
+import six
+import subprocess
+import tempfile
+import uuid
+
+
+ARROW_HOME = os.path.abspath(__file__).rsplit("/", 2)[0]
+
+
+def guid():
+    return uuid.uuid4().hex
+
+
+def run_cmd(cmd):
+    if isinstance(cmd, six.string_types):
+        cmd = cmd.split(' ')
+
+    try:
+        output = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
+    except subprocess.CalledProcessError as e:
+        # this avoids hiding the stdout / stderr of failed processes
+        print('Command failed: %s' % ' '.join(cmd))
+        print('With output:')
+        print('--------------')
+        print(e.output)
+        print('--------------')
+        raise e
+
+    if isinstance(output, six.binary_type):
+        output = output.decode('utf-8')
+    return output
+
+
+class IntegrationRunner(object):
+
+    def __init__(self, json_files, testers, debug=False):
+        self.json_files = json_files
+        self.testers = testers
+        self.temp_dir = tempfile.mkdtemp()
+        self.debug = debug
+
+    def run(self):
+        for producer, consumer in itertools.product(self.testers,
+                                                    self.testers):
+            if producer is consumer:
+                continue
+
+            print('-- {0} producing, {1} consuming'.format(producer.name,
+                                                           consumer.name))
+
+            for json_path in self.json_files:
+                print('Testing with {0}'.format(json_path))
+
+                arrow_path = os.path.join(self.temp_dir, guid())
+
+                producer.json_to_arrow(json_path, arrow_path)
+                consumer.validate(json_path, arrow_path)
+
+
+class Tester(object):
+
+    def __init__(self, debug=False):
+        self.debug = debug
+
+    def json_to_arrow(self, json_path, arrow_path):
+        raise NotImplementedError
+
+    def validate(self, json_path, arrow_path):
+        raise NotImplementedError
+
+
+class JavaTester(Tester):
+
+    ARROW_TOOLS_JAR = os.path.join(ARROW_HOME,
+                                   'java/tools/target/arrow-tools-0.1.1-'
+                                   'SNAPSHOT-jar-with-dependencies.jar')
+
+    name = 'Java'
+
+    def _run(self, arrow_path=None, json_path=None, command='VALIDATE'):
+        cmd = ['java', '-cp', self.ARROW_TOOLS_JAR,
+               'org.apache.arrow.tools.Integration']
+
+        if arrow_path is not None:
+            cmd.extend(['-a', arrow_path])
+
+        if json_path is not None:
+            cmd.extend(['-j', json_path])
+
+        cmd.extend(['-c', command])
+
+        if self.debug:
+            print(' '.join(cmd))
+
+        return run_cmd(cmd)
+
+    def validate(self, json_path, arrow_path):
+        return self._run(arrow_path, json_path, 'VALIDATE')
+
+    def json_to_arrow(self, json_path, arrow_path):
+        return self._run(arrow_path, json_path, 'JSON_TO_ARROW')
+
+
+class CPPTester(Tester):
+
+    CPP_INTEGRATION_EXE = os.environ.get(
+        'ARROW_CPP_TESTER',
+        os.path.join(ARROW_HOME,
+                     'cpp/test-build/debug/json-integration-test'))
+
+    name = 'C++'
+
+    def _run(self, arrow_path=None, json_path=None, command='VALIDATE'):
+        cmd = [self.CPP_INTEGRATION_EXE, '--integration']
+
+        if arrow_path is not None:
+            cmd.append('--arrow=' + arrow_path)
+
+        if json_path is not None:
+            cmd.append('--json=' + json_path)
+
+        cmd.append('--mode=' + command)
+
+        if self.debug:
+            print(' '.join(cmd))
+
+        return run_cmd(cmd)
+
+    def validate(self, json_path, arrow_path):
+        return self._run(arrow_path, json_path, 'VALIDATE')
+
+    def json_to_arrow(self, json_path, arrow_path):
+        return self._run(arrow_path, json_path, 'JSON_TO_ARROW')
+
+
+def get_json_files():
+    glob_pattern = os.path.join(ARROW_HOME, 'integration', 'data', '*.json')
+    return glob.glob(glob_pattern)
+
+
+def run_all_tests(debug=False):
+    testers = [JavaTester(debug=debug), CPPTester(debug=debug)]
+    json_files = get_json_files()
+
+    runner = IntegrationRunner(json_files, testers, debug=debug)
+    runner.run()
+
+
+if __name__ == '__main__':
+    parser = argparse.ArgumentParser(description='Arrow integration test CLI')
+    parser.add_argument('--debug', dest='debug', action='store_true',
+                        default=False,
+                        help='Run executables in debug mode as relevant')
+
+    args = parser.parse_args()
+    run_all_tests(debug=args.debug)

http://git-wip-us.apache.org/repos/asf/arrow/blob/e3c167bd/java/pom.xml
----------------------------------------------------------------------
diff --git a/java/pom.xml b/java/pom.xml
index 7221a14..a147d66 100644
--- a/java/pom.xml
+++ b/java/pom.xml
@@ -24,7 +24,7 @@
   <packaging>pom</packaging>
 
   <name>Apache Arrow Java Root POM</name>
-  <description>Apache arrow is an open source, low latency SQL query engine for Hadoop and NoSQL.</description>
+  <description>Apache Arrow is open source, in-memory columnar data structures and low-overhead messaging</description>
   <url>http://arrow.apache.org/</url>
 
   <properties>
@@ -442,8 +442,8 @@
       <scope>test</scope>
     </dependency>
     <dependency>
-      <!-- Mockito needs to be on the class path after JUnit (or Hamcrest) as 
-           long as Mockito _contains_ older Hamcrest classes.  See arrow-2130. --> 
+      <!-- Mockito needs to be on the class path after JUnit (or Hamcrest) as
+           long as Mockito _contains_ older Hamcrest classes.  See arrow-2130. -->
       <groupId>org.mockito</groupId>
       <artifactId>mockito-core</artifactId>
       <version>1.9.5</version>

http://git-wip-us.apache.org/repos/asf/arrow/blob/e3c167bd/java/tools/pom.xml
----------------------------------------------------------------------
diff --git a/java/tools/pom.xml b/java/tools/pom.xml
index 84b0b5e..ef96328 100644
--- a/java/tools/pom.xml
+++ b/java/tools/pom.xml
@@ -45,6 +45,12 @@
             <artifactId>commons-cli</artifactId>
             <version>1.2</version>
         </dependency>
+        <dependency>
+          <groupId>ch.qos.logback</groupId>
+          <artifactId>logback-classic</artifactId>
+          <version>1.0.13</version>
+          <scope>runtime</scope>
+        </dependency>
     </dependencies>
 
     <build>

http://git-wip-us.apache.org/repos/asf/arrow/blob/e3c167bd/java/tools/src/main/java/org/apache/arrow/tools/Integration.java
----------------------------------------------------------------------
diff --git a/java/tools/src/main/java/org/apache/arrow/tools/Integration.java b/java/tools/src/main/java/org/apache/arrow/tools/Integration.java
index 29f0ee2..fa4bedc 100644
--- a/java/tools/src/main/java/org/apache/arrow/tools/Integration.java
+++ b/java/tools/src/main/java/org/apache/arrow/tools/Integration.java
@@ -220,6 +220,7 @@ public class Integration {
 
   private static void fatalError(String message, Throwable e) {
     System.err.println(message);
+    System.err.println(e.getMessage());
     LOGGER.error(message, e);
     System.exit(1);
   }

http://git-wip-us.apache.org/repos/asf/arrow/blob/e3c167bd/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java b/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java
index 4afd823..c5d642e 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java
@@ -81,7 +81,9 @@ public class VectorLoader {
     try {
       vector.loadFieldBuffers(fieldNode, ownBuffers);
     } catch (RuntimeException e) {
-      throw new IllegalArgumentException("Could not load buffers for field " + field, e);
+      e.printStackTrace();
+      throw new IllegalArgumentException("Could not load buffers for field " +
+              field + " error message" + e.getMessage(), e);
     }
     List<Field> children = field.getChildren();
     if (children.size() > 0) {

http://git-wip-us.apache.org/repos/asf/arrow/blob/e3c167bd/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowReader.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowReader.java b/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowReader.java
index bbcd3e9..cd520da 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowReader.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowReader.java
@@ -123,7 +123,11 @@ public class ArrowReader implements AutoCloseable {
     if (n != l) {
       throw new IllegalStateException(n + " != " + l);
     }
-    RecordBatch recordBatchFB = RecordBatch.getRootAsRecordBatch(buffer.nioBuffer().asReadOnlyBuffer());
+
+    // Record batch flatbuffer is prefixed by its size as int32le
+    final ArrowBuf metadata = buffer.slice(4, recordBatchBlock.getMetadataLength() - 4);
+    RecordBatch recordBatchFB = RecordBatch.getRootAsRecordBatch(metadata.nioBuffer().asReadOnlyBuffer());
+
     int nodesLength = recordBatchFB.nodesLength();
     final ArrowBuf body = buffer.slice(recordBatchBlock.getMetadataLength(), (int)recordBatchBlock.getBodyLength());
     List<ArrowFieldNode> nodes = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/arrow/blob/e3c167bd/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowWriter.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowWriter.java b/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowWriter.java
index 9881a22..1cd87eb 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowWriter.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowWriter.java
@@ -99,9 +99,10 @@ public class ArrowWriter implements AutoCloseable {
   public void writeRecordBatch(ArrowRecordBatch recordBatch) throws IOException {
     checkStarted();
     align();
-    // write metadata header
+
+    // write metadata header with int32 size prefix
     long offset = currentPosition;
-    write(recordBatch);
+    write(recordBatch, true);
     align();
     // write body
     long bodyOffset = currentPosition;
@@ -117,6 +118,7 @@ public class ArrowWriter implements AutoCloseable {
       if (startPosition != currentPosition) {
         writeZeros((int)(startPosition - currentPosition));
       }
+
       write(buffer);
       if (currentPosition != startPosition + layout.getSize()) {
         throw new IllegalStateException("wrong buffer size: " + currentPosition + " != " + startPosition + layout.getSize());
@@ -133,7 +135,9 @@ public class ArrowWriter implements AutoCloseable {
   }
 
   private void write(ArrowBuf buffer) throws IOException {
-    write(buffer.nioBuffer(buffer.readerIndex(), buffer.readableBytes()));
+    ByteBuffer nioBuffer = buffer.nioBuffer(buffer.readerIndex(), buffer.readableBytes());
+    LOGGER.debug("Writing buffer with size: " + nioBuffer.remaining());
+    write(nioBuffer);
   }
 
   private void checkStarted() throws IOException {
@@ -166,14 +170,21 @@ public class ArrowWriter implements AutoCloseable {
 
   private void writeFooter() throws IOException {
     // TODO: dictionaries
-    write(new ArrowFooter(schema, Collections.<ArrowBlock>emptyList(), recordBatches));
+    write(new ArrowFooter(schema, Collections.<ArrowBlock>emptyList(), recordBatches), false);
   }
 
-  private long write(FBSerializable writer) throws IOException {
+  private long write(FBSerializable writer, boolean withSizePrefix) throws IOException {
     FlatBufferBuilder builder = new FlatBufferBuilder();
     int root = writer.writeTo(builder);
     builder.finish(root);
-    return write(builder.dataBuffer());
+
+    ByteBuffer buffer = builder.dataBuffer();
+
+    if (withSizePrefix) {
+      writeIntLittleEndian(buffer.remaining());
+    }
+
+    return write(buffer);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/arrow/blob/e3c167bd/java/vector/src/main/java/org/apache/arrow/vector/file/json/JsonFileReader.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/file/json/JsonFileReader.java b/java/vector/src/main/java/org/apache/arrow/vector/file/json/JsonFileReader.java
index f07b517..f205982 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/file/json/JsonFileReader.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/file/json/JsonFileReader.java
@@ -127,8 +127,13 @@ public class JsonFileReader implements AutoCloseable {
         ValueVector valueVector = (ValueVector)innerVector;
         valueVector.allocateNew();
         Mutator mutator = valueVector.getMutator();
-        mutator.setValueCount(count);
-        for (int i = 0; i < count; i++) {
+
+        int innerVectorCount = count;
+        if (vectorType.getName() == "OFFSET") {
+          innerVectorCount++;
+        }
+        mutator.setValueCount(innerVectorCount);
+        for (int i = 0; i < innerVectorCount; i++) {
           parser.nextToken();
           setValueFromParser(valueVector, i);
         }

http://git-wip-us.apache.org/repos/asf/arrow/blob/e3c167bd/java/vector/src/main/java/org/apache/arrow/vector/file/json/JsonFileWriter.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/file/json/JsonFileWriter.java b/java/vector/src/main/java/org/apache/arrow/vector/file/json/JsonFileWriter.java
index 812b3da..6ff3577 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/file/json/JsonFileWriter.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/file/json/JsonFileWriter.java
@@ -114,7 +114,7 @@ public class JsonFileWriter implements AutoCloseable {
         BufferBacked innerVector = fieldInnerVectors.get(v);
         generator.writeArrayFieldStart(vectorType.getName());
         ValueVector valueVector = (ValueVector)innerVector;
-        for (int i = 0; i < valueCount; i++) {
+        for (int i = 0; i < valueVector.getAccessor().getValueCount(); i++) {
           writeValueToGenerator(valueVector, i);
         }
         generator.writeEndArray();

http://git-wip-us.apache.org/repos/asf/arrow/blob/e3c167bd/python/.gitignore
----------------------------------------------------------------------
diff --git a/python/.gitignore b/python/.gitignore
index 07f2835..c37efc4 100644
--- a/python/.gitignore
+++ b/python/.gitignore
@@ -12,16 +12,6 @@ Testing/
 # Editor temporary/working/backup files
 *flymake*
 
-# Compiled source
-*.a
-*.dll
-*.o
-*.py[ocd]
-*.so
-*.dylib
-.build_cache_dir
-MANIFEST
-
 # Generated sources
 *.c
 *.cpp


[2/2] arrow git commit: ARROW-363: [Java/C++] integration testing harness, initial integration tests

Posted by we...@apache.org.
ARROW-363: [Java/C++] integration testing harness, initial integration tests

This also includes format reconciliation as discussed in ARROW-384.

Author: Wes McKinney <we...@twosigma.com>

Closes #211 from wesm/ARROW-363 and squashes the following commits:

6982c3c [Wes McKinney] Permit end of buffer IPC reads if length is 0
4d46c8b [Wes McKinney] Fix logical error with offsets array in JsonFileWriter. Add broken string test case to simple.json
36ab5d6 [Wes McKinney] Increment MetadataVersion in flatbuffer
844257e [Wes McKinney] cpplint
a2711f2 [Wes McKinney] Address other format incompatibilities, write vectorLayout to Arrow metadata
13608ef [Wes McKinney] Relax 64 byte padding. Do not write RecordBatch embedded in Message for now
6a66fc8 [Wes McKinney] Write record batch size prefix in Java
72ea42c [Wes McKinney] Note that padding is 64-bytes at start of file (for now)
c2ffde4 [Wes McKinney] More notes about the file format
aef4382 [Wes McKinney] cpplint
85128f7 [Wes McKinney] Refactor IPC/File record batch read/write structure to reflect discussion in ARROW-384
dbd6ed6 [Wes McKinney] Do not embed metadata length in WriteDataHeader
c529d63 [Wes McKinney] Fix JSON integration test example to make it further
d806aa6 [Wes McKinney] Exclude JSON files from Apache RAT checks
a7e2d4b [Wes McKinney] Draft testing harness


Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/e3c167bd
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/e3c167bd
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/e3c167bd

Branch: refs/heads/master
Commit: e3c167bd101734f92c3a2be2eb7f56f1fba91e67
Parents: 86f56a6
Author: Wes McKinney <we...@twosigma.com>
Authored: Mon Nov 28 21:29:19 2016 -0500
Committer: Wes McKinney <we...@twosigma.com>
Committed: Mon Nov 28 21:29:19 2016 -0500

----------------------------------------------------------------------
 .gitignore                                      |  26 ++
 cpp/CMakeLists.txt                              |   1 -
 cpp/src/arrow/io/io-file-test.cc                |   2 +-
 cpp/src/arrow/io/memory.cc                      |  25 +-
 cpp/src/arrow/io/memory.h                       |   8 +-
 cpp/src/arrow/ipc/adapter.cc                    | 251 ++++++++++---------
 cpp/src/arrow/ipc/adapter.h                     |  65 ++---
 cpp/src/arrow/ipc/file.cc                       |  31 ++-
 cpp/src/arrow/ipc/ipc-adapter-test.cc           |  85 ++++---
 cpp/src/arrow/ipc/ipc-file-test.cc              |   2 +-
 cpp/src/arrow/ipc/ipc-json-test.cc              |  20 +-
 cpp/src/arrow/ipc/ipc-metadata-test.cc          |  12 +-
 cpp/src/arrow/ipc/json-integration-test.cc      |  30 ++-
 cpp/src/arrow/ipc/json-internal.cc              | 110 +++-----
 cpp/src/arrow/ipc/metadata-internal.cc          | 100 +++++---
 cpp/src/arrow/ipc/metadata-internal.h           |   6 +-
 cpp/src/arrow/ipc/metadata.cc                   | 115 +++++----
 cpp/src/arrow/ipc/metadata.h                    |  50 ++--
 cpp/src/arrow/ipc/test-common.h                 |  15 +-
 cpp/src/arrow/ipc/util.h                        |   6 +-
 cpp/src/arrow/test-util.h                       |   8 +-
 cpp/src/arrow/type.cc                           |  46 +++-
 cpp/src/arrow/type.h                            |  73 ++++--
 cpp/src/arrow/types/primitive.cc                |   2 +-
 cpp/src/arrow/util/bit-util.h                   |   4 +
 dev/release/run-rat.sh                          |   3 +-
 format/IPC.md                                   | 106 ++++++++
 format/Message.fbs                              |   3 +-
 integration/data/simple.json                    |  66 +++++
 integration/integration_test.py                 | 177 +++++++++++++
 java/pom.xml                                    |   6 +-
 java/tools/pom.xml                              |   6 +
 .../org/apache/arrow/tools/Integration.java     |   1 +
 .../org/apache/arrow/vector/VectorLoader.java   |   4 +-
 .../apache/arrow/vector/file/ArrowReader.java   |   6 +-
 .../apache/arrow/vector/file/ArrowWriter.java   |  23 +-
 .../arrow/vector/file/json/JsonFileReader.java  |   9 +-
 .../arrow/vector/file/json/JsonFileWriter.java  |   2 +-
 python/.gitignore                               |  10 -
 39 files changed, 1024 insertions(+), 491 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/arrow/blob/e3c167bd/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..a00cbba
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,26 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Compiled source
+*.a
+*.dll
+*.o
+*.py[ocd]
+*.so
+*.dylib
+.build_cache_dir
+MANIFEST

http://git-wip-us.apache.org/repos/asf/arrow/blob/e3c167bd/cpp/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt
index 0edb8ce..1a97008 100644
--- a/cpp/CMakeLists.txt
+++ b/cpp/CMakeLists.txt
@@ -528,7 +528,6 @@ if(ARROW_BUILD_TESTS)
     ExternalProject_Add(gflags_ep
       GIT_REPOSITORY https://github.com/gflags/gflags.git
       GIT_TAG cce68f0c9c5d054017425e6e6fd54f696d36e8ee
-      # URL "https://github.com/gflags/gflags/archive/v${GFLAGS_VERSION}.tar.gz"
       BUILD_IN_SOURCE 1
       CMAKE_ARGS -DCMAKE_BUILD_TYPE=${CMAKE_BUILD_TYPE}
                  -DCMAKE_INSTALL_PREFIX=${GFLAGS_PREFIX}

http://git-wip-us.apache.org/repos/asf/arrow/blob/e3c167bd/cpp/src/arrow/io/io-file-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/io-file-test.cc b/cpp/src/arrow/io/io-file-test.cc
index 54c21d2..fad49ce 100644
--- a/cpp/src/arrow/io/io-file-test.cc
+++ b/cpp/src/arrow/io/io-file-test.cc
@@ -19,7 +19,7 @@
 #include <cstdio>
 #include <cstring>
 #ifndef _MSC_VER
-# include <fcntl.h>
+#include <fcntl.h>
 #endif
 #include <fstream>
 #include <memory>

http://git-wip-us.apache.org/repos/asf/arrow/blob/e3c167bd/cpp/src/arrow/io/memory.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/memory.cc b/cpp/src/arrow/io/memory.cc
index 71b0f1e..af495e2 100644
--- a/cpp/src/arrow/io/memory.cc
+++ b/cpp/src/arrow/io/memory.cc
@@ -258,8 +258,11 @@ Status BufferOutputStream::Reserve(int64_t nbytes) {
 // ----------------------------------------------------------------------
 // In-memory buffer reader
 
-BufferReader::BufferReader(const uint8_t* buffer, int buffer_size)
-    : buffer_(buffer), buffer_size_(buffer_size), position_(0) {}
+BufferReader::BufferReader(const std::shared_ptr<Buffer>& buffer)
+    : buffer_(buffer), data_(buffer->data()), size_(buffer->size()), position_(0) {}
+
+BufferReader::BufferReader(const uint8_t* data, int64_t size)
+    : buffer_(nullptr), data_(data), size_(size), position_(0) {}
 
 BufferReader::~BufferReader() {}
 
@@ -278,26 +281,32 @@ bool BufferReader::supports_zero_copy() const {
 }
 
 Status BufferReader::Read(int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) {
-  memcpy(buffer, buffer_ + position_, nbytes);
-  *bytes_read = std::min(nbytes, buffer_size_ - position_);
+  memcpy(buffer, data_ + position_, nbytes);
+  *bytes_read = std::min(nbytes, size_ - position_);
   position_ += *bytes_read;
   return Status::OK();
 }
 
 Status BufferReader::Read(int64_t nbytes, std::shared_ptr<Buffer>* out) {
-  int64_t size = std::min(nbytes, buffer_size_ - position_);
-  *out = std::make_shared<Buffer>(buffer_ + position_, size);
+  int64_t size = std::min(nbytes, size_ - position_);
+
+  if (buffer_ != nullptr) {
+    *out = SliceBuffer(buffer_, position_, size);
+  } else {
+    *out = std::make_shared<Buffer>(data_ + position_, size);
+  }
+
   position_ += nbytes;
   return Status::OK();
 }
 
 Status BufferReader::GetSize(int64_t* size) {
-  *size = buffer_size_;
+  *size = size_;
   return Status::OK();
 }
 
 Status BufferReader::Seek(int64_t position) {
-  if (position < 0 || position >= buffer_size_) {
+  if (position < 0 || position >= size_) {
     return Status::IOError("position out of bounds");
   }
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/e3c167bd/cpp/src/arrow/io/memory.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/memory.h b/cpp/src/arrow/io/memory.h
index df2fe8d..b72f93b 100644
--- a/cpp/src/arrow/io/memory.h
+++ b/cpp/src/arrow/io/memory.h
@@ -99,7 +99,8 @@ class ARROW_EXPORT MemoryMappedFile : public ReadWriteFileInterface {
 
 class ARROW_EXPORT BufferReader : public ReadableFileInterface {
  public:
-  BufferReader(const uint8_t* buffer, int buffer_size);
+  explicit BufferReader(const std::shared_ptr<Buffer>& buffer);
+  BufferReader(const uint8_t* data, int64_t size);
   ~BufferReader();
 
   Status Close() override;
@@ -116,8 +117,9 @@ class ARROW_EXPORT BufferReader : public ReadableFileInterface {
   bool supports_zero_copy() const override;
 
  private:
-  const uint8_t* buffer_;
-  int buffer_size_;
+  std::shared_ptr<Buffer> buffer_;
+  const uint8_t* data_;
+  int64_t size_;
   int64_t position_;
 };
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/e3c167bd/cpp/src/arrow/ipc/adapter.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/adapter.cc b/cpp/src/arrow/ipc/adapter.cc
index da718c0..edf716f 100644
--- a/cpp/src/arrow/ipc/adapter.cc
+++ b/cpp/src/arrow/ipc/adapter.cc
@@ -48,15 +48,6 @@ namespace flatbuf = org::apache::arrow::flatbuf;
 
 namespace ipc {
 
-namespace {
-Status CheckMultipleOf64(int64_t size) {
-  if (BitUtil::IsMultipleOf64(size)) { return Status::OK(); }
-  return Status::Invalid(
-      "Attempted to write a buffer that "
-      "wasn't a multiple of 64 bytes");
-}
-}
-
 static bool IsPrimitive(const DataType* type) {
   DCHECK(type != nullptr);
   switch (type->type) {
@@ -124,30 +115,30 @@ Status VisitArray(const Array* arr, std::vector<flatbuf::FieldNode>* field_nodes
 class RecordBatchWriter {
  public:
   RecordBatchWriter(const std::vector<std::shared_ptr<Array>>& columns, int32_t num_rows,
-      int max_recursion_depth)
+      int64_t buffer_start_offset, int max_recursion_depth)
       : columns_(&columns),
         num_rows_(num_rows),
+        buffer_start_offset_(buffer_start_offset),
         max_recursion_depth_(max_recursion_depth) {}
 
-  Status AssemblePayload() {
+  Status AssemblePayload(int64_t* body_length) {
+    if (field_nodes_.size() > 0) {
+      field_nodes_.clear();
+      buffer_meta_.clear();
+      buffers_.clear();
+    }
+
     // Perform depth-first traversal of the row-batch
     for (size_t i = 0; i < columns_->size(); ++i) {
       const Array* arr = (*columns_)[i].get();
       RETURN_NOT_OK(VisitArray(arr, &field_nodes_, &buffers_, max_recursion_depth_));
     }
-    return Status::OK();
-  }
 
-  Status Write(
-      io::OutputStream* dst, int64_t* body_end_offset, int64_t* header_end_offset) {
-    // Get the starting position
-    int64_t start_position;
-    RETURN_NOT_OK(dst->Tell(&start_position));
-
-    // Keep track of the current position so we can determine the size of the
-    // message body
-    int64_t position = start_position;
+    // The position for the start of a buffer relative to the passed frame of
+    // reference. May be 0 or some other position in an address space
+    int64_t offset = buffer_start_offset_;
 
+    // Construct the buffer metadata for the record batch header
     for (size_t i = 0; i < buffers_.size(); ++i) {
       const Buffer* buffer = buffers_[i].get();
       int64_t size = 0;
@@ -161,65 +152,103 @@ class RecordBatchWriter {
 
       // TODO(wesm): We currently have no notion of shared memory page id's,
       // but we've included it in the metadata IDL for when we have it in the
-      // future. Use page=0 for now
+      // future. Use page = -1 for now
       //
       // Note that page ids are a bespoke notion for Arrow and not a feature we
       // are using from any OS-level shared memory. The thought is that systems
       // may (in the future) associate integer page id's with physical memory
       // pages (according to whatever is the desired shared memory mechanism)
-      buffer_meta_.push_back(flatbuf::Buffer(0, position, size + padding));
-
-      if (size > 0) {
-        RETURN_NOT_OK(dst->Write(buffer->data(), size));
-        position += size;
-      }
-
-      if (padding > 0) {
-        RETURN_NOT_OK(dst->Write(kPaddingBytes, padding));
-        position += padding;
-      }
+      buffer_meta_.push_back(flatbuf::Buffer(-1, offset, size + padding));
+      offset += size + padding;
     }
 
-    *body_end_offset = position;
+    *body_length = offset - buffer_start_offset_;
+    DCHECK(BitUtil::IsMultipleOf64(*body_length));
+
+    return Status::OK();
+  }
 
+  Status WriteMetadata(
+      int64_t body_length, io::OutputStream* dst, int32_t* metadata_length) {
     // Now that we have computed the locations of all of the buffers in shared
     // memory, the data header can be converted to a flatbuffer and written out
     //
     // Note: The memory written here is prefixed by the size of the flatbuffer
-    // itself as an int32_t. On reading from a input, you will have to
-    // determine the data header size then request a buffer such that you can
-    // construct the flatbuffer data accessor object (see arrow::ipc::Message)
-    std::shared_ptr<Buffer> data_header;
-    RETURN_NOT_OK(WriteDataHeader(
-        num_rows_, position - start_position, field_nodes_, buffer_meta_, &data_header));
+    // itself as an int32_t.
+    std::shared_ptr<Buffer> metadata_fb;
+    RETURN_NOT_OK(WriteRecordBatchMetadata(
+        num_rows_, body_length, field_nodes_, buffer_meta_, &metadata_fb));
+
+    // Need to write 4 bytes (metadata size), the metadata, plus padding to
+    // fall on a 64-byte offset
+    int64_t padded_metadata_length =
+        BitUtil::RoundUpToMultipleOf64(metadata_fb->size() + 4);
+
+    // The returned metadata size includes the length prefix, the flatbuffer,
+    // plus padding
+    *metadata_length = padded_metadata_length;
 
-    // Write the data header at the end
-    RETURN_NOT_OK(dst->Write(data_header->data(), data_header->size()));
+    // Write the flatbuffer size prefix
+    int32_t flatbuffer_size = metadata_fb->size();
+    RETURN_NOT_OK(
+        dst->Write(reinterpret_cast<const uint8_t*>(&flatbuffer_size), sizeof(int32_t)));
 
-    position += data_header->size();
-    *header_end_offset = position;
+    // Write the flatbuffer
+    RETURN_NOT_OK(dst->Write(metadata_fb->data(), metadata_fb->size()));
 
-    return Align(dst, &position);
+    // Write any padding
+    int64_t padding = padded_metadata_length - metadata_fb->size() - 4;
+    if (padding > 0) { RETURN_NOT_OK(dst->Write(kPaddingBytes, padding)); }
+
+    return Status::OK();
   }
 
-  Status Align(io::OutputStream* dst, int64_t* position) {
-    // Write all buffers here on word boundaries
-    // TODO(wesm): Is there benefit to 64-byte padding in IPC?
-    int64_t remainder = PaddedLength(*position) - *position;
-    if (remainder > 0) {
-      RETURN_NOT_OK(dst->Write(kPaddingBytes, remainder));
-      *position += remainder;
+  Status Write(io::OutputStream* dst, int32_t* metadata_length, int64_t* body_length) {
+    RETURN_NOT_OK(AssemblePayload(body_length));
+
+#ifndef NDEBUG
+    int64_t start_position, current_position;
+    RETURN_NOT_OK(dst->Tell(&start_position));
+#endif
+
+    RETURN_NOT_OK(WriteMetadata(*body_length, dst, metadata_length));
+
+#ifndef NDEBUG
+    RETURN_NOT_OK(dst->Tell(&current_position));
+    DCHECK(BitUtil::IsMultipleOf8(current_position));
+#endif
+
+    // Now write the buffers
+    for (size_t i = 0; i < buffers_.size(); ++i) {
+      const Buffer* buffer = buffers_[i].get();
+      int64_t size = 0;
+      int64_t padding = 0;
+
+      // The buffer might be null if we are handling zero row lengths.
+      if (buffer) {
+        size = buffer->size();
+        padding = BitUtil::RoundUpToMultipleOf64(size) - size;
+      }
+
+      if (size > 0) { RETURN_NOT_OK(dst->Write(buffer->data(), size)); }
+
+      if (padding > 0) { RETURN_NOT_OK(dst->Write(kPaddingBytes, padding)); }
     }
+
+#ifndef NDEBUG
+    RETURN_NOT_OK(dst->Tell(&current_position));
+    DCHECK(BitUtil::IsMultipleOf8(current_position));
+#endif
+
     return Status::OK();
   }
 
-  // This must be called after invoking AssemblePayload
   Status GetTotalSize(int64_t* size) {
     // emulates the behavior of Write without actually writing
-    int64_t body_offset;
-    int64_t data_header_offset;
+    int32_t metadata_length;
+    int64_t body_length;
     MockOutputStream dst;
-    RETURN_NOT_OK(Write(&dst, &body_offset, &data_header_offset));
+    RETURN_NOT_OK(Write(&dst, &metadata_length, &body_length));
     *size = dst.GetExtentBytesWritten();
     return Status::OK();
   }
@@ -228,6 +257,7 @@ class RecordBatchWriter {
   // Do not copy this vector. Ownership must be retained elsewhere
   const std::vector<std::shared_ptr<Array>>* columns_;
   int32_t num_rows_;
+  int64_t buffer_start_offset_;
 
   std::vector<flatbuf::FieldNode> field_nodes_;
   std::vector<flatbuf::Buffer> buffer_meta_;
@@ -236,18 +266,17 @@ class RecordBatchWriter {
 };
 
 Status WriteRecordBatch(const std::vector<std::shared_ptr<Array>>& columns,
-    int32_t num_rows, io::OutputStream* dst, int64_t* body_end_offset,
-    int64_t* header_end_offset, int max_recursion_depth) {
+    int32_t num_rows, int64_t buffer_start_offset, io::OutputStream* dst,
+    int32_t* metadata_length, int64_t* body_length, int max_recursion_depth) {
   DCHECK_GT(max_recursion_depth, 0);
-  RecordBatchWriter serializer(columns, num_rows, max_recursion_depth);
-  RETURN_NOT_OK(serializer.AssemblePayload());
-  return serializer.Write(dst, body_end_offset, header_end_offset);
+  RecordBatchWriter serializer(
+      columns, num_rows, buffer_start_offset, max_recursion_depth);
+  return serializer.Write(dst, metadata_length, body_length);
 }
 
 Status GetRecordBatchSize(const RecordBatch* batch, int64_t* size) {
   RecordBatchWriter serializer(
-      batch->columns(), batch->num_rows(), kMaxIpcRecursionDepth);
-  RETURN_NOT_OK(serializer.AssemblePayload());
+      batch->columns(), batch->num_rows(), 0, kMaxIpcRecursionDepth);
   RETURN_NOT_OK(serializer.GetTotalSize(size));
   return Status::OK();
 }
@@ -255,30 +284,33 @@ Status GetRecordBatchSize(const RecordBatch* batch, int64_t* size) {
 // ----------------------------------------------------------------------
 // Record batch read path
 
-class RecordBatchReader::RecordBatchReaderImpl {
+class RecordBatchReader {
  public:
-  RecordBatchReaderImpl(io::ReadableFileInterface* file,
-      const std::shared_ptr<RecordBatchMessage>& metadata, int max_recursion_depth)
-      : file_(file), metadata_(metadata), max_recursion_depth_(max_recursion_depth) {
+  RecordBatchReader(const std::shared_ptr<RecordBatchMetadata>& metadata,
+      const std::shared_ptr<Schema>& schema, int max_recursion_depth,
+      io::ReadableFileInterface* file)
+      : metadata_(metadata),
+        schema_(schema),
+        max_recursion_depth_(max_recursion_depth),
+        file_(file) {
     num_buffers_ = metadata->num_buffers();
     num_flattened_fields_ = metadata->num_fields();
   }
 
-  Status AssembleBatch(
-      const std::shared_ptr<Schema>& schema, std::shared_ptr<RecordBatch>* out) {
-    std::vector<std::shared_ptr<Array>> arrays(schema->num_fields());
+  Status Read(std::shared_ptr<RecordBatch>* out) {
+    std::vector<std::shared_ptr<Array>> arrays(schema_->num_fields());
 
     // The field_index and buffer_index are incremented in NextArray based on
     // how much of the batch is "consumed" (through nested data reconstruction,
     // for example)
     field_index_ = 0;
     buffer_index_ = 0;
-    for (int i = 0; i < schema->num_fields(); ++i) {
-      const Field* field = schema->field(i).get();
+    for (int i = 0; i < schema_->num_fields(); ++i) {
+      const Field* field = schema_->field(i).get();
       RETURN_NOT_OK(NextArray(field, max_recursion_depth_, &arrays[i]));
     }
 
-    *out = std::make_shared<RecordBatch>(schema, metadata_->length(), arrays);
+    *out = std::make_shared<RecordBatch>(schema_, metadata_->length(), arrays);
     return Status::OK();
   }
 
@@ -370,67 +402,56 @@ class RecordBatchReader::RecordBatchReaderImpl {
 
   Status GetBuffer(int buffer_index, std::shared_ptr<Buffer>* out) {
     BufferMetadata metadata = metadata_->buffer(buffer_index);
-    RETURN_NOT_OK(CheckMultipleOf64(metadata.length));
-    return file_->ReadAt(metadata.offset, metadata.length, out);
+
+    if (metadata.length == 0) {
+      *out = std::make_shared<Buffer>(nullptr, 0);
+      return Status::OK();
+    } else {
+      return file_->ReadAt(metadata.offset, metadata.length, out);
+    }
   }
 
  private:
+  std::shared_ptr<RecordBatchMetadata> metadata_;
+  std::shared_ptr<Schema> schema_;
+  int max_recursion_depth_;
   io::ReadableFileInterface* file_;
-  std::shared_ptr<RecordBatchMessage> metadata_;
 
   int field_index_;
   int buffer_index_;
-  int max_recursion_depth_;
   int num_buffers_;
   int num_flattened_fields_;
 };
 
-Status RecordBatchReader::Open(io::ReadableFileInterface* file, int64_t offset,
-    std::shared_ptr<RecordBatchReader>* out) {
-  return Open(file, offset, kMaxIpcRecursionDepth, out);
-}
-
-Status RecordBatchReader::Open(io::ReadableFileInterface* file, int64_t offset,
-    int max_recursion_depth, std::shared_ptr<RecordBatchReader>* out) {
+Status ReadRecordBatchMetadata(int64_t offset, int32_t metadata_length,
+    io::ReadableFileInterface* file, std::shared_ptr<RecordBatchMetadata>* metadata) {
   std::shared_ptr<Buffer> buffer;
-  RETURN_NOT_OK(file->ReadAt(offset - sizeof(int32_t), sizeof(int32_t), &buffer));
-
-  int32_t metadata_size = *reinterpret_cast<const int32_t*>(buffer->data());
+  RETURN_NOT_OK(file->ReadAt(offset, metadata_length, &buffer));
 
-  if (metadata_size + static_cast<int>(sizeof(int32_t)) > offset) {
-    return Status::Invalid("metadata size invalid");
-  }
-
-  // Read the metadata
-  RETURN_NOT_OK(
-      file->ReadAt(offset - metadata_size - sizeof(int32_t), metadata_size, &buffer));
-
-  // TODO(wesm): buffer slicing here would be better in case ReadAt returns
-  // allocated memory
-
-  std::shared_ptr<Message> message;
-  RETURN_NOT_OK(Message::Open(buffer, &message));
+  int32_t flatbuffer_size = *reinterpret_cast<const int32_t*>(buffer->data());
 
-  if (message->type() != Message::RECORD_BATCH) {
-    return Status::Invalid("Metadata message is not a record batch");
+  if (flatbuffer_size + static_cast<int>(sizeof(int32_t)) > metadata_length) {
+    std::stringstream ss;
+    ss << "flatbuffer size " << metadata_length << " invalid. File offset: " << offset
+       << ", metadata length: " << metadata_length;
+    return Status::Invalid(ss.str());
   }
 
-  std::shared_ptr<RecordBatchMessage> batch_meta = message->GetRecordBatch();
-
-  std::shared_ptr<RecordBatchReader> result(new RecordBatchReader());
-  result->impl_.reset(new RecordBatchReaderImpl(file, batch_meta, max_recursion_depth));
-  *out = result;
-
+  *metadata = std::make_shared<RecordBatchMetadata>(buffer, sizeof(int32_t));
   return Status::OK();
 }
 
-// Here the explicit destructor is required for compilers to be aware of
-// the complete information of RecordBatchReader::RecordBatchReaderImpl class
-RecordBatchReader::~RecordBatchReader() {}
+Status ReadRecordBatch(const std::shared_ptr<RecordBatchMetadata>& metadata,
+    const std::shared_ptr<Schema>& schema, io::ReadableFileInterface* file,
+    std::shared_ptr<RecordBatch>* out) {
+  return ReadRecordBatch(metadata, schema, kMaxIpcRecursionDepth, file, out);
+}
 
-Status RecordBatchReader::GetRecordBatch(
-    const std::shared_ptr<Schema>& schema, std::shared_ptr<RecordBatch>* out) {
-  return impl_->AssembleBatch(schema, out);
+Status ReadRecordBatch(const std::shared_ptr<RecordBatchMetadata>& metadata,
+    const std::shared_ptr<Schema>& schema, int max_recursion_depth,
+    io::ReadableFileInterface* file, std::shared_ptr<RecordBatch>* out) {
+  RecordBatchReader reader(metadata, schema, max_recursion_depth, file);
+  return reader.Read(out);
 }
 
 }  // namespace ipc

http://git-wip-us.apache.org/repos/asf/arrow/blob/e3c167bd/cpp/src/arrow/ipc/adapter.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/adapter.h b/cpp/src/arrow/ipc/adapter.h
index b02de28..963b9ee 100644
--- a/cpp/src/arrow/ipc/adapter.h
+++ b/cpp/src/arrow/ipc/adapter.h
@@ -43,7 +43,7 @@ class OutputStream;
 
 namespace ipc {
 
-class RecordBatchMessage;
+class RecordBatchMetadata;
 
 // ----------------------------------------------------------------------
 // Write path
@@ -51,22 +51,30 @@ class RecordBatchMessage;
 // TODO(emkornfield) investigate this more
 constexpr int kMaxIpcRecursionDepth = 64;
 
-// Write the RecordBatch (collection of equal-length Arrow arrays) to the output
-// stream
+// Write the RecordBatch (collection of equal-length Arrow arrays) to the
+// output stream in a contiguous block. The record batch metadata is written as
+// a flatbuffer (see format/Message.fbs -- the RecordBatch message type)
+// prefixed by its size, followed by each of the memory buffers in the batch
+// written end to end (with appropriate alignment and padding):
 //
-// First, each of the memory buffers are written out end-to-end
-//
-// Then, this function writes the batch metadata as a flatbuffer (see
-// format/Message.fbs -- the RecordBatch message type) like so:
-//
-// <int32: metadata size> <uint8*: metadata>
+// <int32: metadata size> <uint8*: metadata> <buffers>
 //
 // Finally, the absolute offsets (relative to the start of the output stream)
 // to the end of the body and end of the metadata / data header (suffixed by
 // the header size) is returned in out-variables
+//
+// @param(in) buffer_start_offset: the start offset to use in the buffer metadata,
+// default should be 0
+//
+// @param(out) metadata_length: the size of the length-prefixed flatbuffer
+// including padding to a 64-byte boundary
+//
+// @param(out) body_length: the size of the contiguous buffer block plus
+// padding bytes
 ARROW_EXPORT Status WriteRecordBatch(const std::vector<std::shared_ptr<Array>>& columns,
-    int32_t num_rows, io::OutputStream* dst, int64_t* body_end_offset,
-    int64_t* header_end_offset, int max_recursion_depth = kMaxIpcRecursionDepth);
+    int32_t num_rows, int64_t buffer_start_offset, io::OutputStream* dst,
+    int32_t* metadata_length, int64_t* body_length,
+    int max_recursion_depth = kMaxIpcRecursionDepth);
 
 // int64_t GetRecordBatchMetadata(const RecordBatch* batch);
 
@@ -78,27 +86,20 @@ ARROW_EXPORT Status GetRecordBatchSize(const RecordBatch* batch, int64_t* size);
 // ----------------------------------------------------------------------
 // "Read" path; does not copy data if the input supports zero copy reads
 
-class ARROW_EXPORT RecordBatchReader {
- public:
-  // The offset is the absolute position to the *end* of the record batch data
-  // header
-  static Status Open(io::ReadableFileInterface* file, int64_t offset,
-      std::shared_ptr<RecordBatchReader>* out);
-
-  static Status Open(io::ReadableFileInterface* file, int64_t offset,
-      int max_recursion_depth, std::shared_ptr<RecordBatchReader>* out);
-
-  virtual ~RecordBatchReader();
-
-  // Reassemble the record batch. A Schema is required to be able to construct
-  // the right array containers
-  Status GetRecordBatch(
-      const std::shared_ptr<Schema>& schema, std::shared_ptr<RecordBatch>* out);
-
- private:
-  class RecordBatchReaderImpl;
-  std::unique_ptr<RecordBatchReaderImpl> impl_;
-};
+// Read the record batch flatbuffer metadata starting at the indicated file offset
+//
+// The flatbuffer is expected to be length-prefixed, so the metadata_length
+// includes at least the length prefix and the flatbuffer
+Status ARROW_EXPORT ReadRecordBatchMetadata(int64_t offset, int32_t metadata_length,
+    io::ReadableFileInterface* file, std::shared_ptr<RecordBatchMetadata>* metadata);
+
+Status ARROW_EXPORT ReadRecordBatch(const std::shared_ptr<RecordBatchMetadata>& metadata,
+    const std::shared_ptr<Schema>& schema, io::ReadableFileInterface* file,
+    std::shared_ptr<RecordBatch>* out);
+
+Status ARROW_EXPORT ReadRecordBatch(const std::shared_ptr<RecordBatchMetadata>& metadata,
+    const std::shared_ptr<Schema>& schema, int max_recursion_depth,
+    io::ReadableFileInterface* file, std::shared_ptr<RecordBatch>* out);
 
 }  // namespace ipc
 }  // namespace arrow

http://git-wip-us.apache.org/repos/asf/arrow/blob/e3c167bd/cpp/src/arrow/ipc/file.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/file.cc b/cpp/src/arrow/ipc/file.cc
index c68244d..06001cc 100644
--- a/cpp/src/arrow/ipc/file.cc
+++ b/cpp/src/arrow/ipc/file.cc
@@ -23,6 +23,7 @@
 #include <vector>
 
 #include "arrow/io/interfaces.h"
+#include "arrow/io/memory.h"
 #include "arrow/ipc/adapter.h"
 #include "arrow/ipc/metadata.h"
 #include "arrow/ipc/util.h"
@@ -87,19 +88,19 @@ Status FileWriter::WriteRecordBatch(
 
   int64_t offset = position_;
 
-  int64_t body_end_offset;
-  int64_t header_end_offset;
+  // There may be padding after the end of the metadata, so we cannot rely on
+  // position_
+  int32_t metadata_length;
+  int64_t body_length;
+
+  // Frame of reference in file format is 0, see ARROW-384
+  const int64_t buffer_start_offset = 0;
   RETURN_NOT_OK(arrow::ipc::WriteRecordBatch(
-      columns, num_rows, sink_, &body_end_offset, &header_end_offset));
+      columns, num_rows, buffer_start_offset, sink_, &metadata_length, &body_length));
   RETURN_NOT_OK(UpdatePosition());
 
   DCHECK(position_ % 8 == 0) << "ipc::WriteRecordBatch did not perform aligned writes";
 
-  // There may be padding ever the end of the metadata, so we cannot rely on
-  // position_
-  int32_t metadata_length = header_end_offset - body_end_offset;
-  int32_t body_length = body_end_offset - offset;
-
   // Append metadata, to be written in the footer later
   record_batches_.emplace_back(offset, metadata_length, body_length);
 
@@ -198,12 +199,18 @@ Status FileReader::GetRecordBatch(int i, std::shared_ptr<RecordBatch>* batch) {
   DCHECK_GE(i, 0);
   DCHECK_LT(i, num_record_batches());
   FileBlock block = footer_->record_batch(i);
-  int64_t metadata_end_offset = block.offset + block.body_length + block.metadata_length;
 
-  std::shared_ptr<RecordBatchReader> reader;
-  RETURN_NOT_OK(RecordBatchReader::Open(file_.get(), metadata_end_offset, &reader));
+  std::shared_ptr<RecordBatchMetadata> metadata;
+  RETURN_NOT_OK(ReadRecordBatchMetadata(
+      block.offset, block.metadata_length, file_.get(), &metadata));
+
+  // TODO(wesm): ARROW-388 -- the buffer frame of reference is 0 (see
+  // ARROW-384).
+  std::shared_ptr<Buffer> buffer_block;
+  RETURN_NOT_OK(file_->Read(block.body_length, &buffer_block));
+  io::BufferReader reader(buffer_block);
 
-  return reader->GetRecordBatch(schema_, batch);
+  return ReadRecordBatch(metadata, schema_, &reader, batch);
 }
 
 }  // namespace ipc

http://git-wip-us.apache.org/repos/asf/arrow/blob/e3c167bd/cpp/src/arrow/ipc/ipc-adapter-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/ipc-adapter-test.cc b/cpp/src/arrow/ipc/ipc-adapter-test.cc
index f5611d4..1accfde 100644
--- a/cpp/src/arrow/ipc/ipc-adapter-test.cc
+++ b/cpp/src/arrow/ipc/ipc-adapter-test.cc
@@ -54,17 +54,24 @@ class TestWriteRecordBatch : public ::testing::TestWithParam<MakeRecordBatch*>,
     std::string path = "test-write-row-batch";
     io::MemoryMapFixture::InitMemoryMap(memory_map_size, path, &mmap_);
 
-    int64_t body_end_offset;
-    int64_t header_end_offset;
+    int32_t metadata_length;
+    int64_t body_length;
 
-    RETURN_NOT_OK(WriteRecordBatch(batch.columns(), batch.num_rows(), mmap_.get(),
-        &body_end_offset, &header_end_offset));
+    const int64_t buffer_offset = 0;
 
-    std::shared_ptr<RecordBatchReader> reader;
-    RETURN_NOT_OK(RecordBatchReader::Open(mmap_.get(), header_end_offset, &reader));
+    RETURN_NOT_OK(WriteRecordBatch(batch.columns(), batch.num_rows(), buffer_offset,
+        mmap_.get(), &metadata_length, &body_length));
 
-    RETURN_NOT_OK(reader->GetRecordBatch(batch.schema(), batch_result));
-    return Status::OK();
+    std::shared_ptr<RecordBatchMetadata> metadata;
+    RETURN_NOT_OK(ReadRecordBatchMetadata(0, metadata_length, mmap_.get(), &metadata));
+
+    // The buffer offsets start at 0, so we must construct a
+    // ReadableFileInterface according to that frame of reference
+    std::shared_ptr<Buffer> buffer_payload;
+    RETURN_NOT_OK(mmap_->ReadAt(metadata_length, body_length, &buffer_payload));
+    io::BufferReader buffer_reader(buffer_payload);
+
+    return ReadRecordBatch(metadata, batch.schema(), &buffer_reader, batch_result);
   }
 
  protected:
@@ -96,11 +103,11 @@ INSTANTIATE_TEST_CASE_P(RoundTripTests, TestWriteRecordBatch,
 
 void TestGetRecordBatchSize(std::shared_ptr<RecordBatch> batch) {
   ipc::MockOutputStream mock;
-  int64_t mock_header_offset = -1;
-  int64_t mock_body_offset = -1;
+  int32_t mock_metadata_length = -1;
+  int64_t mock_body_length = -1;
   int64_t size = -1;
-  ASSERT_OK(WriteRecordBatch(batch->columns(), batch->num_rows(), &mock,
-      &mock_body_offset, &mock_header_offset));
+  ASSERT_OK(WriteRecordBatch(batch->columns(), batch->num_rows(), 0, &mock,
+      &mock_metadata_length, &mock_body_length));
   ASSERT_OK(GetRecordBatchSize(batch.get(), &size));
   ASSERT_EQ(mock.GetExtentBytesWritten(), size);
 }
@@ -129,39 +136,36 @@ class RecursionLimits : public ::testing::Test, public io::MemoryMapFixture {
   void SetUp() { pool_ = default_memory_pool(); }
   void TearDown() { io::MemoryMapFixture::TearDown(); }
 
-  Status WriteToMmap(int recursion_level, bool override_level,
-      int64_t* header_out = nullptr, std::shared_ptr<Schema>* schema_out = nullptr) {
+  Status WriteToMmap(int recursion_level, bool override_level, int32_t* metadata_length,
+      int64_t* body_length, std::shared_ptr<Schema>* schema) {
     const int batch_length = 5;
-    TypePtr type = kInt32;
+    TypePtr type = int32();
     ArrayPtr array;
     const bool include_nulls = true;
     RETURN_NOT_OK(MakeRandomInt32Array(1000, include_nulls, pool_, &array));
     for (int i = 0; i < recursion_level; ++i) {
-      type = std::static_pointer_cast<DataType>(std::make_shared<ListType>(type));
+      type = list(type);
       RETURN_NOT_OK(
           MakeRandomListArray(array, batch_length, include_nulls, pool_, &array));
     }
 
-    auto f0 = std::make_shared<Field>("f0", type);
-    std::shared_ptr<Schema> schema(new Schema({f0}));
-    if (schema_out != nullptr) { *schema_out = schema; }
+    auto f0 = field("f0", type);
+
+    *schema = std::shared_ptr<Schema>(new Schema({f0}));
+
     std::vector<ArrayPtr> arrays = {array};
-    auto batch = std::make_shared<RecordBatch>(schema, batch_length, arrays);
+    auto batch = std::make_shared<RecordBatch>(*schema, batch_length, arrays);
 
     std::string path = "test-write-past-max-recursion";
     const int memory_map_size = 1 << 16;
     io::MemoryMapFixture::InitMemoryMap(memory_map_size, path, &mmap_);
 
-    int64_t body_offset;
-    int64_t header_offset;
-
-    int64_t* header_out_param = header_out == nullptr ? &header_offset : header_out;
     if (override_level) {
-      return WriteRecordBatch(batch->columns(), batch->num_rows(), mmap_.get(),
-          &body_offset, header_out_param, recursion_level + 1);
+      return WriteRecordBatch(batch->columns(), batch->num_rows(), 0, mmap_.get(),
+          metadata_length, body_length, recursion_level + 1);
     } else {
-      return WriteRecordBatch(batch->columns(), batch->num_rows(), mmap_.get(),
-          &body_offset, header_out_param);
+      return WriteRecordBatch(batch->columns(), batch->num_rows(), 0, mmap_.get(),
+          metadata_length, body_length);
     }
   }
 
@@ -171,18 +175,29 @@ class RecursionLimits : public ::testing::Test, public io::MemoryMapFixture {
 };
 
 TEST_F(RecursionLimits, WriteLimit) {
-  ASSERT_RAISES(Invalid, WriteToMmap((1 << 8) + 1, false));
+  int32_t metadata_length = -1;
+  int64_t body_length = -1;
+  std::shared_ptr<Schema> schema;
+  ASSERT_RAISES(
+      Invalid, WriteToMmap((1 << 8) + 1, false, &metadata_length, &body_length, &schema));
 }
 
 TEST_F(RecursionLimits, ReadLimit) {
-  int64_t header_offset = -1;
+  int32_t metadata_length = -1;
+  int64_t body_length = -1;
   std::shared_ptr<Schema> schema;
-  ASSERT_OK(WriteToMmap(64, true, &header_offset, &schema));
+  ASSERT_OK(WriteToMmap(64, true, &metadata_length, &body_length, &schema));
 
-  std::shared_ptr<RecordBatchReader> reader;
-  ASSERT_OK(RecordBatchReader::Open(mmap_.get(), header_offset, &reader));
-  std::shared_ptr<RecordBatch> batch_result;
-  ASSERT_RAISES(Invalid, reader->GetRecordBatch(schema, &batch_result));
+  std::shared_ptr<RecordBatchMetadata> metadata;
+  ASSERT_OK(ReadRecordBatchMetadata(0, metadata_length, mmap_.get(), &metadata));
+
+  std::shared_ptr<Buffer> payload;
+  ASSERT_OK(mmap_->ReadAt(metadata_length, body_length, &payload));
+
+  io::BufferReader reader(payload);
+
+  std::shared_ptr<RecordBatch> batch;
+  ASSERT_RAISES(Invalid, ReadRecordBatch(metadata, schema, &reader, &batch));
 }
 
 }  // namespace ipc

http://git-wip-us.apache.org/repos/asf/arrow/blob/e3c167bd/cpp/src/arrow/ipc/ipc-file-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/ipc-file-test.cc b/cpp/src/arrow/ipc/ipc-file-test.cc
index cd424bf..a1feac4 100644
--- a/cpp/src/arrow/ipc/ipc-file-test.cc
+++ b/cpp/src/arrow/ipc/ipc-file-test.cc
@@ -68,7 +68,7 @@ class TestFileFormat : public ::testing::TestWithParam<MakeRecordBatch*> {
     RETURN_NOT_OK(sink_->Tell(&footer_offset));
 
     // Open the file
-    auto reader = std::make_shared<io::BufferReader>(buffer_->data(), buffer_->size());
+    auto reader = std::make_shared<io::BufferReader>(buffer_);
     RETURN_NOT_OK(FileReader::Open(reader, footer_offset, &file_reader_));
 
     EXPECT_EQ(num_batches, file_reader_->num_record_batches());

http://git-wip-us.apache.org/repos/asf/arrow/blob/e3c167bd/cpp/src/arrow/ipc/ipc-json-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/ipc-json-test.cc b/cpp/src/arrow/ipc/ipc-json-test.cc
index a51371c..e5c3a08 100644
--- a/cpp/src/arrow/ipc/ipc-json-test.cc
+++ b/cpp/src/arrow/ipc/ipc-json-test.cc
@@ -284,19 +284,23 @@ TEST(TestJsonFileReadWrite, MinimalFormatExample) {
         "name": "foo",
         "type": {"name": "int", "isSigned": true, "bitWidth": 32},
         "nullable": true, "children": [],
-        "typeLayout": [
-          {"type": "VALIDITY", "typeBitWidth": 1},
-          {"type": "DATA", "typeBitWidth": 32}
-        ]
+        "typeLayout": {
+          "vectors": [
+            {"type": "VALIDITY", "typeBitWidth": 1},
+            {"type": "DATA", "typeBitWidth": 32}
+          ]
+        }
       },
       {
         "name": "bar",
         "type": {"name": "floatingpoint", "precision": "DOUBLE"},
         "nullable": true, "children": [],
-        "typeLayout": [
-          {"type": "VALIDITY", "typeBitWidth": 1},
-          {"type": "DATA", "typeBitWidth": 64}
-        ]
+        "typeLayout": {
+          "vectors": [
+            {"type": "VALIDITY", "typeBitWidth": 1},
+            {"type": "DATA", "typeBitWidth": 64}
+          ]
+        }
       }
     ]
   },

http://git-wip-us.apache.org/repos/asf/arrow/blob/e3c167bd/cpp/src/arrow/ipc/ipc-metadata-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/ipc-metadata-test.cc b/cpp/src/arrow/ipc/ipc-metadata-test.cc
index 1dc3969..d29583f 100644
--- a/cpp/src/arrow/ipc/ipc-metadata-test.cc
+++ b/cpp/src/arrow/ipc/ipc-metadata-test.cc
@@ -43,7 +43,7 @@ static inline void assert_schema_equal(const Schema* lhs, const Schema* rhs) {
   }
 }
 
-class TestSchemaMessage : public ::testing::Test {
+class TestSchemaMetadata : public ::testing::Test {
  public:
   void SetUp() {}
 
@@ -52,11 +52,11 @@ class TestSchemaMessage : public ::testing::Test {
     ASSERT_OK(WriteSchema(schema, &buffer));
 
     std::shared_ptr<Message> message;
-    ASSERT_OK(Message::Open(buffer, &message));
+    ASSERT_OK(Message::Open(buffer, 0, &message));
 
     ASSERT_EQ(Message::SCHEMA, message->type());
 
-    std::shared_ptr<SchemaMessage> schema_msg = message->GetSchema();
+    auto schema_msg = std::make_shared<SchemaMetadata>(message);
     ASSERT_EQ(schema->num_fields(), schema_msg->num_fields());
 
     std::shared_ptr<Schema> schema2;
@@ -68,7 +68,7 @@ class TestSchemaMessage : public ::testing::Test {
 
 const std::shared_ptr<DataType> INT32 = std::make_shared<Int32Type>();
 
-TEST_F(TestSchemaMessage, PrimitiveFields) {
+TEST_F(TestSchemaMetadata, PrimitiveFields) {
   auto f0 = std::make_shared<Field>("f0", std::make_shared<Int8Type>());
   auto f1 = std::make_shared<Field>("f1", std::make_shared<Int16Type>());
   auto f2 = std::make_shared<Field>("f2", std::make_shared<Int32Type>());
@@ -85,7 +85,7 @@ TEST_F(TestSchemaMessage, PrimitiveFields) {
   CheckRoundtrip(&schema);
 }
 
-TEST_F(TestSchemaMessage, NestedFields) {
+TEST_F(TestSchemaMetadata, NestedFields) {
   auto type = std::make_shared<ListType>(std::make_shared<Int32Type>());
   auto f0 = std::make_shared<Field>("f0", type);
 
@@ -111,7 +111,7 @@ class TestFileFooter : public ::testing::Test {
     std::unique_ptr<FileFooter> footer;
     ASSERT_OK(FileFooter::Open(buffer, &footer));
 
-    ASSERT_EQ(MetadataVersion::V1_SNAPSHOT, footer->version());
+    ASSERT_EQ(MetadataVersion::V2, footer->version());
 
     // Check schema
     std::shared_ptr<Schema> schema2;

http://git-wip-us.apache.org/repos/asf/arrow/blob/e3c167bd/cpp/src/arrow/ipc/json-integration-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/json-integration-test.cc b/cpp/src/arrow/ipc/json-integration-test.cc
index 5eff899..7a313f7 100644
--- a/cpp/src/arrow/ipc/json-integration-test.cc
+++ b/cpp/src/arrow/ipc/json-integration-test.cc
@@ -255,19 +255,23 @@ static const char* JSON_EXAMPLE = R"example(
         "name": "foo",
         "type": {"name": "int", "isSigned": true, "bitWidth": 32},
         "nullable": true, "children": [],
-        "typeLayout": [
-          {"type": "VALIDITY", "typeBitWidth": 1},
-          {"type": "DATA", "typeBitWidth": 32}
-        ]
+        "typeLayout": {
+          "vectors": [
+            {"type": "VALIDITY", "typeBitWidth": 1},
+            {"type": "DATA", "typeBitWidth": 32}
+          ]
+        }
       },
       {
         "name": "bar",
         "type": {"name": "floatingpoint", "precision": "DOUBLE"},
         "nullable": true, "children": [],
-        "typeLayout": [
-          {"type": "VALIDITY", "typeBitWidth": 1},
-          {"type": "DATA", "typeBitWidth": 64}
-        ]
+        "typeLayout": {
+          "vectors": [
+            {"type": "VALIDITY", "typeBitWidth": 1},
+            {"type": "DATA", "typeBitWidth": 64}
+          ]
+        }
       }
     ]
   },
@@ -301,10 +305,12 @@ static const char* JSON_EXAMPLE2 = R"example(
         "name": "foo",
         "type": {"name": "int", "isSigned": true, "bitWidth": 32},
         "nullable": true, "children": [],
-        "typeLayout": [
-          {"type": "VALIDITY", "typeBitWidth": 1},
-          {"type": "DATA", "typeBitWidth": 32}
-        ]
+        "typeLayout": {
+          "vectors": [
+            {"type": "VALIDITY", "typeBitWidth": 1},
+            {"type": "DATA", "typeBitWidth": 32}
+          ]
+        }
       }
     ]
   },

http://git-wip-us.apache.org/repos/asf/arrow/blob/e3c167bd/cpp/src/arrow/ipc/json-internal.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/json-internal.cc b/cpp/src/arrow/ipc/json-internal.cc
index 31fe35b..e56bcb3 100644
--- a/cpp/src/arrow/ipc/json-internal.cc
+++ b/cpp/src/arrow/ipc/json-internal.cc
@@ -45,8 +45,6 @@ namespace ipc {
 using RjArray = rj::Value::ConstArray;
 using RjObject = rj::Value::ConstObject;
 
-enum class BufferType : char { DATA, OFFSET, TYPE, VALIDITY };
-
 static std::string GetBufferTypeName(BufferType type) {
   switch (type) {
     case BufferType::DATA:
@@ -93,27 +91,6 @@ static std::string GetTimeUnitName(TimeUnit unit) {
   return "UNKNOWN";
 }
 
-class BufferLayout {
- public:
-  BufferLayout(BufferType type, int bit_width) : type_(type), bit_width_(bit_width) {}
-
-  BufferType type() const { return type_; }
-  int bit_width() const { return bit_width_; }
-
- private:
-  BufferType type_;
-  int bit_width_;
-};
-
-static const BufferLayout kValidityBuffer(BufferType::VALIDITY, 1);
-static const BufferLayout kOffsetBuffer(BufferType::OFFSET, 32);
-static const BufferLayout kTypeBuffer(BufferType::TYPE, 32);
-static const BufferLayout kBooleanBuffer(BufferType::DATA, 1);
-static const BufferLayout kValues64(BufferType::DATA, 64);
-static const BufferLayout kValues32(BufferType::DATA, 32);
-static const BufferLayout kValues16(BufferType::DATA, 16);
-static const BufferLayout kValues8(BufferType::DATA, 8);
-
 class JsonSchemaWriter : public TypeVisitor {
  public:
   explicit JsonSchemaWriter(const Schema& schema, RjWriter* writer)
@@ -154,9 +131,9 @@ class JsonSchemaWriter : public TypeVisitor {
   }
 
   template <typename T>
-  typename std::enable_if<std::is_base_of<NoExtraMeta, T>::value ||
-                              std::is_base_of<BooleanType, T>::value ||
-                              std::is_base_of<NullType, T>::value,
+  typename std::enable_if<
+      std::is_base_of<NoExtraMeta, T>::value || std::is_base_of<BooleanType, T>::value ||
+          std::is_base_of<DateType, T>::value || std::is_base_of<NullType, T>::value,
       void>::type
   WriteTypeMetadata(const T& type) {}
 
@@ -243,11 +220,10 @@ class JsonSchemaWriter : public TypeVisitor {
   }
 
   template <typename T>
-  Status WritePrimitive(const std::string& typeclass, const T& type,
-      const std::vector<BufferLayout>& buffer_layout) {
+  Status WritePrimitive(const std::string& typeclass, const T& type) {
     WriteName(typeclass, type);
     SetNoChildren();
-    WriteBufferLayout(buffer_layout);
+    WriteBufferLayout(type.GetBufferLayout());
     return Status::OK();
   }
 
@@ -255,15 +231,17 @@ class JsonSchemaWriter : public TypeVisitor {
   Status WriteVarBytes(const std::string& typeclass, const T& type) {
     WriteName(typeclass, type);
     SetNoChildren();
-    WriteBufferLayout({kValidityBuffer, kOffsetBuffer, kValues8});
+    WriteBufferLayout(type.GetBufferLayout());
     return Status::OK();
   }
 
-  void WriteBufferLayout(const std::vector<BufferLayout>& buffer_layout) {
+  void WriteBufferLayout(const std::vector<BufferDescr>& buffer_layout) {
     writer_->Key("typeLayout");
+    writer_->StartObject();
+    writer_->Key("vectors");
     writer_->StartArray();
 
-    for (const BufferLayout& buffer : buffer_layout) {
+    for (const BufferDescr& buffer : buffer_layout) {
       writer_->StartObject();
       writer_->Key("type");
       writer_->String(GetBufferTypeName(buffer.type()));
@@ -274,6 +252,7 @@ class JsonSchemaWriter : public TypeVisitor {
       writer_->EndObject();
     }
     writer_->EndArray();
+    writer_->EndObject();
   }
 
   Status WriteChildren(const std::vector<std::shared_ptr<Field>>& children) {
@@ -286,74 +265,52 @@ class JsonSchemaWriter : public TypeVisitor {
     return Status::OK();
   }
 
-  Status Visit(const NullType& type) override { return WritePrimitive("null", type, {}); }
+  Status Visit(const NullType& type) override { return WritePrimitive("null", type); }
 
-  Status Visit(const BooleanType& type) override {
-    return WritePrimitive("bool", type, {kValidityBuffer, kBooleanBuffer});
-  }
+  Status Visit(const BooleanType& type) override { return WritePrimitive("bool", type); }
 
-  Status Visit(const Int8Type& type) override {
-    return WritePrimitive("int", type, {kValidityBuffer, kValues8});
-  }
+  Status Visit(const Int8Type& type) override { return WritePrimitive("int", type); }
 
-  Status Visit(const Int16Type& type) override {
-    return WritePrimitive("int", type, {kValidityBuffer, kValues16});
-  }
+  Status Visit(const Int16Type& type) override { return WritePrimitive("int", type); }
 
-  Status Visit(const Int32Type& type) override {
-    return WritePrimitive("int", type, {kValidityBuffer, kValues32});
-  }
+  Status Visit(const Int32Type& type) override { return WritePrimitive("int", type); }
 
-  Status Visit(const Int64Type& type) override {
-    return WritePrimitive("int", type, {kValidityBuffer, kValues64});
-  }
+  Status Visit(const Int64Type& type) override { return WritePrimitive("int", type); }
 
-  Status Visit(const UInt8Type& type) override {
-    return WritePrimitive("int", type, {kValidityBuffer, kValues8});
-  }
+  Status Visit(const UInt8Type& type) override { return WritePrimitive("int", type); }
 
-  Status Visit(const UInt16Type& type) override {
-    return WritePrimitive("int", type, {kValidityBuffer, kValues16});
-  }
+  Status Visit(const UInt16Type& type) override { return WritePrimitive("int", type); }
 
-  Status Visit(const UInt32Type& type) override {
-    return WritePrimitive("int", type, {kValidityBuffer, kValues32});
-  }
+  Status Visit(const UInt32Type& type) override { return WritePrimitive("int", type); }
 
-  Status Visit(const UInt64Type& type) override {
-    return WritePrimitive("int", type, {kValidityBuffer, kValues64});
-  }
+  Status Visit(const UInt64Type& type) override { return WritePrimitive("int", type); }
 
   Status Visit(const HalfFloatType& type) override {
-    return WritePrimitive("floatingpoint", type, {kValidityBuffer, kValues16});
+    return WritePrimitive("floatingpoint", type);
   }
 
   Status Visit(const FloatType& type) override {
-    return WritePrimitive("floatingpoint", type, {kValidityBuffer, kValues32});
+    return WritePrimitive("floatingpoint", type);
   }
 
   Status Visit(const DoubleType& type) override {
-    return WritePrimitive("floatingpoint", type, {kValidityBuffer, kValues64});
+    return WritePrimitive("floatingpoint", type);
   }
 
   Status Visit(const StringType& type) override { return WriteVarBytes("utf8", type); }
 
   Status Visit(const BinaryType& type) override { return WriteVarBytes("binary", type); }
 
-  Status Visit(const DateType& type) override {
-    return WritePrimitive("date", type, {kValidityBuffer, kValues64});
-  }
+  Status Visit(const DateType& type) override { return WritePrimitive("date", type); }
 
-  Status Visit(const TimeType& type) override {
-    return WritePrimitive("time", type, {kValidityBuffer, kValues64});
-  }
+  Status Visit(const TimeType& type) override { return WritePrimitive("time", type); }
 
   Status Visit(const TimestampType& type) override {
-    return WritePrimitive("timestamp", type, {kValidityBuffer, kValues64});
+    return WritePrimitive("timestamp", type);
   }
 
   Status Visit(const IntervalType& type) override {
-    return WritePrimitive("interval", type, {kValidityBuffer, kValues64});
+    return WritePrimitive("interval", type);
   }
 
   Status Visit(const DecimalType& type) override { return Status::NotImplemented("NYI"); }
@@ -361,26 +318,21 @@ class JsonSchemaWriter : public TypeVisitor {
   Status Visit(const ListType& type) override {
     WriteName("list", type);
     RETURN_NOT_OK(WriteChildren(type.children()));
-    WriteBufferLayout({kValidityBuffer, kOffsetBuffer});
+    WriteBufferLayout(type.GetBufferLayout());
     return Status::OK();
   }
 
   Status Visit(const StructType& type) override {
     WriteName("struct", type);
     WriteChildren(type.children());
-    WriteBufferLayout({kValidityBuffer, kTypeBuffer});
+    WriteBufferLayout(type.GetBufferLayout());
     return Status::OK();
   }
 
   Status Visit(const UnionType& type) override {
     WriteName("union", type);
     WriteChildren(type.children());
-
-    if (type.mode == UnionMode::SPARSE) {
-      WriteBufferLayout({kValidityBuffer, kTypeBuffer});
-    } else {
-      WriteBufferLayout({kValidityBuffer, kTypeBuffer, kOffsetBuffer});
-    }
+    WriteBufferLayout(type.GetBufferLayout());
     return Status::OK();
   }
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/e3c167bd/cpp/src/arrow/ipc/metadata-internal.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/metadata-internal.cc b/cpp/src/arrow/ipc/metadata-internal.cc
index 7102012..b995228 100644
--- a/cpp/src/arrow/ipc/metadata-internal.cc
+++ b/cpp/src/arrow/ipc/metadata-internal.cc
@@ -37,20 +37,6 @@ namespace flatbuf = org::apache::arrow::flatbuf;
 
 namespace ipc {
 
-const std::shared_ptr<DataType> BOOL = std::make_shared<BooleanType>();
-const std::shared_ptr<DataType> INT8 = std::make_shared<Int8Type>();
-const std::shared_ptr<DataType> INT16 = std::make_shared<Int16Type>();
-const std::shared_ptr<DataType> INT32 = std::make_shared<Int32Type>();
-const std::shared_ptr<DataType> INT64 = std::make_shared<Int64Type>();
-const std::shared_ptr<DataType> UINT8 = std::make_shared<UInt8Type>();
-const std::shared_ptr<DataType> UINT16 = std::make_shared<UInt16Type>();
-const std::shared_ptr<DataType> UINT32 = std::make_shared<UInt32Type>();
-const std::shared_ptr<DataType> UINT64 = std::make_shared<UInt64Type>();
-const std::shared_ptr<DataType> FLOAT = std::make_shared<FloatType>();
-const std::shared_ptr<DataType> DOUBLE = std::make_shared<DoubleType>();
-const std::shared_ptr<DataType> STRING = std::make_shared<StringType>();
-const std::shared_ptr<DataType> BINARY = std::make_shared<BinaryType>();
-
 static Status IntFromFlatbuffer(
     const flatbuf::Int* int_data, std::shared_ptr<DataType>* out) {
   if (int_data->bitWidth() > 64) {
@@ -62,16 +48,16 @@ static Status IntFromFlatbuffer(
 
   switch (int_data->bitWidth()) {
     case 8:
-      *out = int_data->is_signed() ? INT8 : UINT8;
+      *out = int_data->is_signed() ? int8() : uint8();
       break;
     case 16:
-      *out = int_data->is_signed() ? INT16 : UINT16;
+      *out = int_data->is_signed() ? int16() : uint16();
       break;
     case 32:
-      *out = int_data->is_signed() ? INT32 : UINT32;
+      *out = int_data->is_signed() ? int32() : uint32();
       break;
     case 64:
-      *out = int_data->is_signed() ? INT64 : UINT64;
+      *out = int_data->is_signed() ? int64() : uint64();
       break;
     default:
       return Status::NotImplemented("Integers not in cstdint are not implemented");
@@ -81,10 +67,12 @@ static Status IntFromFlatbuffer(
 
 static Status FloatFromFlatuffer(
     const flatbuf::FloatingPoint* float_data, std::shared_ptr<DataType>* out) {
-  if (float_data->precision() == flatbuf::Precision_SINGLE) {
-    *out = FLOAT;
+  if (float_data->precision() == flatbuf::Precision_HALF) {
+    *out = float16();
+  } else if (float_data->precision() == flatbuf::Precision_SINGLE) {
+    *out = float32();
   } else {
-    *out = DOUBLE;
+    *out = float64();
   }
   return Status::OK();
 }
@@ -100,13 +88,13 @@ static Status TypeFromFlatbuffer(flatbuf::Type type, const void* type_data,
       return FloatFromFlatuffer(
           static_cast<const flatbuf::FloatingPoint*>(type_data), out);
     case flatbuf::Type_Binary:
-      *out = BINARY;
+      *out = binary();
       return Status::OK();
     case flatbuf::Type_Utf8:
-      *out = STRING;
+      *out = utf8();
       return Status::OK();
     case flatbuf::Type_Bool:
-      *out = BOOL;
+      *out = boolean();
       return Status::OK();
     case flatbuf::Type_Decimal:
     case flatbuf::Type_Timestamp:
@@ -164,7 +152,32 @@ static Status StructToFlatbuffer(FBB& fbb, const std::shared_ptr<DataType>& type
   break;
 
 static Status TypeToFlatbuffer(FBB& fbb, const std::shared_ptr<DataType>& type,
-    std::vector<FieldOffset>* children, flatbuf::Type* out_type, Offset* offset) {
+    std::vector<FieldOffset>* children, std::vector<VectorLayoutOffset>* layout,
+    flatbuf::Type* out_type, Offset* offset) {
+  std::vector<BufferDescr> buffer_layout = type->GetBufferLayout();
+  for (const BufferDescr& descr : buffer_layout) {
+    flatbuf::VectorType vector_type;
+    switch (descr.type()) {
+      case BufferType::OFFSET:
+        vector_type = flatbuf::VectorType_OFFSET;
+        break;
+      case BufferType::DATA:
+        vector_type = flatbuf::VectorType_DATA;
+        break;
+      case BufferType::VALIDITY:
+        vector_type = flatbuf::VectorType_VALIDITY;
+        break;
+      case BufferType::TYPE:
+        vector_type = flatbuf::VectorType_TYPE;
+        break;
+      default:
+        vector_type = flatbuf::VectorType_DATA;
+        break;
+    }
+    auto offset = flatbuf::CreateVectorLayout(fbb, descr.bit_width(), vector_type);
+    layout->push_back(offset);
+  }
+
   switch (type->type) {
     case Type::BOOL:
       *out_type = flatbuf::Type_Bool;
@@ -223,14 +236,18 @@ static Status FieldToFlatbuffer(
 
   flatbuf::Type type_enum;
   Offset type_data;
+  Offset type_layout;
   std::vector<FieldOffset> children;
+  std::vector<VectorLayoutOffset> layout;
 
-  RETURN_NOT_OK(TypeToFlatbuffer(fbb, field->type, &children, &type_enum, &type_data));
+  RETURN_NOT_OK(
+      TypeToFlatbuffer(fbb, field->type, &children, &layout, &type_enum, &type_data));
   auto fb_children = fbb.CreateVector(children);
+  auto fb_layout = fbb.CreateVector(layout);
 
   // TODO: produce the list of VectorTypes
   *offset = flatbuf::CreateField(fbb, fb_name, field->nullable, type_enum, type_data,
-      field->dictionary, fb_children);
+      field->dictionary, fb_children, fb_layout);
 
   return Status::OK();
 }
@@ -300,13 +317,26 @@ Status MessageBuilder::SetRecordBatch(int32_t length, int64_t body_length,
   return Status::OK();
 }
 
-Status WriteDataHeader(int32_t length, int64_t body_length,
+Status WriteRecordBatchMetadata(int32_t length, int64_t body_length,
     const std::vector<flatbuf::FieldNode>& nodes,
     const std::vector<flatbuf::Buffer>& buffers, std::shared_ptr<Buffer>* out) {
-  MessageBuilder message;
-  RETURN_NOT_OK(message.SetRecordBatch(length, body_length, nodes, buffers));
-  RETURN_NOT_OK(message.Finish());
-  return message.GetBuffer(out);
+  flatbuffers::FlatBufferBuilder fbb;
+
+  auto batch = flatbuf::CreateRecordBatch(
+      fbb, length, fbb.CreateVectorOfStructs(nodes), fbb.CreateVectorOfStructs(buffers));
+
+  fbb.Finish(batch);
+
+  int32_t size = fbb.GetSize();
+
+  auto result = std::make_shared<PoolBuffer>();
+  RETURN_NOT_OK(result->Resize(size));
+
+  uint8_t* dst = result->mutable_data();
+  memcpy(dst, fbb.GetBufferPointer(), size);
+
+  *out = result;
+  return Status::OK();
 }
 
 Status MessageBuilder::Finish() {
@@ -317,17 +347,13 @@ Status MessageBuilder::Finish() {
 }
 
 Status MessageBuilder::GetBuffer(std::shared_ptr<Buffer>* out) {
-  // The message buffer is suffixed by the size of the complete flatbuffer as
-  // int32_t
-  // <uint8_t*: flatbuffer data><int32_t: flatbuffer size>
   int32_t size = fbb_.GetSize();
 
   auto result = std::make_shared<PoolBuffer>();
-  RETURN_NOT_OK(result->Resize(size + sizeof(int32_t)));
+  RETURN_NOT_OK(result->Resize(size));
 
   uint8_t* dst = result->mutable_data();
   memcpy(dst, fbb_.GetBufferPointer(), size);
-  memcpy(dst + size, reinterpret_cast<int32_t*>(&size), sizeof(int32_t));
 
   *out = result;
   return Status::OK();

http://git-wip-us.apache.org/repos/asf/arrow/blob/e3c167bd/cpp/src/arrow/ipc/metadata-internal.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/metadata-internal.h b/cpp/src/arrow/ipc/metadata-internal.h
index c404cfd..4826ebe 100644
--- a/cpp/src/arrow/ipc/metadata-internal.h
+++ b/cpp/src/arrow/ipc/metadata-internal.h
@@ -41,10 +41,10 @@ namespace ipc {
 
 using FBB = flatbuffers::FlatBufferBuilder;
 using FieldOffset = flatbuffers::Offset<arrow::flatbuf::Field>;
+using VectorLayoutOffset = flatbuffers::Offset<arrow::flatbuf::VectorLayout>;
 using Offset = flatbuffers::Offset<void>;
 
-static constexpr flatbuf::MetadataVersion kMetadataVersion =
-    flatbuf::MetadataVersion_V1_SNAPSHOT;
+static constexpr flatbuf::MetadataVersion kMetadataVersion = flatbuf::MetadataVersion_V2;
 
 Status FieldFromFlatbuffer(const flatbuf::Field* field, std::shared_ptr<Field>* out);
 
@@ -70,7 +70,7 @@ class MessageBuilder {
   flatbuffers::FlatBufferBuilder fbb_;
 };
 
-Status WriteDataHeader(int32_t length, int64_t body_length,
+Status WriteRecordBatchMetadata(int32_t length, int64_t body_length,
     const std::vector<flatbuf::FieldNode>& nodes,
     const std::vector<flatbuf::Buffer>& buffers, std::shared_ptr<Buffer>* out);
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/e3c167bd/cpp/src/arrow/ipc/metadata.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/metadata.cc b/cpp/src/arrow/ipc/metadata.cc
index 66df8a6..44d3939 100644
--- a/cpp/src/arrow/ipc/metadata.cc
+++ b/cpp/src/arrow/ipc/metadata.cc
@@ -50,9 +50,15 @@ Status WriteSchema(const Schema* schema, std::shared_ptr<Buffer>* out) {
 
 class Message::MessageImpl {
  public:
-  explicit MessageImpl(
-      const std::shared_ptr<Buffer>& buffer, const flatbuf::Message* message)
-      : buffer_(buffer), message_(message) {}
+  explicit MessageImpl(const std::shared_ptr<Buffer>& buffer, int64_t offset)
+      : buffer_(buffer), offset_(offset), message_(nullptr) {}
+
+  Status Open() {
+    message_ = flatbuf::GetMessage(buffer_->data() + offset_);
+
+    // TODO(wesm): verify the message
+    return Status::OK();
+  }
 
   Message::Type type() const {
     switch (message_->header_type()) {
@@ -72,25 +78,23 @@ class Message::MessageImpl {
   int64_t body_length() const { return message_->bodyLength(); }
 
  private:
-  // Owns the memory this message accesses
+  // Retain reference to memory
   std::shared_ptr<Buffer> buffer_;
+  int64_t offset_;
 
   const flatbuf::Message* message_;
 };
 
-Message::Message() {}
-
-Status Message::Open(
-    const std::shared_ptr<Buffer>& buffer, std::shared_ptr<Message>* out) {
-  std::shared_ptr<Message> result(new Message());
-
-  const flatbuf::Message* message = flatbuf::GetMessage(buffer->data());
+Message::Message(const std::shared_ptr<Buffer>& buffer, int64_t offset) {
+  impl_.reset(new MessageImpl(buffer, offset));
+}
 
-  // TODO(wesm): verify message
-  result->impl_.reset(new MessageImpl(buffer, message));
-  *out = result;
+Status Message::Open(const std::shared_ptr<Buffer>& buffer, int64_t offset,
+    std::shared_ptr<Message>* out) {
+  // ctor is private
 
-  return Status::OK();
+  *out = std::shared_ptr<Message>(new Message(buffer, offset));
+  return (*out)->impl_->Open();
 }
 
 Message::Type Message::type() const {
@@ -101,20 +105,12 @@ int64_t Message::body_length() const {
   return impl_->body_length();
 }
 
-std::shared_ptr<Message> Message::get_shared_ptr() {
-  return this->shared_from_this();
-}
-
-std::shared_ptr<SchemaMessage> Message::GetSchema() {
-  return std::make_shared<SchemaMessage>(this->shared_from_this(), impl_->header());
-}
-
 // ----------------------------------------------------------------------
-// SchemaMessage
+// SchemaMetadata
 
-class SchemaMessage::SchemaMessageImpl {
+class SchemaMetadata::SchemaMetadataImpl {
  public:
-  explicit SchemaMessageImpl(const void* schema)
+  explicit SchemaMetadataImpl(const void* schema)
       : schema_(static_cast<const flatbuf::Schema*>(schema)) {}
 
   const flatbuf::Field* field(int i) const { return schema_->fields()->Get(i); }
@@ -125,22 +121,29 @@ class SchemaMessage::SchemaMessageImpl {
   const flatbuf::Schema* schema_;
 };
 
-SchemaMessage::SchemaMessage(
-    const std::shared_ptr<Message>& message, const void* schema) {
+SchemaMetadata::SchemaMetadata(
+    const std::shared_ptr<Message>& message, const void* flatbuf) {
+  message_ = message;
+  impl_.reset(new SchemaMetadataImpl(flatbuf));
+}
+
+SchemaMetadata::SchemaMetadata(const std::shared_ptr<Message>& message) {
   message_ = message;
-  impl_.reset(new SchemaMessageImpl(schema));
+  impl_.reset(new SchemaMetadataImpl(message->impl_->header()));
 }
 
-int SchemaMessage::num_fields() const {
+SchemaMetadata::~SchemaMetadata() {}
+
+int SchemaMetadata::num_fields() const {
   return impl_->num_fields();
 }
 
-Status SchemaMessage::GetField(int i, std::shared_ptr<Field>* out) const {
+Status SchemaMetadata::GetField(int i, std::shared_ptr<Field>* out) const {
   const flatbuf::Field* field = impl_->field(i);
   return FieldFromFlatbuffer(field, out);
 }
 
-Status SchemaMessage::GetSchema(std::shared_ptr<Schema>* out) const {
+Status SchemaMetadata::GetSchema(std::shared_ptr<Schema>* out) const {
   std::vector<std::shared_ptr<Field>> fields(num_fields());
   for (int i = 0; i < this->num_fields(); ++i) {
     RETURN_NOT_OK(GetField(i, &fields[i]));
@@ -150,11 +153,11 @@ Status SchemaMessage::GetSchema(std::shared_ptr<Schema>* out) const {
 }
 
 // ----------------------------------------------------------------------
-// RecordBatchMessage
+// RecordBatchMetadata
 
-class RecordBatchMessage::RecordBatchMessageImpl {
+class RecordBatchMetadata::RecordBatchMetadataImpl {
  public:
-  explicit RecordBatchMessageImpl(const void* batch)
+  explicit RecordBatchMetadataImpl(const void* batch)
       : batch_(static_cast<const flatbuf::RecordBatch*>(batch)) {
     nodes_ = batch_->nodes();
     buffers_ = batch_->buffers();
@@ -176,19 +179,29 @@ class RecordBatchMessage::RecordBatchMessageImpl {
   const flatbuffers::Vector<const flatbuf::Buffer*>* buffers_;
 };
 
-std::shared_ptr<RecordBatchMessage> Message::GetRecordBatch() {
-  return std::make_shared<RecordBatchMessage>(this->shared_from_this(), impl_->header());
+RecordBatchMetadata::RecordBatchMetadata(const std::shared_ptr<Message>& message) {
+  message_ = message;
+  impl_.reset(new RecordBatchMetadataImpl(message->impl_->header()));
 }
 
-RecordBatchMessage::RecordBatchMessage(
-    const std::shared_ptr<Message>& message, const void* batch) {
-  message_ = message;
-  impl_.reset(new RecordBatchMessageImpl(batch));
+RecordBatchMetadata::RecordBatchMetadata(
+    const std::shared_ptr<Buffer>& buffer, int64_t offset) {
+  message_ = nullptr;
+  buffer_ = buffer;
+
+  const flatbuf::RecordBatch* metadata =
+      flatbuffers::GetRoot<flatbuf::RecordBatch>(buffer->data() + offset);
+
+  // TODO(wesm): validate table
+
+  impl_.reset(new RecordBatchMetadataImpl(metadata));
 }
 
+RecordBatchMetadata::~RecordBatchMetadata() {}
+
 // TODO(wesm): Copying the flatbuffer data isn't great, but this will do for
 // now
-FieldMetadata RecordBatchMessage::field(int i) const {
+FieldMetadata RecordBatchMetadata::field(int i) const {
   const flatbuf::FieldNode* node = impl_->field(i);
 
   FieldMetadata result;
@@ -197,7 +210,7 @@ FieldMetadata RecordBatchMessage::field(int i) const {
   return result;
 }
 
-BufferMetadata RecordBatchMessage::buffer(int i) const {
+BufferMetadata RecordBatchMetadata::buffer(int i) const {
   const flatbuf::Buffer* buffer = impl_->buffer(i);
 
   BufferMetadata result;
@@ -207,15 +220,15 @@ BufferMetadata RecordBatchMessage::buffer(int i) const {
   return result;
 }
 
-int32_t RecordBatchMessage::length() const {
+int32_t RecordBatchMetadata::length() const {
   return impl_->length();
 }
 
-int RecordBatchMessage::num_buffers() const {
+int RecordBatchMetadata::num_buffers() const {
   return impl_->num_buffers();
 }
 
-int RecordBatchMessage::num_fields() const {
+int RecordBatchMetadata::num_fields() const {
   return impl_->num_fields();
 }
 
@@ -268,11 +281,13 @@ class FileFooter::FileFooterImpl {
 
   MetadataVersion::type version() const {
     switch (footer_->version()) {
-      case flatbuf::MetadataVersion_V1_SNAPSHOT:
-        return MetadataVersion::V1_SNAPSHOT;
+      case flatbuf::MetadataVersion_V1:
+        return MetadataVersion::V1;
+      case flatbuf::MetadataVersion_V2:
+        return MetadataVersion::V2;
       // Add cases as other versions become available
       default:
-        return MetadataVersion::V1_SNAPSHOT;
+        return MetadataVersion::V2;
     }
   }
 
@@ -285,7 +300,7 @@ class FileFooter::FileFooterImpl {
   }
 
   Status GetSchema(std::shared_ptr<Schema>* out) const {
-    auto schema_msg = std::make_shared<SchemaMessage>(nullptr, footer_->schema());
+    auto schema_msg = std::make_shared<SchemaMetadata>(nullptr, footer_->schema());
     return schema_msg->GetSchema(out);
   }
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/e3c167bd/cpp/src/arrow/ipc/metadata.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/metadata.h b/cpp/src/arrow/ipc/metadata.h
index 2f0e853..1c4ef64 100644
--- a/cpp/src/arrow/ipc/metadata.h
+++ b/cpp/src/arrow/ipc/metadata.h
@@ -42,7 +42,7 @@ class OutputStream;
 namespace ipc {
 
 struct MetadataVersion {
-  enum type { V1_SNAPSHOT };
+  enum type { V1, V2 };
 };
 
 //----------------------------------------------------------------------
@@ -58,10 +58,14 @@ Status WriteSchema(const Schema* schema, std::shared_ptr<Buffer>* out);
 class Message;
 
 // Container for serialized Schema metadata contained in an IPC message
-class ARROW_EXPORT SchemaMessage {
+class ARROW_EXPORT SchemaMetadata {
  public:
+  explicit SchemaMetadata(const std::shared_ptr<Message>& message);
+
   // Accepts an opaque flatbuffer pointer
-  SchemaMessage(const std::shared_ptr<Message>& message, const void* schema);
+  SchemaMetadata(const std::shared_ptr<Message>& message, const void* schema);
+
+  ~SchemaMetadata();
 
   int num_fields() const;
 
@@ -76,8 +80,8 @@ class ARROW_EXPORT SchemaMessage {
   // Parent, owns the flatbuffer data
   std::shared_ptr<Message> message_;
 
-  class SchemaMessageImpl;
-  std::unique_ptr<SchemaMessageImpl> impl_;
+  class SchemaMetadataImpl;
+  std::unique_ptr<SchemaMetadataImpl> impl_;
 };
 
 // Field metadata
@@ -93,10 +97,13 @@ struct BufferMetadata {
 };
 
 // Container for serialized record batch metadata contained in an IPC message
-class ARROW_EXPORT RecordBatchMessage {
+class ARROW_EXPORT RecordBatchMetadata {
  public:
-  // Accepts an opaque flatbuffer pointer
-  RecordBatchMessage(const std::shared_ptr<Message>& message, const void* batch_meta);
+  explicit RecordBatchMetadata(const std::shared_ptr<Message>& message);
+
+  RecordBatchMetadata(const std::shared_ptr<Buffer>& message, int64_t offset);
+
+  ~RecordBatchMetadata();
 
   FieldMetadata field(int i) const;
   BufferMetadata buffer(int i) const;
@@ -108,37 +115,34 @@ class ARROW_EXPORT RecordBatchMessage {
  private:
   // Parent, owns the flatbuffer data
   std::shared_ptr<Message> message_;
+  std::shared_ptr<Buffer> buffer_;
 
-  class RecordBatchMessageImpl;
-  std::unique_ptr<RecordBatchMessageImpl> impl_;
+  class RecordBatchMetadataImpl;
+  std::unique_ptr<RecordBatchMetadataImpl> impl_;
 };
 
-class ARROW_EXPORT DictionaryBatchMessage {
+class ARROW_EXPORT DictionaryBatchMetadata {
  public:
   int64_t id() const;
-  std::unique_ptr<RecordBatchMessage> data() const;
+  std::unique_ptr<RecordBatchMetadata> data() const;
 };
 
-class ARROW_EXPORT Message : public std::enable_shared_from_this<Message> {
+class ARROW_EXPORT Message {
  public:
   enum Type { NONE, SCHEMA, DICTIONARY_BATCH, RECORD_BATCH };
 
-  static Status Open(
-      const std::shared_ptr<Buffer>& buffer, std::shared_ptr<Message>* out);
-
-  std::shared_ptr<Message> get_shared_ptr();
+  static Status Open(const std::shared_ptr<Buffer>& buffer, int64_t offset,
+      std::shared_ptr<Message>* out);
 
   int64_t body_length() const;
 
   Type type() const;
 
-  // These methods only to be invoked if you have checked the message type
-  std::shared_ptr<SchemaMessage> GetSchema();
-  std::shared_ptr<RecordBatchMessage> GetRecordBatch();
-  std::shared_ptr<DictionaryBatchMessage> GetDictionaryBatch();
-
  private:
-  Message();
+  Message(const std::shared_ptr<Buffer>& buffer, int64_t offset);
+
+  friend class RecordBatchMetadata;
+  friend class SchemaMetadata;
 
   // Hide serialization details from user API
   class MessageImpl;

http://git-wip-us.apache.org/repos/asf/arrow/blob/e3c167bd/cpp/src/arrow/ipc/test-common.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/test-common.h b/cpp/src/arrow/ipc/test-common.h
index 9abc20d..65b3782 100644
--- a/cpp/src/arrow/ipc/test-common.h
+++ b/cpp/src/arrow/ipc/test-common.h
@@ -39,8 +39,7 @@
 namespace arrow {
 namespace ipc {
 
-const auto kInt32 = std::make_shared<Int32Type>();
-const auto kListInt32 = list(kInt32);
+const auto kListInt32 = list(int32());
 const auto kListListInt32 = list(kListInt32);
 
 Status MakeRandomInt32Array(
@@ -99,8 +98,8 @@ Status MakeIntRecordBatch(std::shared_ptr<RecordBatch>* out) {
   const int length = 1000;
 
   // Make the schema
-  auto f0 = std::make_shared<Field>("f0", kInt32);
-  auto f1 = std::make_shared<Field>("f1", kInt32);
+  auto f0 = std::make_shared<Field>("f0", int32());
+  auto f1 = std::make_shared<Field>("f1", int32());
   std::shared_ptr<Schema> schema(new Schema({f0, f1}));
 
   // Example data
@@ -161,7 +160,7 @@ Status MakeListRecordBatch(std::shared_ptr<RecordBatch>* out) {
   // Make the schema
   auto f0 = std::make_shared<Field>("f0", kListInt32);
   auto f1 = std::make_shared<Field>("f1", kListListInt32);
-  auto f2 = std::make_shared<Field>("f2", kInt32);
+  auto f2 = std::make_shared<Field>("f2", int32());
   std::shared_ptr<Schema> schema(new Schema({f0, f1, f2}));
 
   // Example data
@@ -184,7 +183,7 @@ Status MakeZeroLengthRecordBatch(std::shared_ptr<RecordBatch>* out) {
   // Make the schema
   auto f0 = std::make_shared<Field>("f0", kListInt32);
   auto f1 = std::make_shared<Field>("f1", kListListInt32);
-  auto f2 = std::make_shared<Field>("f2", kInt32);
+  auto f2 = std::make_shared<Field>("f2", int32());
   std::shared_ptr<Schema> schema(new Schema({f0, f1, f2}));
 
   // Example data
@@ -205,7 +204,7 @@ Status MakeNonNullRecordBatch(std::shared_ptr<RecordBatch>* out) {
   // Make the schema
   auto f0 = std::make_shared<Field>("f0", kListInt32);
   auto f1 = std::make_shared<Field>("f1", kListListInt32);
-  auto f2 = std::make_shared<Field>("f2", kInt32);
+  auto f2 = std::make_shared<Field>("f2", int32());
   std::shared_ptr<Schema> schema(new Schema({f0, f1, f2}));
 
   // Example data
@@ -226,7 +225,7 @@ Status MakeNonNullRecordBatch(std::shared_ptr<RecordBatch>* out) {
 
 Status MakeDeeplyNestedList(std::shared_ptr<RecordBatch>* out) {
   const int batch_length = 5;
-  TypePtr type = kInt32;
+  TypePtr type = int32();
 
   MemoryPool* pool = default_memory_pool();
   ArrayPtr array;

http://git-wip-us.apache.org/repos/asf/arrow/blob/e3c167bd/cpp/src/arrow/ipc/util.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/util.h b/cpp/src/arrow/ipc/util.h
index 9000d1b..242d662 100644
--- a/cpp/src/arrow/ipc/util.h
+++ b/cpp/src/arrow/ipc/util.h
@@ -28,12 +28,9 @@ namespace arrow {
 namespace ipc {
 
-// Align on 8-byte boundaries
-static constexpr int kArrowAlignment = 8;
-
 // Buffers are padded to 64-byte boundaries (for SIMD)
-static constexpr int kArrowBufferAlignment = 64;
+static constexpr int kArrowAlignment = 64;
 
-static constexpr uint8_t kPaddingBytes[kArrowBufferAlignment] = {0};
+static constexpr uint8_t kPaddingBytes[kArrowAlignment] = {0};
 
 static inline int64_t PaddedLength(int64_t nbytes, int64_t alignment = kArrowAlignment) {
   return ((nbytes + alignment - 1) / alignment) * alignment;

http://git-wip-us.apache.org/repos/asf/arrow/blob/e3c167bd/cpp/src/arrow/test-util.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/test-util.h b/cpp/src/arrow/test-util.h
index 93dd5b6..63c2166 100644
--- a/cpp/src/arrow/test-util.h
+++ b/cpp/src/arrow/test-util.h
@@ -61,10 +61,10 @@
 
 // Alias MSVC popcount to GCC name
 #ifdef _MSC_VER
-#  include <intrin.h>
-#  define __builtin_popcount __popcnt
-#  include <nmmintrin.h>
-#  define __builtin_popcountll _mm_popcnt_u64
+#include <intrin.h>
+#define __builtin_popcount __popcnt
+#include <nmmintrin.h>
+#define __builtin_popcountll _mm_popcnt_u64
 #endif
 
 namespace arrow {

http://git-wip-us.apache.org/repos/asf/arrow/blob/e3c167bd/cpp/src/arrow/type.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/type.cc b/cpp/src/arrow/type.cc
index 589bdad..80f295c 100644
--- a/cpp/src/arrow/type.cc
+++ b/cpp/src/arrow/type.cc
@@ -105,10 +105,6 @@ std::string UnionType::ToString() const {
   return s.str();
 }
 
-int NullType::bit_width() const {
-  return 0;
-}
-
 std::string NullType::ToString() const {
   return name();
 }
@@ -187,4 +183,46 @@ std::shared_ptr<Field> field(
   return std::make_shared<Field>(name, type, nullable, dictionary);
 }
 
+static const BufferDescr kValidityBuffer(BufferType::VALIDITY, 1);  // args: (buffer type, bit width)
+static const BufferDescr kOffsetBuffer(BufferType::OFFSET, 32);
+static const BufferDescr kTypeBuffer(BufferType::TYPE, 32);
+static const BufferDescr kBooleanBuffer(BufferType::DATA, 1);  // NOTE(review): kBooleanBuffer and kValues64/32/16 are not referenced in this hunk -- confirm they are needed
+static const BufferDescr kValues64(BufferType::DATA, 64);
+static const BufferDescr kValues32(BufferType::DATA, 32);
+static const BufferDescr kValues16(BufferType::DATA, 16);
+static const BufferDescr kValues8(BufferType::DATA, 8);
+
+std::vector<BufferDescr> FixedWidthType::GetBufferLayout() const {
+  return {kValidityBuffer, BufferDescr(BufferType::DATA, bit_width())};  // validity, then DATA at bit_width()
+}
+
+std::vector<BufferDescr> NullType::GetBufferLayout() const {
+  return {};  // the null type describes no buffers
+}
+
+std::vector<BufferDescr> BinaryType::GetBufferLayout() const {
+  return {kValidityBuffer, kOffsetBuffer, kValues8};  // validity, 32-bit offsets, 8-bit data
+}
+
+std::vector<BufferDescr> ListType::GetBufferLayout() const {
+  return {kValidityBuffer, kOffsetBuffer};  // validity, 32-bit offsets
+}
+
+std::vector<BufferDescr> StructType::GetBufferLayout() const {
+  return {kValidityBuffer};  // validity only: a struct has no TYPE buffer (that belongs to unions)
+}
+
+std::vector<BufferDescr> UnionType::GetBufferLayout() const {
+  if (mode == UnionMode::SPARSE) {
+    return {kValidityBuffer, kTypeBuffer};  // sparse: validity + TYPE buffer
+  } else {
+    return {kValidityBuffer, kTypeBuffer, kOffsetBuffer};  // dense: additionally a 32-bit OFFSET buffer
+  }
+}
+
+std::vector<BufferDescr> DecimalType::GetBufferLayout() const {
+  // TODO(wesm): the decimal buffer layout is not described yet; an empty layout is returned
+  return {};
+}
+
 }  // namespace arrow

http://git-wip-us.apache.org/repos/asf/arrow/blob/e3c167bd/cpp/src/arrow/type.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/type.h b/cpp/src/arrow/type.h
index 876d7ea..3077738 100644
--- a/cpp/src/arrow/type.h
+++ b/cpp/src/arrow/type.h
@@ -101,6 +101,20 @@ struct Type {
   };
 };
 
+enum class BufferType : char { DATA, OFFSET, TYPE, VALIDITY };  // kind tag stored in each BufferDescr
+
+class BufferDescr {  // immutable (BufferType, bit width) pair; DataType::GetBufferLayout() returns a vector of these
+ public:
+  BufferDescr(BufferType type, int bit_width) : type_(type), bit_width_(bit_width) {}
+
+  BufferType type() const { return type_; }  // the BufferType given at construction
+  int bit_width() const { return bit_width_; }  // the bit width given at construction
+
+ private:
+  BufferType type_;
+  int bit_width_;
+};
+
 struct ARROW_EXPORT DataType {
   Type::type type;
 
@@ -129,12 +143,18 @@ struct ARROW_EXPORT DataType {
   virtual Status Accept(TypeVisitor* visitor) const = 0;
 
   virtual std::string ToString() const = 0;
+
+  virtual std::vector<BufferDescr> GetBufferLayout() const = 0;
 };
 
 typedef std::shared_ptr<DataType> TypePtr;
 
-struct ARROW_EXPORT FixedWidthMeta {
+struct ARROW_EXPORT FixedWidthType : public DataType {
+  using DataType::DataType;
+
   virtual int bit_width() const = 0;
+
+  std::vector<BufferDescr> GetBufferLayout() const override;
 };
 
 struct ARROW_EXPORT IntegerMeta {
@@ -184,12 +204,12 @@ struct ARROW_EXPORT Field {
 };
 typedef std::shared_ptr<Field> FieldPtr;
 
-struct ARROW_EXPORT PrimitiveCType : public DataType {
-  using DataType::DataType;
+struct ARROW_EXPORT PrimitiveCType : public FixedWidthType {
+  using FixedWidthType::FixedWidthType;
 };
 
 template <typename DERIVED, Type::type TYPE_ID, typename C_TYPE>
-struct ARROW_EXPORT CTypeImpl : public PrimitiveCType, public FixedWidthMeta {
+struct ARROW_EXPORT CTypeImpl : public PrimitiveCType {
   using c_type = C_TYPE;
   static constexpr Type::type type_id = TYPE_ID;
 
@@ -204,16 +224,17 @@ struct ARROW_EXPORT CTypeImpl : public PrimitiveCType, public FixedWidthMeta {
   std::string ToString() const override { return std::string(DERIVED::name()); }
 };
 
-struct ARROW_EXPORT NullType : public DataType, public FixedWidthMeta {
+struct ARROW_EXPORT NullType : public DataType {
   static constexpr Type::type type_id = Type::NA;
 
   NullType() : DataType(Type::NA) {}
 
-  int bit_width() const override;
   Status Accept(TypeVisitor* visitor) const override;
   std::string ToString() const override;
 
   static std::string name() { return "null"; }
+
+  std::vector<BufferDescr> GetBufferLayout() const override;
 };
 
 template <typename DERIVED, Type::type TYPE_ID, typename C_TYPE>
@@ -221,10 +242,10 @@ struct IntegerTypeImpl : public CTypeImpl<DERIVED, TYPE_ID, C_TYPE>, public Inte
   bool is_signed() const override { return std::is_signed<C_TYPE>::value; }
 };
 
-struct ARROW_EXPORT BooleanType : public DataType, FixedWidthMeta {
+struct ARROW_EXPORT BooleanType : public FixedWidthType {
   static constexpr Type::type type_id = Type::BOOL;
 
-  BooleanType() : DataType(Type::BOOL) {}
+  BooleanType() : FixedWidthType(Type::BOOL) {}
 
   Status Accept(TypeVisitor* visitor) const override;
   std::string ToString() const override;
@@ -306,6 +327,8 @@ struct ARROW_EXPORT ListType : public DataType, public NoExtraMeta {
   std::string ToString() const override;
 
   static std::string name() { return "list"; }
+
+  std::vector<BufferDescr> GetBufferLayout() const override;
 };
 
 // BinaryType type is reprsents lists of 1-byte values.
@@ -318,6 +341,8 @@ struct ARROW_EXPORT BinaryType : public DataType, public NoExtraMeta {
   std::string ToString() const override;
   static std::string name() { return "binary"; }
 
+  std::vector<BufferDescr> GetBufferLayout() const override;
+
  protected:
   // Allow subclasses to change the logical type.
   explicit BinaryType(Type::type logical_type) : DataType(logical_type) {}
@@ -345,6 +370,8 @@ struct ARROW_EXPORT StructType : public DataType, public NoExtraMeta {
   Status Accept(TypeVisitor* visitor) const override;
   std::string ToString() const override;
   static std::string name() { return "struct"; }
+
+  std::vector<BufferDescr> GetBufferLayout() const override;
 };
 
 struct ARROW_EXPORT DecimalType : public DataType {
@@ -358,6 +385,8 @@ struct ARROW_EXPORT DecimalType : public DataType {
   Status Accept(TypeVisitor* visitor) const override;
   std::string ToString() const override;
   static std::string name() { return "decimal"; }
+
+  std::vector<BufferDescr> GetBufferLayout() const override;
 };
 
 enum class UnionMode : char { SPARSE, DENSE };
@@ -375,14 +404,20 @@ struct ARROW_EXPORT UnionType : public DataType {
   static std::string name() { return "union"; }
   Status Accept(TypeVisitor* visitor) const override;
 
+  std::vector<BufferDescr> GetBufferLayout() const override;
+
   UnionMode mode;
   std::vector<uint8_t> type_ids;
 };
 
-struct ARROW_EXPORT DateType : public DataType, public NoExtraMeta {
+struct ARROW_EXPORT DateType : public FixedWidthType {
   static constexpr Type::type type_id = Type::DATE;
 
-  DateType() : DataType(Type::DATE) {}
+  using c_type = int32_t;
+
+  DateType() : FixedWidthType(Type::DATE) {}
+
+  int bit_width() const override { return sizeof(c_type) * 8; }
 
   Status Accept(TypeVisitor* visitor) const override;
   std::string ToString() const override { return name(); }
@@ -391,13 +426,17 @@ struct ARROW_EXPORT DateType : public DataType, public NoExtraMeta {
 
 enum class TimeUnit : char { SECOND = 0, MILLI = 1, MICRO = 2, NANO = 3 };
 
-struct ARROW_EXPORT TimeType : public DataType {
+struct ARROW_EXPORT TimeType : public FixedWidthType {
   static constexpr Type::type type_id = Type::TIME;
   using Unit = TimeUnit;
+  using c_type = int64_t;
 
   TimeUnit unit;
 
-  explicit TimeType(TimeUnit unit = TimeUnit::MILLI) : DataType(Type::TIME), unit(unit) {}
+  int bit_width() const override { return sizeof(c_type) * 8; }
+
+  explicit TimeType(TimeUnit unit = TimeUnit::MILLI)
+      : FixedWidthType(Type::TIME), unit(unit) {}
   TimeType(const TimeType& other) : TimeType(other.unit) {}
 
   Status Accept(TypeVisitor* visitor) const override;
@@ -405,7 +444,7 @@ struct ARROW_EXPORT TimeType : public DataType {
   static std::string name() { return "time"; }
 };
 
-struct ARROW_EXPORT TimestampType : public DataType, public FixedWidthMeta {
+struct ARROW_EXPORT TimestampType : public FixedWidthType {
   using Unit = TimeUnit;
 
   typedef int64_t c_type;
@@ -416,7 +455,7 @@ struct ARROW_EXPORT TimestampType : public DataType, public FixedWidthMeta {
   TimeUnit unit;
 
   explicit TimestampType(TimeUnit unit = TimeUnit::MILLI)
-      : DataType(Type::TIMESTAMP), unit(unit) {}
+      : FixedWidthType(Type::TIMESTAMP), unit(unit) {}
 
   TimestampType(const TimestampType& other) : TimestampType(other.unit) {}
 
@@ -425,10 +464,10 @@ struct ARROW_EXPORT TimestampType : public DataType, public FixedWidthMeta {
   static std::string name() { return "timestamp"; }
 };
 
-struct ARROW_EXPORT IntervalType : public DataType, public FixedWidthMeta {
+struct ARROW_EXPORT IntervalType : public FixedWidthType {
   enum class Unit : char { YEAR_MONTH = 0, DAY_TIME = 1 };
 
-  typedef int64_t c_type;
+  using c_type = int64_t;
   static constexpr Type::type type_id = Type::INTERVAL;
 
   int bit_width() const override { return sizeof(int64_t) * 8; }
@@ -436,7 +475,7 @@ struct ARROW_EXPORT IntervalType : public DataType, public FixedWidthMeta {
   Unit unit;
 
   explicit IntervalType(Unit unit = Unit::YEAR_MONTH)
-      : DataType(Type::INTERVAL), unit(unit) {}
+      : FixedWidthType(Type::INTERVAL), unit(unit) {}
 
   IntervalType(const IntervalType& other) : IntervalType(other.unit) {}
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/e3c167bd/cpp/src/arrow/types/primitive.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/types/primitive.cc b/cpp/src/arrow/types/primitive.cc
index 14667ee..f42a3ca 100644
--- a/cpp/src/arrow/types/primitive.cc
+++ b/cpp/src/arrow/types/primitive.cc
@@ -49,7 +49,7 @@ bool PrimitiveArray::EqualsExact(const PrimitiveArray& other) const {
     const uint8_t* this_data = raw_data_;
     const uint8_t* other_data = other.raw_data_;
 
-    auto size_meta = dynamic_cast<const FixedWidthMeta*>(type_.get());
+    auto size_meta = dynamic_cast<const FixedWidthType*>(type_.get());
     int value_byte_size = size_meta->bit_width() / 8;
     DCHECK_GT(value_byte_size, 0);
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/e3c167bd/cpp/src/arrow/util/bit-util.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/util/bit-util.h b/cpp/src/arrow/util/bit-util.h
index 13b7e19..5c8055f 100644
--- a/cpp/src/arrow/util/bit-util.h
+++ b/cpp/src/arrow/util/bit-util.h
@@ -78,6 +78,10 @@ static inline bool IsMultipleOf64(int64_t n) {
   return (n & 63) == 0;
 }
 
+static inline bool IsMultipleOf8(int64_t n) {
+  return (n & 7) == 0;  // true when the low three bits of n are all zero
+}
+
 inline int64_t RoundUpToMultipleOf64(int64_t num) {
   // TODO(wesm): is this definitely needed?
   // DCHECK_GE(num, 0);

http://git-wip-us.apache.org/repos/asf/arrow/blob/e3c167bd/dev/release/run-rat.sh
----------------------------------------------------------------------
diff --git a/dev/release/run-rat.sh b/dev/release/run-rat.sh
index d8ec650..e26dd58 100755
--- a/dev/release/run-rat.sh
+++ b/dev/release/run-rat.sh
@@ -28,6 +28,7 @@ $RAT $1 \
   -e ".*" \
   -e mman.h \
   -e "*_generated.h" \
+  -e "*.json" \
   -e random.h \
   -e status.cc \
   -e status.h \
@@ -49,5 +50,3 @@ else
   echo "${UNAPPROVED} unapproved licences. Check rat report: rat.txt"
   exit 1
 fi
-
-