Posted to commits@arrow.apache.org by we...@apache.org on 2016/04/18 19:44:45 UTC

arrow git commit: ARROW-82: Initial IPC support for ListArray

Repository: arrow
Updated Branches:
  refs/heads/master 5843e6872 -> 0b472d860


ARROW-82: Initial IPC support for ListArray

This is a work in progress because I can't get clang-tidy to shut up on parameterized test files (see the last commit, which I need to revert before merge).  I'd like to make sure this is a clean build and that people are OK with these changes.  This PR also has a lot of collateral damage from small and large things I cleaned up along the way to make this work.  I tried to split the commits up logically, but if people would prefer separate pull requests I can try to do that as well.

Open questions:
1.  For supporting strings, binary, etc., I was thinking of changing their type definitions to inherit from ListType and hard-coding the child type.  This would allow for simpler IPC code (all of the instantiation of types would happen in construct.h/.cc?) vs. handling each of these types separately for IPC.
2.  There are some TODOs I left sprinkled in the code, and I would like people's thoughts on whether they are urgent/worthwhile to follow up on.

Open issues:
1.  Supporting the rest of the List backed logical types
2.  More unit tests for added functionality.

As part of this commit I also refactored the Builder interfaces a little bit for the following reasons:
1.  It seems that if ArrayBuilder owns the bitmap it should be responsible for having methods to manipulate it.
2.  This allows ListBuilder to use the parent class + a BufferBuilder instead of inheriting Int32Builder, which means it doesn't need to do strange length/capacity hacks.
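
The split described above can be pictured with a minimal sketch.  It is not the code in this commit: SketchArrayBuilder and SketchListBuilder are hypothetical names, and std::vector stands in for the pool-backed buffers and BufferBuilder.  The base class owns the validity bitmap and is the only place that touches it; the list builder keeps its offsets in a separate buffer and simply calls the parent's bitmap method.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical, simplified stand-in for ArrayBuilder: it owns the validity
// bitmap, the slot count and the null count.
class SketchArrayBuilder {
 public:
  virtual ~SketchArrayBuilder() = default;

  // Record one slot.  A set bit means valid, a cleared bit means null.
  void AppendToBitmap(bool is_valid) {
    const int32_t byte_index = length_ / 8;
    if (byte_index >= static_cast<int32_t>(bitmap_.size())) { bitmap_.push_back(0); }
    if (is_valid) {
      bitmap_[byte_index] |= static_cast<uint8_t>(1u << (length_ % 8));
    } else {
      ++null_count_;
    }
    ++length_;
  }

  bool IsValid(int32_t i) const {
    assert(i >= 0 && i < length_);
    return ((bitmap_[i / 8] >> (i % 8)) & 1) != 0;
  }
  int32_t length() const { return length_; }
  int32_t null_count() const { return null_count_; }

 private:
  std::vector<uint8_t> bitmap_;
  int32_t length_ = 0;
  int32_t null_count_ = 0;
};

// Hypothetical list builder: offsets live in their own buffer, so no
// length/capacity tricks on an inherited integer builder are needed.
class SketchListBuilder : public SketchArrayBuilder {
 public:
  // Start a new list slot at the current end of the child values.
  void Append(bool is_valid) {
    offsets_.push_back(static_cast<int32_t>(values_.size()));
    AppendToBitmap(is_valid);
  }

  // Append one child value to the list slot started most recently.
  void AppendValue(int32_t value) { values_.push_back(value); }

  // Offsets for all slots plus the closing end offset.
  std::vector<int32_t> FinishOffsets() const {
    std::vector<int32_t> out = offsets_;
    out.push_back(static_cast<int32_t>(values_.size()));
    return out;
  }

 private:
  std::vector<int32_t> offsets_;
  std::vector<int32_t> values_;
};
```

Building the lists [1, 2], null, [3] with this sketch gives the offsets 0, 2, 2, 3 and a null count of 1; the null slot contributes an empty range.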

Other misc things here:
1.  Native popcount in test-util.h
2.  Ability to build a new list on top of an existing one by incrementally adding offsets/sizes
3.  Added missing primitive types in construct.h.
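
For the popcount item, a rough sketch of counting set bits in a validity bitmap with a native instruction might look like the following.  This is an illustration only, not the helper added to test-util.h: PopcountByte and CountSetBits are hypothetical names, and __builtin_popcount is assumed to be available only on GCC/Clang (a portable fallback is included).

```cpp
#include <cassert>
#include <cstdint>

// Count the set bits in one byte.  Uses the GCC/Clang builtin when present,
// otherwise clears the lowest set bit until the byte is zero.
inline int PopcountByte(uint8_t byte) {
#if defined(__GNUC__) || defined(__clang__)
  return __builtin_popcount(static_cast<unsigned int>(byte));
#else
  int count = 0;
  while (byte != 0) {
    byte = static_cast<uint8_t>(byte & (byte - 1));
    ++count;
  }
  return count;
#endif
}

// Count the set bits among the first num_bits bits of the bitmap, where bit i
// is stored in byte i / 8 at position i % 8 (least significant bit first).
inline int64_t CountSetBits(const uint8_t* bitmap, int64_t num_bits) {
  assert(bitmap != nullptr || num_bits == 0);
  int64_t count = 0;
  const int64_t full_bytes = num_bits / 8;
  for (int64_t i = 0; i < full_bytes; ++i) {
    count += PopcountByte(bitmap[i]);
  }
  const int trailing_bits = static_cast<int>(num_bits % 8);
  if (trailing_bits > 0) {
    // Mask off bits past the logical end of the bitmap.
    const uint8_t mask = static_cast<uint8_t>((1u << trailing_bits) - 1);
    count += PopcountByte(static_cast<uint8_t>(bitmap[full_bytes] & mask));
  }
  return count;
}
```

The masking step matters because the last byte of a bitmap can hold padding bits that should not be counted.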

Author: Micah Kornfield <em...@gmail.com>

Closes #59 from emkornfield/emk_list_ipc_PR and squashes the following commits:

0c5162d [Micah Kornfield] another format fix
0af558b [Micah Kornfield] remove a now unnecessary NOLINT, but mostly to trigger another travis-ci job that failed due to apt get issue
7789205 [Micah Kornfield] make clang-format-3.7 happy
6e57728 [Micah Kornfield] make format fixes
5e15815 [Micah Kornfield] fix make lint
8982723 [Micah Kornfield] remaining style cleanup
be04b3e [Micah Kornfield] add unit tests for zero length row batches and non-null batches.  fix bugs
10e6651 [Micah Kornfield] add in maximum recursion depth, surfaced possible recursion issue with flatbuffers
3b219a1 [Micah Kornfield] Make append is_null parameter is_valid for api consistency
2e6c477 [Micah Kornfield] add missing RETURN_NOT_OK
e71810b [Micah Kornfield] make Resize and Init virtual on builder
8ab5315 [Micah Kornfield] make clang tidy ignore a little bit less hacky
53d37bc [Micah Kornfield] filter out ipc-adapter-test from tidy
8e464b5 [Micah Kornfield] Fixes per tidy and lint
aa0602c [Micah Kornfield] add potentially useful pool factories to test utils
39c57ed [Micah Kornfield] add potentially useful methods for generative arrays to ipc test-common
a2e1e52 [Micah Kornfield] native popcount
61b0481 [Micah Kornfield] small fixes to naming/style for c++ and potential bugs
5f87aef [Micah Kornfield] Refactor ipc-adapter-test to make it parameterizable.  add unit test for lists.  make unit test pass and add construction method for list arrays
45e41c0 [Micah Kornfield] Make BufferBuilder more useable for appending primitives
1374485 [Micah Kornfield] augment python unittest to have null element in list
20f984b [Micah Kornfield] refactor primitive builders to use parent builders bitmap
3895d34 [Micah Kornfield] Refactor list builder to use ArrayBuilders bitmap methods and a separate buffer builder
01c50be [Micah Kornfield] Add utility methods for managing null bitmap directly to ArrayBuilder
cc7f851 [Micah Kornfield] add Validate method to array and implementation for ListArray


Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/0b472d86
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/0b472d86
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/0b472d86

Branch: refs/heads/master
Commit: 0b472d860260f7063aee742939be23b921382741
Parents: 5843e68
Author: Micah Kornfield <em...@gmail.com>
Authored: Mon Apr 18 19:44:29 2016 +0200
Committer: Wes McKinney <we...@apache.org>
Committed: Mon Apr 18 19:44:29 2016 +0200

----------------------------------------------------------------------
 cpp/CMakeLists.txt                     |   2 +-
 cpp/README.md                          |   9 +-
 cpp/src/.clang-tidy-ignore             |   1 +
 cpp/src/arrow/array.cc                 |   5 +
 cpp/src/arrow/array.h                  |   6 +-
 cpp/src/arrow/builder.cc               |  56 ++++++++
 cpp/src/arrow/builder.h                |  46 ++++--
 cpp/src/arrow/ipc/adapter.cc           | 136 +++++++++++++-----
 cpp/src/arrow/ipc/adapter.h            |  11 +-
 cpp/src/arrow/ipc/ipc-adapter-test.cc  | 216 +++++++++++++++++++++++-----
 cpp/src/arrow/ipc/memory.cc            |   1 +
 cpp/src/arrow/ipc/metadata-internal.cc |   3 +-
 cpp/src/arrow/ipc/metadata-internal.h  |   3 +-
 cpp/src/arrow/ipc/metadata.cc          |   3 +-
 cpp/src/arrow/ipc/test-common.h        |  67 +++++++++
 cpp/src/arrow/parquet/schema.cc        |   2 +-
 cpp/src/arrow/schema.cc                |   2 +-
 cpp/src/arrow/test-util.h              |  49 ++++++-
 cpp/src/arrow/type.h                   |   2 +-
 cpp/src/arrow/types/construct.cc       |  43 +++++-
 cpp/src/arrow/types/construct.h        |   9 ++
 cpp/src/arrow/types/list-test.cc       |  80 ++++++++---
 cpp/src/arrow/types/list.cc            |  60 +++++++-
 cpp/src/arrow/types/list.h             | 112 +++++++++------
 cpp/src/arrow/types/primitive-test.cc  |   6 +-
 cpp/src/arrow/types/primitive.cc       |  45 +-----
 cpp/src/arrow/types/primitive.h        |  41 +++---
 cpp/src/arrow/types/string.h           |   5 +-
 cpp/src/arrow/util/buffer.h            |  59 ++++++--
 cpp/src/arrow/util/logging.h           |   2 +-
 cpp/src/arrow/util/memory-pool.cc      |   2 +-
 python/pyarrow/tests/test_array.py     |   5 +-
 32 files changed, 839 insertions(+), 250 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/arrow/blob/0b472d86/cpp/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt
index f803c0f..b38f91e 100644
--- a/cpp/CMakeLists.txt
+++ b/cpp/CMakeLists.txt
@@ -565,7 +565,7 @@ if (${CLANG_TIDY_FOUND})
   `find ${CMAKE_CURRENT_SOURCE_DIR}/src -name \\*.cc | sed -e '/_generated/g'`)
   # runs clang-tidy and exits with a non-zero exit code if any errors are found.
   add_custom_target(check-clang-tidy ${BUILD_SUPPORT_DIR}/run-clang-tidy.sh ${CLANG_TIDY_BIN} ${CMAKE_BINARY_DIR}/compile_commands.json 
-  0 `find ${CMAKE_CURRENT_SOURCE_DIR}/src -name \\*.cc | sed -e '/_generated/g'`)
+  0 `find ${CMAKE_CURRENT_SOURCE_DIR}/src -name \\*.cc |grep -v -F -f ${CMAKE_CURRENT_SOURCE_DIR}/src/.clang-tidy-ignore | sed -e '/_generated/g'`)
 
 endif()
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/0b472d86/cpp/README.md
----------------------------------------------------------------------
diff --git a/cpp/README.md b/cpp/README.md
index 3f5da21..c8cd86f 100644
--- a/cpp/README.md
+++ b/cpp/README.md
@@ -76,4 +76,11 @@ build failures by running the following checks before submitting your pull reque
 
 Note that the clang-tidy target may take a while to run.  You might consider
 running clang-tidy separately on the files you have added/changed before
-invoking the make target to reduce iteration time.
+invoking the make target to reduce iteration time.  Also, it might generate warnings
+that aren't valid.  To avoid these you can add a line comment `// NOLINT`. If
+NOLINT doesn't suppress the warnings, you can add the file in question to
+the .clang-tidy-ignore file.  This will allow `make check-clang-tidy` to pass in
+travis-CI (but still surface the potential warnings in `make clang-tidy`).   Ideally,
+both of these options would be used rarely.  Currently known use cases where they are required:
+
+*  Parameterized tests in google test.

http://git-wip-us.apache.org/repos/asf/arrow/blob/0b472d86/cpp/src/.clang-tidy-ignore
----------------------------------------------------------------------
diff --git a/cpp/src/.clang-tidy-ignore b/cpp/src/.clang-tidy-ignore
new file mode 100644
index 0000000..a128c38
--- /dev/null
+++ b/cpp/src/.clang-tidy-ignore
@@ -0,0 +1 @@
+ipc-adapter-test.cc

http://git-wip-us.apache.org/repos/asf/arrow/blob/0b472d86/cpp/src/arrow/array.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/array.cc b/cpp/src/arrow/array.cc
index a153686..c6b9b15 100644
--- a/cpp/src/arrow/array.cc
+++ b/cpp/src/arrow/array.cc
@@ -20,6 +20,7 @@
 #include <cstdint>
 
 #include "arrow/util/buffer.h"
+#include "arrow/util/status.h"
 
 namespace arrow {
 
@@ -47,6 +48,10 @@ bool Array::EqualsExact(const Array& other) const {
   return true;
 }
 
+Status Array::Validate() const {
+  return Status::OK();
+}
+
 bool NullArray::Equals(const std::shared_ptr<Array>& arr) const {
   if (this == arr.get()) { return true; }
   if (Type::NA != arr->type_enum()) { return false; }

http://git-wip-us.apache.org/repos/asf/arrow/blob/0b472d86/cpp/src/arrow/array.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/array.h b/cpp/src/arrow/array.h
index c6735f8..f98c4c2 100644
--- a/cpp/src/arrow/array.h
+++ b/cpp/src/arrow/array.h
@@ -28,6 +28,7 @@
 namespace arrow {
 
 class Buffer;
+class Status;
 
 // Immutable data array with some logical type and some length. Any memory is
 // owned by the respective Buffer instance (or its parents).
@@ -39,7 +40,7 @@ class Array {
   Array(const std::shared_ptr<DataType>& type, int32_t length, int32_t null_count = 0,
       const std::shared_ptr<Buffer>& null_bitmap = nullptr);
 
-  virtual ~Array() {}
+  virtual ~Array() = default;
 
   // Determine if a slot is null. For inner loops. Does *not* boundscheck
   bool IsNull(int i) const {
@@ -58,6 +59,9 @@ class Array {
 
   bool EqualsExact(const Array& arr) const;
   virtual bool Equals(const std::shared_ptr<Array>& arr) const = 0;
+  // Determines if the array is internally consistent.  Defaults to always
+  // returning Status::OK.  This can be an expensive check.
+  virtual Status Validate() const;
 
  protected:
   std::shared_ptr<DataType> type_;

http://git-wip-us.apache.org/repos/asf/arrow/blob/0b472d86/cpp/src/arrow/builder.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/builder.cc b/cpp/src/arrow/builder.cc
index 1447078..87c1219 100644
--- a/cpp/src/arrow/builder.cc
+++ b/cpp/src/arrow/builder.cc
@@ -25,6 +25,25 @@
 
 namespace arrow {
 
+Status ArrayBuilder::AppendToBitmap(bool is_valid) {
+  if (length_ == capacity_) {
+    // If the capacity was not already a multiple of 2, do so here
+    // TODO(emkornfield) doubling isn't great default allocation practice
+    // see https://github.com/facebook/folly/blob/master/folly/docs/FBVector.md
+    // for discussion
+    RETURN_NOT_OK(Resize(util::next_power2(capacity_ + 1)));
+  }
+  UnsafeAppendToBitmap(is_valid);
+  return Status::OK();
+}
+
+Status ArrayBuilder::AppendToBitmap(const uint8_t* valid_bytes, int32_t length) {
+  RETURN_NOT_OK(Reserve(length));
+
+  UnsafeAppendToBitmap(valid_bytes, length);
+  return Status::OK();
+}
+
 Status ArrayBuilder::Init(int32_t capacity) {
   capacity_ = capacity;
   int32_t to_alloc = util::ceil_byte(capacity) / 8;
@@ -36,6 +55,7 @@ Status ArrayBuilder::Init(int32_t capacity) {
 }
 
 Status ArrayBuilder::Resize(int32_t new_bits) {
+  if (!null_bitmap_) { return Init(new_bits); }
   int32_t new_bytes = util::ceil_byte(new_bits) / 8;
   int32_t old_bytes = null_bitmap_->size();
   RETURN_NOT_OK(null_bitmap_->Resize(new_bytes));
@@ -56,10 +76,46 @@ Status ArrayBuilder::Advance(int32_t elements) {
 
 Status ArrayBuilder::Reserve(int32_t elements) {
   if (length_ + elements > capacity_) {
+    // TODO(emkornfield) power of 2 growth is potentially suboptimal
     int32_t new_capacity = util::next_power2(length_ + elements);
     return Resize(new_capacity);
   }
   return Status::OK();
 }
 
+Status ArrayBuilder::SetNotNull(int32_t length) {
+  RETURN_NOT_OK(Reserve(length));
+  UnsafeSetNotNull(length);
+  return Status::OK();
+}
+
+void ArrayBuilder::UnsafeAppendToBitmap(bool is_valid) {
+  if (is_valid) {
+    util::set_bit(null_bitmap_data_, length_);
+  } else {
+    ++null_count_;
+  }
+  ++length_;
+}
+
+void ArrayBuilder::UnsafeAppendToBitmap(const uint8_t* valid_bytes, int32_t length) {
+  if (valid_bytes == nullptr) {
+    UnsafeSetNotNull(length);
+    return;
+  }
+  for (int32_t i = 0; i < length; ++i) {
+    // TODO(emkornfield) Optimize for large values of length?
+    UnsafeAppendToBitmap(valid_bytes[i] > 0);
+  }
+}
+
+void ArrayBuilder::UnsafeSetNotNull(int32_t length) {
+  const int32_t new_length = length + length_;
+  // TODO(emkornfield) Optimize for large values of length?
+  for (int32_t i = length_; i < new_length; ++i) {
+    util::set_bit(null_bitmap_data_, i);
+  }
+  length_ = new_length;
+}
+
 }  // namespace arrow
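
The growth policy used by Reserve and AppendToBitmap above can be summarized in a small standalone sketch.  SketchNextPower2 and SketchReserve are hypothetical names; the real code calls util::next_power2 and Resize, whose exact behavior at edge cases is not shown in this diff, so the sketch assumes inputs of at least 1.

```cpp
#include <cassert>
#include <cstdint>

// Smallest power of two that is >= n (hypothetical helper; requires n >= 1).
inline int32_t SketchNextPower2(int32_t n) {
  assert(n >= 1 && n <= (1 << 30));
  int32_t power = 1;
  while (power < n) { power <<= 1; }
  return power;
}

// Capacity a builder would hold after reserving room for `elements` more
// slots: unchanged if they already fit, otherwise rounded up to a power of 2.
inline int32_t SketchReserve(int32_t length, int32_t capacity, int32_t elements) {
  if (length + elements > capacity) { return SketchNextPower2(length + elements); }
  return capacity;
}
```

With a length of 30 and a capacity of 32, reserving 2 more slots leaves the capacity at 32, while reserving 3 grows it to 64.  The TODOs in the diff note that power-of-2 growth may not be the best default.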

http://git-wip-us.apache.org/repos/asf/arrow/blob/0b472d86/cpp/src/arrow/builder.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/builder.h b/cpp/src/arrow/builder.h
index 21a6341..7d3f439 100644
--- a/cpp/src/arrow/builder.h
+++ b/cpp/src/arrow/builder.h
@@ -34,7 +34,10 @@ class PoolBuffer;
 
 static constexpr int32_t MIN_BUILDER_CAPACITY = 1 << 5;
 
-// Base class for all data array builders
+// Base class for all data array builders.
+// This class provides facilities for incrementally building the null bitmap
+// (see Append methods) and as a side effect the current number of slots and
+// the null count.
 class ArrayBuilder {
  public:
   explicit ArrayBuilder(MemoryPool* pool, const TypePtr& type)
@@ -46,7 +49,7 @@ class ArrayBuilder {
         length_(0),
         capacity_(0) {}
 
-  virtual ~ArrayBuilder() {}
+  virtual ~ArrayBuilder() = default;
 
   // For nested types. Since the objects are owned by this class instance, we
   // skip shared pointers and just return a raw pointer
@@ -58,14 +61,27 @@ class ArrayBuilder {
   int32_t null_count() const { return null_count_; }
   int32_t capacity() const { return capacity_; }
 
-  // Allocates requires memory at this level, but children need to be
-  // initialized independently
-  Status Init(int32_t capacity);
+  // Append to null bitmap
+  Status AppendToBitmap(bool is_valid);
+  // Vector append. Treat each zero byte as a null.   If valid_bytes is null
+  // assume all of length bits are valid.
+  Status AppendToBitmap(const uint8_t* valid_bytes, int32_t length);
+  // Set the next length bits to not null (i.e. valid).
+  Status SetNotNull(int32_t length);
 
-  // Resizes the null_bitmap array
-  Status Resize(int32_t new_bits);
+  // Allocates initial capacity requirements for the builder.  In most
+  // cases subclasses should override and call their parent class's
+  // method as well.
+  virtual Status Init(int32_t capacity);
 
-  Status Reserve(int32_t extra_bits);
+  // Resizes the null_bitmap array.  In most
+  // cases subclasses should override and call their parent class's
+  // method as well.
+  virtual Status Resize(int32_t new_bits);
+
+  // Ensures there is enough space for adding the number of elements by checking
+  // capacity and calling Resize if necessary.
+  Status Reserve(int32_t elements);
 
   // For cases where raw data was memcpy'd into the internal buffers, allows us
   // to advance the length of the builder. It is your responsibility to use
@@ -75,7 +91,7 @@ class ArrayBuilder {
   const std::shared_ptr<PoolBuffer>& null_bitmap() const { return null_bitmap_; }
 
   // Creates new array object to hold the contents of the builder and transfers
-  // ownership of the data
+  // ownership of the data.  This resets all variables on the builder.
   virtual std::shared_ptr<Array> Finish() = 0;
 
   const std::shared_ptr<DataType>& type() const { return type_; }
@@ -97,6 +113,18 @@ class ArrayBuilder {
   // Child value array builders. These are owned by this class
   std::vector<std::unique_ptr<ArrayBuilder>> children_;
 
+  //
+  // Unsafe operations (don't check capacity/don't resize)
+  //
+
+  // Append to null bitmap.
+  void UnsafeAppendToBitmap(bool is_valid);
+  // Vector append. Treat each zero byte as a null. If valid_bytes is null
+  // assume all of length bits are valid.
+  void UnsafeAppendToBitmap(const uint8_t* valid_bytes, int32_t length);
+  // Set the next length bits to not null (i.e. valid).
+  void UnsafeSetNotNull(int32_t length);
+
  private:
   DISALLOW_COPY_AND_ASSIGN(ArrayBuilder);
 };

http://git-wip-us.apache.org/repos/asf/arrow/blob/0b472d86/cpp/src/arrow/ipc/adapter.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/adapter.cc b/cpp/src/arrow/ipc/adapter.cc
index 2f72c3a..bf6fa94 100644
--- a/cpp/src/arrow/ipc/adapter.cc
+++ b/cpp/src/arrow/ipc/adapter.cc
@@ -19,17 +19,19 @@
 
 #include <cstdint>
 #include <cstring>
+#include <sstream>
 #include <vector>
 
 #include "arrow/array.h"
-#include "arrow/ipc/memory.h"
 #include "arrow/ipc/Message_generated.h"
-#include "arrow/ipc/metadata.h"
+#include "arrow/ipc/memory.h"
 #include "arrow/ipc/metadata-internal.h"
+#include "arrow/ipc/metadata.h"
 #include "arrow/schema.h"
 #include "arrow/table.h"
 #include "arrow/type.h"
 #include "arrow/types/construct.h"
+#include "arrow/types/list.h"
 #include "arrow/types/primitive.h"
 #include "arrow/util/buffer.h"
 #include "arrow/util/logging.h"
@@ -63,44 +65,70 @@ static bool IsPrimitive(const DataType* type) {
   }
 }
 
+static bool IsListType(const DataType* type) {
+  DCHECK(type != nullptr);
+  switch (type->type) {
+    // TODO(emkornfield) grouping like this are used in a few places in the
+    // code consider using pattern like:
+    // http://stackoverflow.com/questions/26784685/c-macro-for-calling-function-based-on-enum-type
+    //
+    // TODO(emkornfield) Fix type systems so these are all considered lists and
+    // the types behave the same way?
+    // case Type::BINARY:
+    // case Type::CHAR:
+    case Type::LIST:
+      // see todo on common types
+      // case Type::STRING:
+      // case Type::VARCHAR:
+      return true;
+    default:
+      return false;
+  }
+}
+
 // ----------------------------------------------------------------------
 // Row batch write path
 
 Status VisitArray(const Array* arr, std::vector<flatbuf::FieldNode>* field_nodes,
-    std::vector<std::shared_ptr<Buffer>>* buffers) {
-  if (IsPrimitive(arr->type().get())) {
-    const PrimitiveArray* prim_arr = static_cast<const PrimitiveArray*>(arr);
-
-    field_nodes->push_back(
-        flatbuf::FieldNode(prim_arr->length(), prim_arr->null_count()));
+    std::vector<std::shared_ptr<Buffer>>* buffers, int max_recursion_depth) {
+  if (max_recursion_depth <= 0) { return Status::Invalid("Max recursion depth reached"); }
+  DCHECK(arr);
+  DCHECK(field_nodes);
+  // push back all common elements
+  field_nodes->push_back(flatbuf::FieldNode(arr->length(), arr->null_count()));
+  if (arr->null_count() > 0) {
+    buffers->push_back(arr->null_bitmap());
+  } else {
+    // Push a dummy zero-length buffer, not to be copied
+    buffers->push_back(std::make_shared<Buffer>(nullptr, 0));
+  }
 
-    if (prim_arr->null_count() > 0) {
-      buffers->push_back(prim_arr->null_bitmap());
-    } else {
-      // Push a dummy zero-length buffer, not to be copied
-      buffers->push_back(std::make_shared<Buffer>(nullptr, 0));
-    }
+  const DataType* arr_type = arr->type().get();
+  if (IsPrimitive(arr_type)) {
+    const auto prim_arr = static_cast<const PrimitiveArray*>(arr);
     buffers->push_back(prim_arr->data());
-  } else if (arr->type_enum() == Type::LIST) {
-    // TODO(wesm)
-    return Status::NotImplemented("List type");
+  } else if (IsListType(arr_type)) {
+    const auto list_arr = static_cast<const ListArray*>(arr);
+    buffers->push_back(list_arr->offset_buffer());
+    RETURN_NOT_OK(VisitArray(
+        list_arr->values().get(), field_nodes, buffers, max_recursion_depth - 1));
   } else if (arr->type_enum() == Type::STRUCT) {
     // TODO(wesm)
     return Status::NotImplemented("Struct type");
   }
-
   return Status::OK();
 }
 
 class RowBatchWriter {
  public:
-  explicit RowBatchWriter(const RowBatch* batch) : batch_(batch) {}
+  RowBatchWriter(const RowBatch* batch, int max_recursion_depth)
+      : batch_(batch), max_recursion_depth_(max_recursion_depth) {}
 
   Status AssemblePayload() {
     // Perform depth-first traversal of the row-batch
     for (int i = 0; i < batch_->num_columns(); ++i) {
       const Array* arr = batch_->column(i).get();
-      RETURN_NOT_OK(VisitArray(arr, &field_nodes_, &buffers_));
+      RETURN_NOT_OK(VisitArray(arr, &field_nodes_, &buffers_, max_recursion_depth_));
     }
     return Status::OK();
   }
@@ -111,8 +139,10 @@ class RowBatchWriter {
     int64_t offset = 0;
     for (size_t i = 0; i < buffers_.size(); ++i) {
       const Buffer* buffer = buffers_[i].get();
-      int64_t size = buffer->size();
+      int64_t size = 0;
 
+      // The buffer might be null if we are handling zero row lengths.
+      if (buffer) { size = buffer->size(); }
       // TODO(wesm): We currently have no notion of shared memory page id's,
       // but we've included it in the metadata IDL for when we have it in the
       // future. Use page=0 for now
@@ -171,11 +201,13 @@ class RowBatchWriter {
   std::vector<flatbuf::FieldNode> field_nodes_;
   std::vector<flatbuf::Buffer> buffer_meta_;
   std::vector<std::shared_ptr<Buffer>> buffers_;
+  int max_recursion_depth_;
 };
 
-Status WriteRowBatch(
-    MemorySource* dst, const RowBatch* batch, int64_t position, int64_t* header_offset) {
-  RowBatchWriter serializer(batch);
+Status WriteRowBatch(MemorySource* dst, const RowBatch* batch, int64_t position,
+    int64_t* header_offset, int max_recursion_depth) {
+  DCHECK_GT(max_recursion_depth, 0);
+  RowBatchWriter serializer(batch, max_recursion_depth);
   RETURN_NOT_OK(serializer.AssemblePayload());
   return serializer.Write(dst, position, header_offset);
 }
@@ -186,8 +218,9 @@ static constexpr int64_t INIT_METADATA_SIZE = 4096;
 
 class RowBatchReader::Impl {
  public:
-  Impl(MemorySource* source, const std::shared_ptr<RecordBatchMessage>& metadata)
-      : source_(source), metadata_(metadata) {
+  Impl(MemorySource* source, const std::shared_ptr<RecordBatchMessage>& metadata,
+      int max_recursion_depth)
+      : source_(source), metadata_(metadata), max_recursion_depth_(max_recursion_depth) {
     num_buffers_ = metadata->num_buffers();
     num_flattened_fields_ = metadata->num_fields();
   }
@@ -203,7 +236,7 @@ class RowBatchReader::Impl {
     buffer_index_ = 0;
     for (int i = 0; i < schema->num_fields(); ++i) {
       const Field* field = schema->field(i).get();
-      RETURN_NOT_OK(NextArray(field, &arrays[i]));
+      RETURN_NOT_OK(NextArray(field, max_recursion_depth_, &arrays[i]));
     }
 
     *out = std::make_shared<RowBatch>(schema, metadata_->length(), arrays);
@@ -213,8 +246,12 @@ class RowBatchReader::Impl {
  private:
   // Traverse the flattened record batch metadata and reassemble the
   // corresponding array containers
-  Status NextArray(const Field* field, std::shared_ptr<Array>* out) {
-    const std::shared_ptr<DataType>& type = field->type;
+  Status NextArray(
+      const Field* field, int max_recursion_depth, std::shared_ptr<Array>* out) {
+    const TypePtr& type = field->type;
+    if (max_recursion_depth <= 0) {
+      return Status::Invalid("Max recursion depth reached");
+    }
 
     // pop off a field
     if (field_index_ >= num_flattened_fields_) {
@@ -226,23 +263,42 @@ class RowBatchReader::Impl {
     // we can skip that buffer without reading from shared memory
     FieldMetadata field_meta = metadata_->field(field_index_++);
 
+    // extract null_bitmap which is common to all arrays
+    std::shared_ptr<Buffer> null_bitmap;
+    if (field_meta.null_count == 0) {
+      ++buffer_index_;
+    } else {
+      RETURN_NOT_OK(GetBuffer(buffer_index_++, &null_bitmap));
+    }
+
     if (IsPrimitive(type.get())) {
-      std::shared_ptr<Buffer> null_bitmap;
       std::shared_ptr<Buffer> data;
-      if (field_meta.null_count == 0) {
-        null_bitmap = nullptr;
-        ++buffer_index_;
-      } else {
-        RETURN_NOT_OK(GetBuffer(buffer_index_++, &null_bitmap));
-      }
       if (field_meta.length > 0) {
         RETURN_NOT_OK(GetBuffer(buffer_index_++, &data));
       } else {
+        buffer_index_++;
         data.reset(new Buffer(nullptr, 0));
       }
       return MakePrimitiveArray(
           type, field_meta.length, data, field_meta.null_count, null_bitmap, out);
     }
+
+    if (IsListType(type.get())) {
+      std::shared_ptr<Buffer> offsets;
+      RETURN_NOT_OK(GetBuffer(buffer_index_++, &offsets));
+      const int num_children = type->num_children();
+      if (num_children != 1) {
+        std::stringstream ss;
+        ss << "Field: " << field->ToString()
+           << " has wrong number of children:" << num_children;
+        return Status::Invalid(ss.str());
+      }
+      std::shared_ptr<Array> values_array;
+      RETURN_NOT_OK(
+          NextArray(type->child(0).get(), max_recursion_depth - 1, &values_array));
+      return MakeListArray(type, field_meta.length, offsets, values_array,
+          field_meta.null_count, null_bitmap, out);
+    }
     return Status::NotImplemented("Non-primitive types not complete yet");
   }
 
@@ -256,12 +312,18 @@ class RowBatchReader::Impl {
 
   int field_index_;
   int buffer_index_;
+  int max_recursion_depth_;
   int num_buffers_;
   int num_flattened_fields_;
 };
 
 Status RowBatchReader::Open(
     MemorySource* source, int64_t position, std::shared_ptr<RowBatchReader>* out) {
+  return Open(source, position, kMaxIpcRecursionDepth, out);
+}
+
+Status RowBatchReader::Open(MemorySource* source, int64_t position,
+    int max_recursion_depth, std::shared_ptr<RowBatchReader>* out) {
   std::shared_ptr<Buffer> metadata;
   RETURN_NOT_OK(source->ReadAt(position, INIT_METADATA_SIZE, &metadata));
 
@@ -286,7 +348,7 @@ Status RowBatchReader::Open(
   std::shared_ptr<RecordBatchMessage> batch_meta = message->GetRecordBatch();
 
   std::shared_ptr<RowBatchReader> result(new RowBatchReader());
-  result->impl_.reset(new Impl(source, batch_meta));
+  result->impl_.reset(new Impl(source, batch_meta, max_recursion_depth));
   *out = result;
 
   return Status::OK();
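
The depth-limited traversal that VisitArray and NextArray perform can be sketched on a toy structure.  The types below (SketchArray, SketchFieldNode) and the helpers Flatten and MakeNested are hypothetical and much simpler than the real Array, FieldNode and Status types; the sketch only shows how each level of nesting consumes one unit of the recursion budget while one node per array is emitted in depth-first order.

```cpp
#include <cassert>
#include <cstdint>
#include <memory>
#include <vector>

// Hypothetical nested array: a leaf has no child, a list has exactly one.
struct SketchArray {
  int32_t length = 0;
  int32_t null_count = 0;
  std::shared_ptr<SketchArray> child;  // nullptr for a leaf
};

// Hypothetical flattened metadata entry, one per array visited.
struct SketchFieldNode {
  int32_t length;
  int32_t null_count;
};

// Depth-first flattening.  Returns false once the nesting exceeds the budget,
// mirroring the "Max recursion depth reached" error path in the diff.
bool Flatten(const SketchArray& arr, int max_recursion_depth,
    std::vector<SketchFieldNode>* nodes) {
  assert(nodes != nullptr);
  if (max_recursion_depth <= 0) { return false; }
  nodes->push_back(SketchFieldNode{arr.length, arr.null_count});
  if (arr.child) { return Flatten(*arr.child, max_recursion_depth - 1, nodes); }
  return true;
}

// Wrap a leaf in `depth` list levels, for exercising the limit.
std::shared_ptr<SketchArray> MakeNested(int depth) {
  auto current = std::make_shared<SketchArray>();
  for (int i = 0; i < depth; ++i) {
    auto parent = std::make_shared<SketchArray>();
    parent->child = current;
    current = parent;
  }
  return current;
}
```

A leaf wrapped in two list levels needs a budget of at least 3: one unit for each list level and one for the leaf itself.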

http://git-wip-us.apache.org/repos/asf/arrow/blob/0b472d86/cpp/src/arrow/ipc/adapter.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/adapter.h b/cpp/src/arrow/ipc/adapter.h
index d453fa0..4c9a8a9 100644
--- a/cpp/src/arrow/ipc/adapter.h
+++ b/cpp/src/arrow/ipc/adapter.h
@@ -38,7 +38,9 @@ class RecordBatchMessage;
 
 // ----------------------------------------------------------------------
 // Write path
-
+// We have trouble decoding flatbuffers if the size is > 70, so 64 is a nice round number
+// TODO(emkornfield) investigate this more
+constexpr int kMaxIpcRecursionDepth = 64;
 // Write the RowBatch (collection of equal-length Arrow arrays) to the memory
 // source at the indicated position
 //
@@ -52,8 +54,8 @@ class RecordBatchMessage;
 //
 // Finally, the memory offset to the start of the metadata / data header is
 // returned in an out-variable
-Status WriteRowBatch(
-    MemorySource* dst, const RowBatch* batch, int64_t position, int64_t* header_offset);
+Status WriteRowBatch(MemorySource* dst, const RowBatch* batch, int64_t position,
+    int64_t* header_offset, int max_recursion_depth = kMaxIpcRecursionDepth);
 
 // int64_t GetRowBatchMetadata(const RowBatch* batch);
 
@@ -70,6 +72,9 @@ class RowBatchReader {
   static Status Open(
       MemorySource* source, int64_t position, std::shared_ptr<RowBatchReader>* out);
 
+  static Status Open(MemorySource* source, int64_t position, int max_recursion_depth,
+      std::shared_ptr<RowBatchReader>* out);
+
   // Reassemble the row batch. A Schema is required to be able to construct the
   // right array containers
   Status GetRowBatch(

http://git-wip-us.apache.org/repos/asf/arrow/blob/0b472d86/cpp/src/arrow/ipc/ipc-adapter-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/ipc-adapter-test.cc b/cpp/src/arrow/ipc/ipc-adapter-test.cc
index fbdda77..c243cfb 100644
--- a/cpp/src/arrow/ipc/ipc-adapter-test.cc
+++ b/cpp/src/arrow/ipc/ipc-adapter-test.cc
@@ -18,9 +18,7 @@
 #include <cstdint>
 #include <cstdio>
 #include <cstring>
-#include <limits>
 #include <memory>
-#include <random>
 #include <string>
 #include <vector>
 
@@ -31,6 +29,7 @@
 #include "arrow/ipc/test-common.h"
 
 #include "arrow/test-util.h"
+#include "arrow/types/list.h"
 #include "arrow/types/primitive.h"
 #include "arrow/util/bit-util.h"
 #include "arrow/util/buffer.h"
@@ -40,25 +39,56 @@
 namespace arrow {
 namespace ipc {
 
-class TestWriteRowBatch : public ::testing::Test, public MemoryMapFixture {
+// TODO(emkornfield) convert to google style kInt32, etc?
+const auto INT32 = std::make_shared<Int32Type>();
+const auto LIST_INT32 = std::make_shared<ListType>(INT32);
+const auto LIST_LIST_INT32 = std::make_shared<ListType>(LIST_INT32);
+
+typedef Status MakeRowBatch(std::shared_ptr<RowBatch>* out);
+
+class TestWriteRowBatch : public ::testing::TestWithParam<MakeRowBatch*>,
+                          public MemoryMapFixture {
  public:
   void SetUp() { pool_ = default_memory_pool(); }
   void TearDown() { MemoryMapFixture::TearDown(); }
 
-  void InitMemoryMap(int64_t size) {
+  Status RoundTripHelper(const RowBatch& batch, int memory_map_size,
+      std::shared_ptr<RowBatch>* batch_result) {
     std::string path = "test-write-row-batch";
-    MemoryMapFixture::CreateFile(path, size);
-    ASSERT_OK(MemoryMappedSource::Open(path, MemorySource::READ_WRITE, &mmap_));
+    MemoryMapFixture::InitMemoryMap(memory_map_size, path, &mmap_);
+    int64_t header_location;
+    RETURN_NOT_OK(WriteRowBatch(mmap_.get(), &batch, 0, &header_location));
+
+    std::shared_ptr<RowBatchReader> reader;
+    RETURN_NOT_OK(RowBatchReader::Open(mmap_.get(), header_location, &reader));
+
+    RETURN_NOT_OK(reader->GetRowBatch(batch.schema(), batch_result));
+    return Status::OK();
   }
 
  protected:
-  MemoryPool* pool_;
   std::shared_ptr<MemoryMappedSource> mmap_;
+  MemoryPool* pool_;
 };
 
-const auto INT32 = std::make_shared<Int32Type>();
+TEST_P(TestWriteRowBatch, RoundTrip) {
+  std::shared_ptr<RowBatch> batch;
+  ASSERT_OK((*GetParam())(&batch));  // NOLINT clang-tidy gtest issue
+  std::shared_ptr<RowBatch> batch_result;
+  ASSERT_OK(RoundTripHelper(*batch, 1 << 16, &batch_result));
+
+  // do checks
+  ASSERT_TRUE(batch->schema()->Equals(batch_result->schema()));
+  ASSERT_EQ(batch->num_columns(), batch_result->num_columns())
+      << batch->schema()->ToString() << " result: " << batch_result->schema()->ToString();
+  EXPECT_EQ(batch->num_rows(), batch_result->num_rows());
+  for (int i = 0; i < batch->num_columns(); ++i) {
+    EXPECT_TRUE(batch->column(i)->Equals(batch_result->column(i)))
+        << "Idx: " << i << " Name: " << batch->column_name(i);
+  }
+}
 
-TEST_F(TestWriteRowBatch, IntegerRoundTrip) {
+Status MakeIntRowBatch(std::shared_ptr<RowBatch>* out) {
   const int length = 1000;
 
   // Make the schema
@@ -67,41 +97,159 @@ TEST_F(TestWriteRowBatch, IntegerRoundTrip) {
   std::shared_ptr<Schema> schema(new Schema({f0, f1}));
 
   // Example data
+  std::shared_ptr<Array> a0, a1;
+  MemoryPool* pool = default_memory_pool();
+  RETURN_NOT_OK(MakeRandomInt32Array(length, false, pool, &a0));
+  RETURN_NOT_OK(MakeRandomInt32Array(length, true, pool, &a1));
+  out->reset(new RowBatch(schema, length, {a0, a1}));
+  return Status::OK();
+}
 
-  auto data = std::make_shared<PoolBuffer>(pool_);
-  ASSERT_OK(data->Resize(length * sizeof(int32_t)));
-  test::rand_uniform_int(length, 0, 0, std::numeric_limits<int32_t>::max(),
-      reinterpret_cast<int32_t*>(data->mutable_data()));
+Status MakeListRowBatch(std::shared_ptr<RowBatch>* out) {
+  // Make the schema
+  auto f0 = std::make_shared<Field>("f0", LIST_INT32);
+  auto f1 = std::make_shared<Field>("f1", LIST_LIST_INT32);
+  auto f2 = std::make_shared<Field>("f2", INT32);
+  std::shared_ptr<Schema> schema(new Schema({f0, f1, f2}));
 
-  auto null_bitmap = std::make_shared<PoolBuffer>(pool_);
-  int null_bytes = util::bytes_for_bits(length);
-  ASSERT_OK(null_bitmap->Resize(null_bytes));
-  test::random_bytes(null_bytes, 0, null_bitmap->mutable_data());
+  // Example data
 
-  auto a0 = std::make_shared<Int32Array>(length, data);
-  auto a1 = std::make_shared<Int32Array>(
-      length, data, test::bitmap_popcount(null_bitmap->data(), length), null_bitmap);
+  MemoryPool* pool = default_memory_pool();
+  const int length = 200;
+  std::shared_ptr<Array> leaf_values, list_array, list_list_array, flat_array;
+  const bool include_nulls = true;
+  RETURN_NOT_OK(MakeRandomInt32Array(1000, include_nulls, pool, &leaf_values));
+  RETURN_NOT_OK(
+      MakeRandomListArray(leaf_values, length, include_nulls, pool, &list_array));
+  RETURN_NOT_OK(
+      MakeRandomListArray(list_array, length, include_nulls, pool, &list_list_array));
+  RETURN_NOT_OK(MakeRandomInt32Array(length, include_nulls, pool, &flat_array));
+  out->reset(new RowBatch(schema, length, {list_array, list_list_array, flat_array}));
+  return Status::OK();
+}
 
-  RowBatch batch(schema, length, {a0, a1});
+Status MakeZeroLengthRowBatch(std::shared_ptr<RowBatch>* out) {
+  // Make the schema
+  auto f0 = std::make_shared<Field>("f0", LIST_INT32);
+  auto f1 = std::make_shared<Field>("f1", LIST_LIST_INT32);
+  auto f2 = std::make_shared<Field>("f2", INT32);
+  std::shared_ptr<Schema> schema(new Schema({f0, f1, f2}));
 
-  // TODO(wesm): computing memory requirements for a row batch
-  // 64k is plenty of space
-  InitMemoryMap(1 << 16);
+  // Example data
+  MemoryPool* pool = default_memory_pool();
+  const int length = 200;
+  const bool include_nulls = true;
+  std::shared_ptr<Array> leaf_values, list_array, list_list_array, flat_array;
+  RETURN_NOT_OK(MakeRandomInt32Array(0, include_nulls, pool, &leaf_values));
+  RETURN_NOT_OK(MakeRandomListArray(leaf_values, 0, include_nulls, pool, &list_array));
+  RETURN_NOT_OK(
+      MakeRandomListArray(list_array, 0, include_nulls, pool, &list_list_array));
+  RETURN_NOT_OK(MakeRandomInt32Array(0, include_nulls, pool, &flat_array));
+  out->reset(new RowBatch(schema, length, {list_array, list_list_array, flat_array}));
+  return Status::OK();
+}
 
-  int64_t header_location;
-  ASSERT_OK(WriteRowBatch(mmap_.get(), &batch, 0, &header_location));
+Status MakeNonNullRowBatch(std::shared_ptr<RowBatch>* out) {
+  // Make the schema
+  auto f0 = std::make_shared<Field>("f0", LIST_INT32);
+  auto f1 = std::make_shared<Field>("f1", LIST_LIST_INT32);
+  auto f2 = std::make_shared<Field>("f2", INT32);
+  std::shared_ptr<Schema> schema(new Schema({f0, f1, f2}));
 
-  std::shared_ptr<RowBatchReader> result;
-  ASSERT_OK(RowBatchReader::Open(mmap_.get(), header_location, &result));
+  // Example data
+  MemoryPool* pool = default_memory_pool();
+  const int length = 200;
+  std::shared_ptr<Array> leaf_values, list_array, list_list_array, flat_array;
 
-  std::shared_ptr<RowBatch> batch_result;
-  ASSERT_OK(result->GetRowBatch(schema, &batch_result));
-  EXPECT_EQ(batch.num_rows(), batch_result->num_rows());
+  RETURN_NOT_OK(MakeRandomInt32Array(1000, true, pool, &leaf_values));
+  bool include_nulls = false;
+  RETURN_NOT_OK(MakeRandomListArray(leaf_values, 50, include_nulls, pool, &list_array));
+  RETURN_NOT_OK(
+      MakeRandomListArray(list_array, 50, include_nulls, pool, &list_list_array));
+  RETURN_NOT_OK(MakeRandomInt32Array(0, include_nulls, pool, &flat_array));
+  out->reset(new RowBatch(schema, length, {list_array, list_list_array, flat_array}));
+  return Status::OK();
+}
 
-  for (int i = 0; i < batch.num_columns(); ++i) {
-    EXPECT_TRUE(batch.column(i)->Equals(batch_result->column(i))) << i
-                                                                  << batch.column_name(i);
+Status MakeDeeplyNestedList(std::shared_ptr<RowBatch>* out) {
+  const int batch_length = 5;
+  TypePtr type = INT32;
+
+  MemoryPool* pool = default_memory_pool();
+  ArrayPtr array;
+  const bool include_nulls = true;
+  RETURN_NOT_OK(MakeRandomInt32Array(1000, include_nulls, pool, &array));
+  for (int i = 0; i < 63; ++i) {
+    type = std::static_pointer_cast<DataType>(std::make_shared<ListType>(type));
+    RETURN_NOT_OK(MakeRandomListArray(array, batch_length, include_nulls, pool, &array));
+  }
+
+  auto f0 = std::make_shared<Field>("f0", type);
+  std::shared_ptr<Schema> schema(new Schema({f0}));
+  std::vector<ArrayPtr> arrays = {array};
+  out->reset(new RowBatch(schema, batch_length, arrays));
+  return Status::OK();
+}
+
+INSTANTIATE_TEST_CASE_P(RoundTripTests, TestWriteRowBatch,
+    ::testing::Values(&MakeIntRowBatch, &MakeListRowBatch, &MakeNonNullRowBatch,
+                            &MakeZeroLengthRowBatch, &MakeDeeplyNestedList));
+
+class RecursionLimits : public ::testing::Test, public MemoryMapFixture {
+ public:
+  void SetUp() { pool_ = default_memory_pool(); }
+  void TearDown() { MemoryMapFixture::TearDown(); }
+
+  Status WriteToMmap(int recursion_level, bool override_level,
+      int64_t* header_out = nullptr, std::shared_ptr<Schema>* schema_out = nullptr) {
+    const int batch_length = 5;
+    TypePtr type = INT32;
+    ArrayPtr array;
+    const bool include_nulls = true;
+    RETURN_NOT_OK(MakeRandomInt32Array(1000, include_nulls, pool_, &array));
+    for (int i = 0; i < recursion_level; ++i) {
+      type = std::static_pointer_cast<DataType>(std::make_shared<ListType>(type));
+      RETURN_NOT_OK(
+          MakeRandomListArray(array, batch_length, include_nulls, pool_, &array));
+    }
+
+    auto f0 = std::make_shared<Field>("f0", type);
+    std::shared_ptr<Schema> schema(new Schema({f0}));
+    if (schema_out != nullptr) { *schema_out = schema; }
+    std::vector<ArrayPtr> arrays = {array};
+    auto batch = std::make_shared<RowBatch>(schema, batch_length, arrays);
+
+    std::string path = "test-write-past-max-recursion";
+    const int memory_map_size = 1 << 16;
+    RETURN_NOT_OK(MemoryMapFixture::InitMemoryMap(memory_map_size, path, &mmap_));
+    int64_t header_location;
+    int64_t* header_out_param = header_out == nullptr ? &header_location : header_out;
+    if (override_level) {
+      return WriteRowBatch(
+          mmap_.get(), batch.get(), 0, header_out_param, recursion_level + 1);
+    } else {
+      return WriteRowBatch(mmap_.get(), batch.get(), 0, header_out_param);
+    }
   }
+
+ protected:
+  std::shared_ptr<MemoryMappedSource> mmap_;
+  MemoryPool* pool_;
+};
+
+TEST_F(RecursionLimits, WriteLimit) {
+  ASSERT_RAISES(Invalid, WriteToMmap((1 << 8) + 1, false));
+}
+
+TEST_F(RecursionLimits, ReadLimit) {
+  int64_t header_location;
+  std::shared_ptr<Schema> schema;
+  ASSERT_OK(WriteToMmap(64, true, &header_location, &schema));
+
+  std::shared_ptr<RowBatchReader> reader;
+  ASSERT_OK(RowBatchReader::Open(mmap_.get(), header_location, &reader));
+  std::shared_ptr<RowBatch> batch_result;
+  ASSERT_RAISES(Invalid, reader->GetRowBatch(schema, &batch_result));
 }
 
 }  // namespace ipc
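
The RecursionLimits tests above wrap a leaf type in N list layers and expect an Invalid status once the nesting passes some maximum depth. The following is a minimal standalone sketch of that idea, not the adapter code from this commit: NestedType, MakeNestedList and VisitWithLimit are hypothetical names, and the limit is a plain parameter because the diff shown here does not state the default value.

```cpp
#include <cassert>
#include <memory>
#include <string>

// Hypothetical stand-in for a nested list type: a node is either a leaf
// (child == nullptr) or a list wrapping exactly one child type.
struct NestedType {
  std::shared_ptr<NestedType> child;
};

// Wraps a leaf in `levels` list layers, mirroring the loop in WriteToMmap.
inline std::shared_ptr<NestedType> MakeNestedList(int levels) {
  assert(levels >= 0);
  auto type = std::make_shared<NestedType>();
  for (int i = 0; i < levels; ++i) {
    auto parent = std::make_shared<NestedType>();
    parent->child = type;
    type = parent;
  }
  return type;
}

// Returns an empty string on success, or an error message when visiting the
// tree would need more than max_depth levels (leaf included).
inline std::string VisitWithLimit(const NestedType& type, int max_depth) {
  if (max_depth <= 0) { return "Max recursion depth reached"; }
  if (type.child == nullptr) { return ""; }
  return VisitWithLimit(*type.child, max_depth - 1);
}
```

With an assumed limit of 64, a leaf under 63 list layers is accepted and one under 64 layers is rejected, while raising the limit by one accepts it again. This is the same shape as passing recursion_level + 1 in WriteToMmap when override_level is set.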

http://git-wip-us.apache.org/repos/asf/arrow/blob/0b472d86/cpp/src/arrow/ipc/memory.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/memory.cc b/cpp/src/arrow/ipc/memory.cc
index 2b077e9..84cbc18 100644
--- a/cpp/src/arrow/ipc/memory.cc
+++ b/cpp/src/arrow/ipc/memory.cc
@@ -18,6 +18,7 @@
 #include "arrow/ipc/memory.h"
 
 #include <sys/mman.h>  // For memory-mapping
+
 #include <algorithm>
 #include <cerrno>
 #include <cstdint>

http://git-wip-us.apache.org/repos/asf/arrow/blob/0b472d86/cpp/src/arrow/ipc/metadata-internal.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/metadata-internal.cc b/cpp/src/arrow/ipc/metadata-internal.cc
index ad5951d..1b1d50f 100644
--- a/cpp/src/arrow/ipc/metadata-internal.cc
+++ b/cpp/src/arrow/ipc/metadata-internal.cc
@@ -17,13 +17,14 @@
 
 #include "arrow/ipc/metadata-internal.h"
 
-#include <flatbuffers/flatbuffers.h>
 #include <cstdint>
 #include <cstring>
 #include <memory>
 #include <sstream>
 #include <string>
 
+#include "flatbuffers/flatbuffers.h"
+
 #include "arrow/ipc/Message_generated.h"
 #include "arrow/schema.h"
 #include "arrow/type.h"

http://git-wip-us.apache.org/repos/asf/arrow/blob/0b472d86/cpp/src/arrow/ipc/metadata-internal.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/metadata-internal.h b/cpp/src/arrow/ipc/metadata-internal.h
index 779c5a3..871b5bc 100644
--- a/cpp/src/arrow/ipc/metadata-internal.h
+++ b/cpp/src/arrow/ipc/metadata-internal.h
@@ -18,11 +18,12 @@
 #ifndef ARROW_IPC_METADATA_INTERNAL_H
 #define ARROW_IPC_METADATA_INTERNAL_H
 
-#include <flatbuffers/flatbuffers.h>
 #include <cstdint>
 #include <memory>
 #include <vector>
 
+#include "flatbuffers/flatbuffers.h"
+
 #include "arrow/ipc/Message_generated.h"
 
 namespace arrow {

http://git-wip-us.apache.org/repos/asf/arrow/blob/0b472d86/cpp/src/arrow/ipc/metadata.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/metadata.cc b/cpp/src/arrow/ipc/metadata.cc
index bcf104f..4fc8ec5 100644
--- a/cpp/src/arrow/ipc/metadata.cc
+++ b/cpp/src/arrow/ipc/metadata.cc
@@ -17,11 +17,12 @@
 
 #include "arrow/ipc/metadata.h"
 
-#include <flatbuffers/flatbuffers.h>
 #include <cstdint>
 #include <memory>
 #include <vector>
 
+#include "flatbuffers/flatbuffers.h"
+
 // Generated C++ flatbuffer IDL
 #include "arrow/ipc/Message_generated.h"
 #include "arrow/ipc/metadata-internal.h"

http://git-wip-us.apache.org/repos/asf/arrow/blob/0b472d86/cpp/src/arrow/ipc/test-common.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/test-common.h b/cpp/src/arrow/ipc/test-common.h
index 65c837d..e7dbb84 100644
--- a/cpp/src/arrow/ipc/test-common.h
+++ b/cpp/src/arrow/ipc/test-common.h
@@ -18,11 +18,20 @@
 #ifndef ARROW_IPC_TEST_COMMON_H
 #define ARROW_IPC_TEST_COMMON_H
 
+#include <algorithm>
 #include <cstdint>
 #include <memory>
+#include <numeric>
 #include <string>
 #include <vector>
 
+#include "arrow/array.h"
+#include "arrow/test-util.h"
+#include "arrow/types/list.h"
+#include "arrow/types/primitive.h"
+#include "arrow/util/buffer.h"
+#include "arrow/util/memory-pool.h"
+
 namespace arrow {
 namespace ipc {
 
@@ -41,10 +50,69 @@ class MemoryMapFixture {
     fclose(file);
   }
 
+  Status InitMemoryMap(
+      int64_t size, const std::string& path, std::shared_ptr<MemoryMappedSource>* mmap) {
+    CreateFile(path, size);
+    return MemoryMappedSource::Open(path, MemorySource::READ_WRITE, mmap);
+  }
+
  private:
   std::vector<std::string> tmp_files_;
 };
 
+Status MakeRandomInt32Array(
+    int32_t length, bool include_nulls, MemoryPool* pool, std::shared_ptr<Array>* array) {
+  std::shared_ptr<PoolBuffer> data;
+  RETURN_NOT_OK(test::MakeRandomInt32PoolBuffer(length, pool, &data));
+  const auto INT32 = std::make_shared<Int32Type>();
+  Int32Builder builder(pool, INT32);
+  if (include_nulls) {
+    std::shared_ptr<PoolBuffer> valid_bytes;
+    RETURN_NOT_OK(test::MakeRandomBytePoolBuffer(length, pool, &valid_bytes));
+    RETURN_NOT_OK(builder.Append(
+        reinterpret_cast<const int32_t*>(data->data()), length, valid_bytes->data()));
+    *array = builder.Finish();
+    return Status::OK();
+  }
+  RETURN_NOT_OK(builder.Append(reinterpret_cast<const int32_t*>(data->data()), length));
+  *array = builder.Finish();
+  return Status::OK();
+}
+
+Status MakeRandomListArray(const std::shared_ptr<Array>& child_array, int num_lists,
+    bool include_nulls, MemoryPool* pool, std::shared_ptr<Array>* array) {
+  // Create the null list values
+  std::vector<uint8_t> valid_lists(num_lists);
+  const double null_percent = include_nulls ? 0.1 : 0;
+  test::random_null_bytes(num_lists, null_percent, valid_lists.data());
+
+  // Create list offsets
+  const int max_list_size = 10;
+
+  std::vector<int32_t> list_sizes(num_lists, 0);
+  std::vector<int32_t> offsets(
+      num_lists + 1, 0);  // +1 so we can shift for nulls. See partial sum below.
+  const int seed = child_array->length();
+  if (num_lists > 0) {
+    test::rand_uniform_int(num_lists, seed, 0, max_list_size, list_sizes.data());
+    // make sure sizes are consistent with null
+    std::transform(list_sizes.begin(), list_sizes.end(), valid_lists.begin(),
+        list_sizes.begin(),
+        [](int32_t size, int32_t valid) { return valid == 0 ? 0 : size; });
+    std::partial_sum(list_sizes.begin(), list_sizes.end(), ++offsets.begin());
+
+    // Force invariants
+    const int child_length = child_array->length();
+    offsets[0] = 0;
+    std::replace_if(offsets.begin(), offsets.end(),
+        [child_length](int32_t offset) { return offset > child_length; }, child_length);
+  }
+  ListBuilder builder(pool, child_array);
+  RETURN_NOT_OK(builder.Append(offsets.data(), num_lists, valid_lists.data()));
+  *array = builder.Finish();
+  return (*array)->Validate();
+}
+
 }  // namespace ipc
 }  // namespace arrow
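
MakeRandomListArray above derives list offsets from per-slot sizes in three steps: zero the size of every null slot, take a running sum shifted by one, and clamp anything past the child length. A minimal standalone sketch of those steps follows, using only the standard library. BuildListOffsets is a hypothetical helper name and takes fixed sizes instead of random ones so the result is reproducible.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <numeric>
#include <vector>

// Builds sizes.size() + 1 offsets into a child array of child_length values.
// valid[i] == 0 marks slot i as null. Hypothetical helper for illustration.
inline std::vector<int32_t> BuildListOffsets(std::vector<int32_t> sizes,
    const std::vector<uint8_t>& valid, int32_t child_length) {
  assert(sizes.size() == valid.size());
  // A null slot must not consume any child values.
  std::transform(sizes.begin(), sizes.end(), valid.begin(), sizes.begin(),
      [](int32_t size, uint8_t is_valid) { return is_valid == 0 ? 0 : size; });
  // offsets[0] stays 0; offsets[i + 1] is the running total of sizes[0..i].
  std::vector<int32_t> offsets(sizes.size() + 1, 0);
  std::partial_sum(sizes.begin(), sizes.end(), offsets.begin() + 1);
  // Clamp so that no offset points past the end of the child array.
  std::replace_if(offsets.begin(), offsets.end(),
      [child_length](int32_t offset) { return offset > child_length; },
      child_length);
  return offsets;
}
```

For sizes {3, 5, 4} with the middle slot null and seven child values, this yields {0, 3, 3, 7}, the same layout the list builder tests in this commit expect. Note that std::partial_sum lives in <numeric>.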
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/0b472d86/cpp/src/arrow/parquet/schema.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/parquet/schema.cc b/cpp/src/arrow/parquet/schema.cc
index 066388b..560e283 100644
--- a/cpp/src/arrow/parquet/schema.cc
+++ b/cpp/src/arrow/parquet/schema.cc
@@ -21,8 +21,8 @@
 
 #include "parquet/api/schema.h"
 
-#include "arrow/util/status.h"
 #include "arrow/types/decimal.h"
+#include "arrow/util/status.h"
 
 using parquet::schema::Node;
 using parquet::schema::NodePtr;

http://git-wip-us.apache.org/repos/asf/arrow/blob/0b472d86/cpp/src/arrow/schema.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/schema.cc b/cpp/src/arrow/schema.cc
index a38acaa..ff3ea19 100644
--- a/cpp/src/arrow/schema.cc
+++ b/cpp/src/arrow/schema.cc
@@ -18,8 +18,8 @@
 #include "arrow/schema.h"
 
 #include <memory>
-#include <string>
 #include <sstream>
+#include <string>
 #include <vector>
 
 #include "arrow/type.h"

http://git-wip-us.apache.org/repos/asf/arrow/blob/0b472d86/cpp/src/arrow/test-util.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/test-util.h b/cpp/src/arrow/test-util.h
index 538d9b2..2f81161 100644
--- a/cpp/src/arrow/test-util.h
+++ b/cpp/src/arrow/test-util.h
@@ -19,6 +19,7 @@
 #define ARROW_TEST_UTIL_H_
 
 #include <cstdint>
+#include <limits>
 #include <memory>
 #include <random>
 #include <string>
@@ -26,12 +27,13 @@
 
 #include "gtest/gtest.h"
 
-#include "arrow/type.h"
 #include "arrow/column.h"
 #include "arrow/schema.h"
 #include "arrow/table.h"
+#include "arrow/type.h"
 #include "arrow/util/bit-util.h"
 #include "arrow/util/buffer.h"
+#include "arrow/util/logging.h"
 #include "arrow/util/memory-pool.h"
 #include "arrow/util/random.h"
 #include "arrow/util/status.h"
@@ -103,10 +105,12 @@ std::shared_ptr<Buffer> to_buffer(const std::vector<T>& values) {
       reinterpret_cast<const uint8_t*>(values.data()), values.size() * sizeof(T));
 }
 
-void random_null_bitmap(int64_t n, double pct_null, uint8_t* null_bitmap) {
+// Sets approximately pct_null of the first n bytes in null_bytes to zero
+// and the rest to non-zero (true) values.
+void random_null_bytes(int64_t n, double pct_null, uint8_t* null_bytes) {
   Random rng(random_seed());
   for (int i = 0; i < n; ++i) {
-    null_bitmap[i] = rng.NextDoubleFraction() > pct_null;
+    null_bytes[i] = rng.NextDoubleFraction() > pct_null;
   }
 }
 
@@ -121,6 +125,7 @@ static inline void random_bytes(int n, uint32_t seed, uint8_t* out) {
 
 template <typename T>
 void rand_uniform_int(int n, uint32_t seed, T min_value, T max_value, T* out) {
+  DCHECK(out);
   std::mt19937 gen(seed);
   std::uniform_int_distribution<T> d(min_value, max_value);
   for (int i = 0; i < n; ++i) {
@@ -129,11 +134,25 @@ void rand_uniform_int(int n, uint32_t seed, T min_value, T max_value, T* out) {
 }
 
 static inline int bitmap_popcount(const uint8_t* data, int length) {
+  // bookkeeping: length is a bit count, so each 64-bit word covers 64 bits
+  constexpr int pop_len = sizeof(uint64_t) * 8;
+  const uint64_t* i64_data = reinterpret_cast<const uint64_t*>(data);
+  const int fast_counts = length / pop_len;
+  const uint64_t* end = i64_data + fast_counts;
+
   int count = 0;
-  for (int i = 0; i < length; ++i) {
-    // TODO(wesm): accelerate this
+  // popcount as much as possible with the widest possible count
+  for (auto iter = i64_data; iter < end; ++iter) {
+    count += __builtin_popcountll(*iter);
+  }
+
+  // Account for left over bytes (in theory we could fall back to smaller
+  // versions of popcount but the code complexity is likely not worth it)
+  const int loop_tail_index = fast_counts * pop_len;
+  for (int i = loop_tail_index; i < length; ++i) {
     if (util::get_bit(data, i)) { ++count; }
   }
+
   return count;
 }
 
@@ -153,6 +172,26 @@ std::shared_ptr<Buffer> bytes_to_null_buffer(const std::vector<uint8_t>& bytes)
   return out;
 }
 
+Status MakeRandomInt32PoolBuffer(int32_t length, MemoryPool* pool,
+    std::shared_ptr<PoolBuffer>* pool_buffer, uint32_t seed = 0) {
+  DCHECK(pool);
+  auto data = std::make_shared<PoolBuffer>(pool);
+  RETURN_NOT_OK(data->Resize(length * sizeof(int32_t)));
+  test::rand_uniform_int(length, seed, 0, std::numeric_limits<int32_t>::max(),
+      reinterpret_cast<int32_t*>(data->mutable_data()));
+  *pool_buffer = data;
+  return Status::OK();
+}
+
+Status MakeRandomBytePoolBuffer(int32_t length, MemoryPool* pool,
+    std::shared_ptr<PoolBuffer>* pool_buffer, uint32_t seed = 0) {
+  auto bytes = std::make_shared<PoolBuffer>(pool);
+  RETURN_NOT_OK(bytes->Resize(length));
+  test::random_bytes(length, seed, bytes->mutable_data());
+  *pool_buffer = bytes;
+  return Status::OK();
+}
+
 }  // namespace test
 }  // namespace arrow
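
The bitmap_popcount change above counts whole 64-bit words first and then handles the leftover bits one at a time. Below is a portable standalone sketch of the same technique, under two assumptions that are labeled in the code: the length is given in bits, and bit i lives at position (i % 8) of byte (i / 8). CountSetBits is a hypothetical name; std::bitset replaces the compiler builtin and memcpy sidesteps unaligned reads.

```cpp
#include <bitset>
#include <cassert>
#include <cstdint>
#include <cstring>

// Counts the set bits among the first length_in_bits bits of data.
// Assumes least-significant-bit-first ordering within each byte.
inline int CountSetBits(const uint8_t* data, int length_in_bits) {
  assert(data != nullptr || length_in_bits == 0);
  constexpr int kBitsPerWord = 64;
  const int full_words = length_in_bits / kBitsPerWord;

  int count = 0;
  // Fast path: one population count per complete 64-bit word.
  for (int w = 0; w < full_words; ++w) {
    uint64_t word;
    std::memcpy(&word, data + w * sizeof(uint64_t), sizeof(uint64_t));
    count += static_cast<int>(std::bitset<64>(word).count());
  }
  // Slow path: the remaining (fewer than 64) bits, tested individually.
  for (int i = full_words * kBitsPerWord; i < length_in_bits; ++i) {
    if ((data[i / 8] >> (i % 8)) & 1) { ++count; }
  }
  return count;
}
```

The word count is derived from the number of bits, not bytes, so the fast path never reads past the last complete word of the bitmap.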
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/0b472d86/cpp/src/arrow/type.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/type.h b/cpp/src/arrow/type.h
index 051ab46..77404cd 100644
--- a/cpp/src/arrow/type.h
+++ b/cpp/src/arrow/type.h
@@ -116,7 +116,7 @@ struct DataType {
 
   bool Equals(const DataType* other) {
     // Call with a pointer so more friendly to subclasses
-    return this == other || (this->type == other->type);
+    return other && ((this == other) || (this->type == other->type));
   }
 
   bool Equals(const std::shared_ptr<DataType>& other) { return Equals(other.get()); }

http://git-wip-us.apache.org/repos/asf/arrow/blob/0b472d86/cpp/src/arrow/types/construct.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/types/construct.cc b/cpp/src/arrow/types/construct.cc
index 0a30929..78036d4 100644
--- a/cpp/src/arrow/types/construct.cc
+++ b/cpp/src/arrow/types/construct.cc
@@ -20,8 +20,8 @@
 #include <memory>
 
 #include "arrow/type.h"
-#include "arrow/types/primitive.h"
 #include "arrow/types/list.h"
+#include "arrow/types/primitive.h"
 #include "arrow/types/string.h"
 #include "arrow/util/buffer.h"
 #include "arrow/util/status.h"
@@ -60,11 +60,10 @@ Status MakeBuilder(MemoryPool* pool, const std::shared_ptr<DataType>& type,
 
     case Type::LIST: {
       std::shared_ptr<ArrayBuilder> value_builder;
-
       const std::shared_ptr<DataType>& value_type =
           static_cast<ListType*>(type.get())->value_type();
       RETURN_NOT_OK(MakeBuilder(pool, value_type, &value_builder));
-      out->reset(new ListBuilder(pool, type, value_builder));
+      out->reset(new ListBuilder(pool, value_builder));
       return Status::OK();
     }
     default:
@@ -75,11 +74,11 @@ Status MakeBuilder(MemoryPool* pool, const std::shared_ptr<DataType>& type,
 #define MAKE_PRIMITIVE_ARRAY_CASE(ENUM, ArrayType)                          \
   case Type::ENUM:                                                          \
     out->reset(new ArrayType(type, length, data, null_count, null_bitmap)); \
-    return Status::OK();
+    break;
 
-Status MakePrimitiveArray(const std::shared_ptr<DataType>& type, int32_t length,
+Status MakePrimitiveArray(const TypePtr& type, int32_t length,
     const std::shared_ptr<Buffer>& data, int32_t null_count,
-    const std::shared_ptr<Buffer>& null_bitmap, std::shared_ptr<Array>* out) {
+    const std::shared_ptr<Buffer>& null_bitmap, ArrayPtr* out) {
   switch (type->type) {
     MAKE_PRIMITIVE_ARRAY_CASE(BOOL, BooleanArray);
     MAKE_PRIMITIVE_ARRAY_CASE(UINT8, UInt8Array);
@@ -90,11 +89,43 @@ Status MakePrimitiveArray(const std::shared_ptr<DataType>& type, int32_t length,
     MAKE_PRIMITIVE_ARRAY_CASE(INT32, Int32Array);
     MAKE_PRIMITIVE_ARRAY_CASE(UINT64, UInt64Array);
     MAKE_PRIMITIVE_ARRAY_CASE(INT64, Int64Array);
+    MAKE_PRIMITIVE_ARRAY_CASE(TIME, Int64Array);
+    MAKE_PRIMITIVE_ARRAY_CASE(TIMESTAMP, Int64Array);
     MAKE_PRIMITIVE_ARRAY_CASE(FLOAT, FloatArray);
     MAKE_PRIMITIVE_ARRAY_CASE(DOUBLE, DoubleArray);
+    MAKE_PRIMITIVE_ARRAY_CASE(TIMESTAMP_DOUBLE, DoubleArray);
+    default:
+      return Status::NotImplemented(type->ToString());
+  }
+#ifdef NDEBUG
+  return Status::OK();
+#else
+  return (*out)->Validate();
+#endif
+}
+
+Status MakeListArray(const TypePtr& type, int32_t length,
+    const std::shared_ptr<Buffer>& offsets, const ArrayPtr& values, int32_t null_count,
+    const std::shared_ptr<Buffer>& null_bitmap, ArrayPtr* out) {
+  switch (type->type) {
+    case Type::BINARY:
+    case Type::LIST:
+      out->reset(new ListArray(type, length, offsets, values, null_count, null_bitmap));
+      break;
+    case Type::CHAR:
+    case Type::DECIMAL_TEXT:
+    case Type::STRING:
+    case Type::VARCHAR:
+      out->reset(new StringArray(type, length, offsets, values, null_count, null_bitmap));
+      break;
     default:
       return Status::NotImplemented(type->ToString());
   }
+#ifdef NDEBUG
+  return Status::OK();
+#else
+  return (*out)->Validate();
+#endif
 }
 
 }  // namespace arrow

http://git-wip-us.apache.org/repos/asf/arrow/blob/0b472d86/cpp/src/arrow/types/construct.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/types/construct.h b/cpp/src/arrow/types/construct.h
index 27fb7bd..43c0018 100644
--- a/cpp/src/arrow/types/construct.h
+++ b/cpp/src/arrow/types/construct.h
@@ -33,10 +33,19 @@ class Status;
 Status MakeBuilder(MemoryPool* pool, const std::shared_ptr<DataType>& type,
     std::shared_ptr<ArrayBuilder>* out);
 
+// Create new arrays for logical types that are backed by primitive arrays.
 Status MakePrimitiveArray(const std::shared_ptr<DataType>& type, int32_t length,
     const std::shared_ptr<Buffer>& data, int32_t null_count,
     const std::shared_ptr<Buffer>& null_bitmap, std::shared_ptr<Array>* out);
 
+// Create new list arrays for logical types that are backed by ListArrays (e.g. list of
+// primitives and strings)
+// TODO(emkornfield) split up string vs list?
+Status MakeListArray(const std::shared_ptr<DataType>& type, int32_t length,
+    const std::shared_ptr<Buffer>& offsets, const std::shared_ptr<Array>& values,
+    int32_t null_count, const std::shared_ptr<Buffer>& null_bitmap,
+    std::shared_ptr<Array>* out);
+
 }  // namespace arrow
 
 #endif  // ARROW_BUILDER_H_

http://git-wip-us.apache.org/repos/asf/arrow/blob/0b472d86/cpp/src/arrow/types/list-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/types/list-test.cc b/cpp/src/arrow/types/list-test.cc
index aa34f23..6a8ad9a 100644
--- a/cpp/src/arrow/types/list-test.cc
+++ b/cpp/src/arrow/types/list-test.cc
@@ -15,8 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include <cstdlib>
 #include <cstdint>
+#include <cstdlib>
 #include <memory>
 #include <string>
 #include <vector>
@@ -94,6 +94,7 @@ TEST_F(TestListBuilder, TestAppendNull) {
 
   Done();
 
+  ASSERT_OK(result_->Validate());
   ASSERT_TRUE(result_->IsNull(0));
   ASSERT_TRUE(result_->IsNull(1));
 
@@ -105,50 +106,93 @@ TEST_F(TestListBuilder, TestAppendNull) {
   ASSERT_EQ(0, values->length());
 }
 
+void ValidateBasicListArray(const ListArray* result, const vector<int32_t>& values,
+    const vector<uint8_t>& is_valid) {
+  ASSERT_OK(result->Validate());
+  ASSERT_EQ(1, result->null_count());
+  ASSERT_EQ(0, result->values()->null_count());
+
+  ASSERT_EQ(3, result->length());
+  vector<int32_t> ex_offsets = {0, 3, 3, 7};
+  for (size_t i = 0; i < ex_offsets.size(); ++i) {
+    ASSERT_EQ(ex_offsets[i], result->offset(i));
+  }
+
+  for (int i = 0; i < result->length(); ++i) {
+    ASSERT_EQ(!static_cast<bool>(is_valid[i]), result->IsNull(i));
+  }
+
+  ASSERT_EQ(7, result->values()->length());
+  Int32Array* varr = static_cast<Int32Array*>(result->values().get());
+
+  for (size_t i = 0; i < values.size(); ++i) {
+    ASSERT_EQ(values[i], varr->Value(i));
+  }
+}
+
 TEST_F(TestListBuilder, TestBasics) {
   vector<int32_t> values = {0, 1, 2, 3, 4, 5, 6};
   vector<int> lengths = {3, 0, 4};
-  vector<uint8_t> is_null = {0, 1, 0};
+  vector<uint8_t> is_valid = {1, 0, 1};
 
   Int32Builder* vb = static_cast<Int32Builder*>(builder_->value_builder().get());
 
-  EXPECT_OK(builder_->Reserve(lengths.size()));
-  EXPECT_OK(vb->Reserve(values.size()));
+  ASSERT_OK(builder_->Reserve(lengths.size()));
+  ASSERT_OK(vb->Reserve(values.size()));
 
   int pos = 0;
   for (size_t i = 0; i < lengths.size(); ++i) {
-    ASSERT_OK(builder_->Append(is_null[i] > 0));
+    ASSERT_OK(builder_->Append(is_valid[i] > 0));
     for (int j = 0; j < lengths[i]; ++j) {
       vb->Append(values[pos++]);
     }
   }
 
   Done();
+  ValidateBasicListArray(result_.get(), values, is_valid);
+}
 
-  ASSERT_EQ(1, result_->null_count());
-  ASSERT_EQ(0, result_->values()->null_count());
+TEST_F(TestListBuilder, BulkAppend) {
+  vector<int32_t> values = {0, 1, 2, 3, 4, 5, 6};
+  vector<int> lengths = {3, 0, 4};
+  vector<uint8_t> is_valid = {1, 0, 1};
+  vector<int32_t> offsets = {0, 3, 3};
 
-  ASSERT_EQ(3, result_->length());
-  vector<int32_t> ex_offsets = {0, 3, 3, 7};
-  for (size_t i = 0; i < ex_offsets.size(); ++i) {
-    ASSERT_EQ(ex_offsets[i], result_->offset(i));
-  }
+  Int32Builder* vb = static_cast<Int32Builder*>(builder_->value_builder().get());
+  ASSERT_OK(vb->Reserve(values.size()));
 
-  for (int i = 0; i < result_->length(); ++i) {
-    ASSERT_EQ(static_cast<bool>(is_null[i]), result_->IsNull(i));
+  builder_->Append(offsets.data(), offsets.size(), is_valid.data());
+  for (int32_t value : values) {
+    vb->Append(value);
   }
+  Done();
+  ValidateBasicListArray(result_.get(), values, is_valid);
+}
 
-  ASSERT_EQ(7, result_->values()->length());
-  Int32Array* varr = static_cast<Int32Array*>(result_->values().get());
+TEST_F(TestListBuilder, BulkAppendInvalid) {
+  vector<int32_t> values = {0, 1, 2, 3, 4, 5, 6};
+  vector<int> lengths = {3, 0, 4};
+  vector<uint8_t> is_null = {0, 1, 0};
+  vector<uint8_t> is_valid = {1, 0, 1};
+  vector<int32_t> offsets = {0, 2, 4};  // should be 0, 3, 3 given the is_null array
 
-  for (size_t i = 0; i < values.size(); ++i) {
-    ASSERT_EQ(values[i], varr->Value(i));
+  Int32Builder* vb = static_cast<Int32Builder*>(builder_->value_builder().get());
+  ASSERT_OK(vb->Reserve(values.size()));
+
+  builder_->Append(offsets.data(), offsets.size(), is_valid.data());
+  builder_->Append(offsets.data(), offsets.size(), is_valid.data());
+  for (int32_t value : values) {
+    vb->Append(value);
   }
+
+  Done();
+  ASSERT_RAISES(Invalid, result_->Validate());
 }
 
 TEST_F(TestListBuilder, TestZeroLength) {
   // All buffers are null
   Done();
+  ASSERT_OK(result_->Validate());
 }
 
 }  // namespace arrow

http://git-wip-us.apache.org/repos/asf/arrow/blob/0b472d86/cpp/src/arrow/types/list.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/types/list.cc b/cpp/src/arrow/types/list.cc
index 23f12dd..fc33311 100644
--- a/cpp/src/arrow/types/list.cc
+++ b/cpp/src/arrow/types/list.cc
@@ -14,23 +14,26 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-
 #include "arrow/types/list.h"
 
+#include <sstream>
+
 namespace arrow {
 
 bool ListArray::EqualsExact(const ListArray& other) const {
   if (this == &other) { return true; }
   if (null_count_ != other.null_count_) { return false; }
 
-  bool equal_offsets = offset_buf_->Equals(*other.offset_buf_, length_ + 1);
+  bool equal_offsets =
+      offset_buf_->Equals(*other.offset_buf_, (length_ + 1) * sizeof(int32_t));
+  if (!equal_offsets) { return false; }
   bool equal_null_bitmap = true;
   if (null_count_ > 0) {
     equal_null_bitmap =
         null_bitmap_->Equals(*other.null_bitmap_, util::bytes_for_bits(length_));
   }
 
-  if (!(equal_offsets && equal_null_bitmap)) { return false; }
+  if (!equal_null_bitmap) { return false; }
 
   return values()->Equals(other.values());
 }
@@ -41,4 +44,55 @@ bool ListArray::Equals(const std::shared_ptr<Array>& arr) const {
   return EqualsExact(*static_cast<const ListArray*>(arr.get()));
 }
 
+Status ListArray::Validate() const {
+  if (length_ < 0) { return Status::Invalid("Length was negative"); }
+  if (!offset_buf_) { return Status::Invalid("offset_buf_ was null"); }
+  if (offset_buf_->size() / sizeof(int32_t) < length_) {
+    std::stringstream ss;
+    ss << "offset buffer size (bytes): " << offset_buf_->size()
+       << " isn't large enough for length: " << length_;
+    return Status::Invalid(ss.str());
+  }
+  const int32_t last_offset = offset(length_);
+  if (last_offset > 0) {
+    if (!values_) {
+      return Status::Invalid("last offset was non-zero and values was null");
+    }
+    if (values_->length() != last_offset) {
+      std::stringstream ss;
+      ss << "Final offset invariant not equal to values length: " << last_offset
+         << "!=" << values_->length();
+      return Status::Invalid(ss.str());
+    }
+
+    const Status child_valid = values_->Validate();
+    if (!child_valid.ok()) {
+      std::stringstream ss;
+      ss << "Child array invalid: " << child_valid.ToString();
+      return Status::Invalid(ss.str());
+    }
+  }
+
+  int32_t prev_offset = offset(0);
+  if (prev_offset != 0) { return Status::Invalid("The first offset wasn't zero"); }
+  for (int32_t i = 1; i <= length_; ++i) {
+    int32_t current_offset = offset(i);
+    if (IsNull(i - 1) && current_offset != prev_offset) {
+      std::stringstream ss;
+      ss << "Offset invariant failure at: " << i << " inconsistent offsets for null"
+         << " slot: " << current_offset << "!=" << prev_offset;
+      return Status::Invalid(ss.str());
+    }
+    if (current_offset < prev_offset) {
+      std::stringstream ss;
+      ss << "Offset invariant failure: " << i
+         << " inconsistent offset for non-null slot: " << current_offset << "<"
+         << prev_offset;
+      return Status::Invalid(ss.str());
+    }
+    prev_offset = current_offset;
+  }
+  return Status::OK();
+}
+
 }  // namespace arrow
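
As a rough illustration of the invariants that ListArray::Validate() checks
in the hunk above, here is a standalone sketch. It does not use Arrow:
CheckListOffsets, its parameters, and its messages are hypothetical names
chosen for this example. Unlike the patch, it always compares the last offset
with the values length, which is a simplification.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <vector>

// Hypothetical helper, not part of Arrow. Returns an empty string when the
// offsets are consistent, otherwise a short description of the first problem.
std::string CheckListOffsets(const std::vector<int32_t>& offsets,
                             const std::vector<bool>& is_valid,
                             int32_t values_length) {
  assert(values_length >= 0);
  // A list array of N slots needs N + 1 offsets.
  if (offsets.size() != is_valid.size() + 1) { return "need length + 1 offsets"; }
  if (offsets.front() != 0) { return "first offset is not zero"; }
  if (offsets.back() != values_length) { return "last offset != values length"; }
  for (size_t i = 1; i < offsets.size(); ++i) {
    // Offsets must never decrease.
    if (offsets[i] < offsets[i - 1]) { return "offsets decrease"; }
    // A null slot must cover an empty range of child values.
    if (!is_valid[i - 1] && offsets[i] != offsets[i - 1]) {
      return "null slot has non-empty range";
    }
  }
  return "";
}
```

For the list [[1], null, [2, 3]] the offsets would be {0, 1, 1, 3} with three
child values, which this check accepts.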

http://git-wip-us.apache.org/repos/asf/arrow/blob/0b472d86/cpp/src/arrow/types/list.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/types/list.h b/cpp/src/arrow/types/list.h
index 6b81546..e2302d9 100644
--- a/cpp/src/arrow/types/list.h
+++ b/cpp/src/arrow/types/list.h
@@ -28,6 +28,7 @@
 #include "arrow/types/primitive.h"
 #include "arrow/util/bit-util.h"
 #include "arrow/util/buffer.h"
+#include "arrow/util/logging.h"
 #include "arrow/util/status.h"
 
 namespace arrow {
@@ -46,11 +47,16 @@ class ListArray : public Array {
     values_ = values;
   }
 
-  virtual ~ListArray() {}
+  Status Validate() const override;
+
+  virtual ~ListArray() = default;
 
   // Return a shared pointer in case the requestor desires to share ownership
   // with this array.
   const std::shared_ptr<Array>& values() const { return values_; }
+  const std::shared_ptr<Buffer> offset_buffer() const {
+    return std::static_pointer_cast<Buffer>(offset_buf_);
+  }
 
   const std::shared_ptr<DataType>& value_type() const { return values_->type(); }
 
@@ -78,59 +84,73 @@ class ListArray : public Array {
 //
 // To use this class, you must append values to the child array builder and use
 // the Append function to delimit each distinct list value (once the values
-// have been appended to the child array)
-class ListBuilder : public Int32Builder {
+// have been appended to the child array) or use the bulk API to append
+// a sequence of offsets and null values.
+//
+// A note on types.  Per arrow/type.h all types in the C++ implementation are
+// logical, so even though this class always builds an Array of lists, it can
+// represent multiple different logical types.  If no logical type is provided
+// at construction time, the class defaults to List<T>, where T is taken from
+// the value_builder/values that the object is constructed with.
+class ListBuilder : public ArrayBuilder {
  public:
+  // Use this constructor to incrementally build the value array along with offsets and
+  // null bitmap.
+  ListBuilder(MemoryPool* pool, std::shared_ptr<ArrayBuilder> value_builder,
+      const TypePtr& type = nullptr)
+      : ArrayBuilder(
+            pool, type ? type : std::static_pointer_cast<DataType>(
+                                    std::make_shared<ListType>(value_builder->type()))),
+        offset_builder_(pool),
+        value_builder_(value_builder) {}
+
+  // Use this constructor to build the list with a pre-existing values array
   ListBuilder(
-      MemoryPool* pool, const TypePtr& type, std::shared_ptr<ArrayBuilder> value_builder)
-      : Int32Builder(pool, type), value_builder_(value_builder) {}
-
-  Status Init(int32_t elements) {
-    // One more than requested.
-    //
-    // XXX: This is slightly imprecise, because we might trigger null mask
-    // resizes that are unnecessary when creating arrays with power-of-two size
-    return Int32Builder::Init(elements + 1);
+      MemoryPool* pool, std::shared_ptr<Array> values, const TypePtr& type = nullptr)
+      : ArrayBuilder(pool, type ? type : std::static_pointer_cast<DataType>(
+                                             std::make_shared<ListType>(values->type()))),
+        offset_builder_(pool),
+        values_(values) {}
+
+  Status Init(int32_t elements) override {
+    RETURN_NOT_OK(ArrayBuilder::Init(elements));
+    // one more than requested for offsets
+    return offset_builder_.Resize((elements + 1) * sizeof(int32_t));
   }
 
-  Status Resize(int32_t capacity) {
-    // Need space for the end offset
-    RETURN_NOT_OK(Int32Builder::Resize(capacity + 1));
-
-    // Slight hack, as the "real" capacity is one less
-    --capacity_;
-    return Status::OK();
+  Status Resize(int32_t capacity) override {
+    // one more than requested for offsets
+    RETURN_NOT_OK(offset_builder_.Resize((capacity + 1) * sizeof(int32_t)));
+    return ArrayBuilder::Resize(capacity);
   }
 
   // Vector append
   //
   // If passed, valid_bytes is of equal length to values, and any zero byte
   // will be considered as a null for that slot
-  Status Append(value_type* values, int32_t length, uint8_t* valid_bytes = nullptr) {
-    if (length_ + length > capacity_) {
-      int32_t new_capacity = util::next_power2(length_ + length);
-      RETURN_NOT_OK(Resize(new_capacity));
-    }
-    memcpy(raw_data_ + length_, values, type_traits<Int32Type>::bytes_required(length));
-
-    if (valid_bytes != nullptr) { AppendNulls(valid_bytes, length); }
-
-    length_ += length;
+  Status Append(
+      const int32_t* offsets, int32_t length, const uint8_t* valid_bytes = nullptr) {
+    RETURN_NOT_OK(Reserve(length));
+    UnsafeAppendToBitmap(valid_bytes, length);
+    offset_builder_.UnsafeAppend<int32_t>(offsets, length);
     return Status::OK();
   }
 
+  // The same as Finalize but allows for overriding the C++ type
   template <typename Container>
   std::shared_ptr<Array> Transfer() {
-    std::shared_ptr<Array> items = value_builder_->Finish();
+    std::shared_ptr<Array> items = values_;
+    if (!items) { items = value_builder_->Finish(); }
 
-    // Add final offset if the length is non-zero
-    if (length_) { raw_data_[length_] = items->length(); }
+    offset_builder_.Append<int32_t>(items->length());
 
+    const auto offsets_buffer = offset_builder_.Finish();
     auto result = std::make_shared<Container>(
-        type_, length_, data_, items, null_count_, null_bitmap_);
+        type_, length_, offsets_buffer, items, null_count_, null_bitmap_);
 
-    data_ = null_bitmap_ = nullptr;
+    // TODO(emkornfield) make a reset method
     capacity_ = length_ = null_count_ = 0;
+    null_bitmap_ = nullptr;
 
     return result;
   }
@@ -141,26 +161,24 @@ class ListBuilder : public Int32Builder {
   //
   // This function should be called before beginning to append elements to the
   // value builder
-  Status Append(bool is_null = false) {
-    if (length_ == capacity_) {
-      // If the capacity was not already a multiple of 2, do so here
-      RETURN_NOT_OK(Resize(util::next_power2(capacity_ + 1)));
-    }
-    if (is_null) {
-      ++null_count_;
-    } else {
-      util::set_bit(null_bitmap_data_, length_);
-    }
-    raw_data_[length_++] = value_builder_->length();
+  Status Append(bool is_valid = true) {
+    RETURN_NOT_OK(Reserve(1));
+    UnsafeAppendToBitmap(is_valid);
+    RETURN_NOT_OK(offset_builder_.Append<int32_t>(value_builder_->length()));
     return Status::OK();
   }
 
-  Status AppendNull() { return Append(true); }
+  Status AppendNull() { return Append(false); }
 
-  const std::shared_ptr<ArrayBuilder>& value_builder() const { return value_builder_; }
+  const std::shared_ptr<ArrayBuilder>& value_builder() const {
+    DCHECK(!values_) << "Using value builder is pointless when values_ is set";
+    return value_builder_;
+  }
 
  protected:
+  BufferBuilder offset_builder_;
   std::shared_ptr<ArrayBuilder> value_builder_;
+  std::shared_ptr<Array> values_;
 };
 
 }  // namespace arrow
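
The append pattern described in the ListBuilder comments above (call Append
to open a slot, then add child values, and write one final offset when
finishing) can be sketched without Arrow as follows. ToyListBuilder,
FinishOffsets and BuildExample are hypothetical names for this illustration
only; the real class stores offsets in a BufferBuilder and validity in a
bitmap.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Toy stand-in for the builder in the patch: int32 child values only.
struct ToyListBuilder {
  std::vector<int32_t> values;   // plays the role of the child value builder
  std::vector<int32_t> offsets;  // start offset of each list slot
  std::vector<bool> is_valid;    // one flag per list slot

  // Open a new list slot: its start offset is the current child length.
  void Append(bool valid = true) {
    is_valid.push_back(valid);
    offsets.push_back(static_cast<int32_t>(values.size()));
  }

  void AppendNull() { Append(false); }

  // Close the last slot by adding the final offset (the child length).
  std::vector<int32_t> FinishOffsets() const {
    std::vector<int32_t> result = offsets;
    result.push_back(static_cast<int32_t>(values.size()));
    return result;
  }
};

// Builds the list [[1], null, [2, 3]].
ToyListBuilder BuildExample() {
  ToyListBuilder b;
  b.Append();
  b.values.push_back(1);
  b.AppendNull();
  b.Append();
  b.values.push_back(2);
  b.values.push_back(3);
  return b;
}
```

Because a null slot adds no child values, its start offset equals the next
slot's start offset, which matches the null-slot check in Validate().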

http://git-wip-us.apache.org/repos/asf/arrow/blob/0b472d86/cpp/src/arrow/types/primitive-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/types/primitive-test.cc b/cpp/src/arrow/types/primitive-test.cc
index 6bd9e73..2b4c087 100644
--- a/cpp/src/arrow/types/primitive-test.cc
+++ b/cpp/src/arrow/types/primitive-test.cc
@@ -102,7 +102,7 @@ class TestPrimitiveBuilder : public TestBuilder {
     Attrs::draw(N, &draws_);
 
     valid_bytes_.resize(N);
-    test::random_null_bitmap(N, pct_null, valid_bytes_.data());
+    test::random_null_bytes(N, pct_null, valid_bytes_.data());
   }
 
   void Check(const std::shared_ptr<BuilderType>& builder, bool nullable) {
@@ -193,8 +193,8 @@ void TestPrimitiveBuilder<PBoolean>::RandomData(int N, double pct_null) {
   draws_.resize(N);
   valid_bytes_.resize(N);
 
-  test::random_null_bitmap(N, 0.5, draws_.data());
-  test::random_null_bitmap(N, pct_null, valid_bytes_.data());
+  test::random_null_bytes(N, 0.5, draws_.data());
+  test::random_null_bytes(N, pct_null, valid_bytes_.data());
 }
 
 template <>

http://git-wip-us.apache.org/repos/asf/arrow/blob/0b472d86/cpp/src/arrow/types/primitive.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/types/primitive.cc b/cpp/src/arrow/types/primitive.cc
index 9549c47..9102c53 100644
--- a/cpp/src/arrow/types/primitive.cc
+++ b/cpp/src/arrow/types/primitive.cc
@@ -57,12 +57,14 @@ bool PrimitiveArray::EqualsExact(const PrimitiveArray& other) const {
     }
     return true;
   } else {
+    if (length_ == 0 && other.length_ == 0) { return true; }
     return data_->Equals(*other.data_, length_);
   }
 }
 
 bool PrimitiveArray::Equals(const std::shared_ptr<Array>& arr) const {
   if (this == arr.get()) { return true; }
+  if (!arr) { return false; }
   if (this->type_enum() != arr->type_enum()) { return false; }
   return EqualsExact(*static_cast<const PrimitiveArray*>(arr.get()));
 }
@@ -102,48 +104,21 @@ Status PrimitiveBuilder<T>::Resize(int32_t capacity) {
 }
 
 template <typename T>
-Status PrimitiveBuilder<T>::Reserve(int32_t elements) {
-  if (length_ + elements > capacity_) {
-    int32_t new_capacity = util::next_power2(length_ + elements);
-    return Resize(new_capacity);
-  }
-  return Status::OK();
-}
-
-template <typename T>
 Status PrimitiveBuilder<T>::Append(
     const value_type* values, int32_t length, const uint8_t* valid_bytes) {
-  RETURN_NOT_OK(PrimitiveBuilder<T>::Reserve(length));
+  RETURN_NOT_OK(Reserve(length));
 
   if (length > 0) {
     memcpy(raw_data_ + length_, values, type_traits<T>::bytes_required(length));
   }
 
-  if (valid_bytes != nullptr) {
-    PrimitiveBuilder<T>::AppendNulls(valid_bytes, length);
-  } else {
-    for (int i = 0; i < length; ++i) {
-      util::set_bit(null_bitmap_data_, length_ + i);
-    }
-  }
+  // length_ is updated by this call
+  ArrayBuilder::UnsafeAppendToBitmap(valid_bytes, length);
 
-  length_ += length;
   return Status::OK();
 }
 
 template <typename T>
-void PrimitiveBuilder<T>::AppendNulls(const uint8_t* valid_bytes, int32_t length) {
-  // If valid_bytes is all not null, then none of the values are null
-  for (int i = 0; i < length; ++i) {
-    if (valid_bytes[i] == 0) {
-      ++null_count_;
-    } else {
-      util::set_bit(null_bitmap_data_, length_ + i);
-    }
-  }
-}
-
-template <typename T>
 std::shared_ptr<Array> PrimitiveBuilder<T>::Finish() {
   std::shared_ptr<Array> result = std::make_shared<typename type_traits<T>::ArrayType>(
       type_, length_, data_, null_count_, null_bitmap_);
@@ -166,14 +141,8 @@ Status PrimitiveBuilder<BooleanType>::Append(
     }
   }
 
-  if (valid_bytes != nullptr) {
-    PrimitiveBuilder<BooleanType>::AppendNulls(valid_bytes, length);
-  } else {
-    for (int i = 0; i < length; ++i) {
-      util::set_bit(null_bitmap_data_, length_ + i);
-    }
-  }
-  length_ += length;
+  // this updates length_
+  ArrayBuilder::UnsafeAppendToBitmap(valid_bytes, length);
   return Status::OK();
 }
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/0b472d86/cpp/src/arrow/types/primitive.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/types/primitive.h b/cpp/src/arrow/types/primitive.h
index fcd3db4..6f6b2fe 100644
--- a/cpp/src/arrow/types/primitive.h
+++ b/cpp/src/arrow/types/primitive.h
@@ -95,15 +95,13 @@ class PrimitiveBuilder : public ArrayBuilder {
   using ArrayBuilder::Advance;
 
   // Write nulls as uint8_t* (0 value indicates null) into pre-allocated memory
-  void AppendNulls(const uint8_t* valid_bytes, int32_t length);
+  void AppendNulls(const uint8_t* valid_bytes, int32_t length) {
+    UnsafeAppendToBitmap(valid_bytes, length);
+  }
 
   Status AppendNull() {
-    if (length_ == capacity_) {
-      // If the capacity was not already a multiple of 2, do so here
-      RETURN_NOT_OK(Resize(util::next_power2(capacity_ + 1)));
-    }
-    ++null_count_;
-    ++length_;
+    RETURN_NOT_OK(Reserve(1));
+    UnsafeAppendToBitmap(false);
     return Status::OK();
   }
 
@@ -116,21 +114,17 @@ class PrimitiveBuilder : public ArrayBuilder {
   Status Append(
       const value_type* values, int32_t length, const uint8_t* valid_bytes = nullptr);
 
-  // Ensure that builder can accommodate an additional number of
-  // elements. Resizes if the current capacity is not sufficient
-  Status Reserve(int32_t elements);
-
   std::shared_ptr<Array> Finish() override;
 
- protected:
-  std::shared_ptr<PoolBuffer> data_;
-  value_type* raw_data_;
-
-  Status Init(int32_t capacity);
+  Status Init(int32_t capacity) override;
 
   // Increase the capacity of the builder to accommodate at least the indicated
   // number of elements
-  Status Resize(int32_t capacity);
+  Status Resize(int32_t capacity) override;
+
+ protected:
+  std::shared_ptr<PoolBuffer> data_;
+  value_type* raw_data_;
 };
 
 template <typename T>
@@ -140,9 +134,17 @@ class NumericBuilder : public PrimitiveBuilder<T> {
   using PrimitiveBuilder<T>::PrimitiveBuilder;
 
   using PrimitiveBuilder<T>::Append;
+  using PrimitiveBuilder<T>::Init;
+  using PrimitiveBuilder<T>::Resize;
 
-  // Scalar append. Does not capacity-check; make sure to call Reserve beforehand
+  // Scalar append.
   void Append(value_type val) {
+    ArrayBuilder::Reserve(1);
+    UnsafeAppend(val);
+  }
+
+  // Does not capacity-check; make sure to call Reserve beforehand
+  void UnsafeAppend(value_type val) {
     util::set_bit(null_bitmap_data_, length_);
     raw_data_[length_++] = val;
   }
@@ -151,9 +153,6 @@ class NumericBuilder : public PrimitiveBuilder<T> {
   using PrimitiveBuilder<T>::length_;
   using PrimitiveBuilder<T>::null_bitmap_data_;
   using PrimitiveBuilder<T>::raw_data_;
-
-  using PrimitiveBuilder<T>::Init;
-  using PrimitiveBuilder<T>::Resize;
 };
 
 template <>
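
The primitive builders above now delegate validity handling to
ArrayBuilder::UnsafeAppendToBitmap, whose body is not shown in this part of
the diff. Going by the removed AppendNulls code, the idea is roughly the
following standalone sketch. ToyBitmap and its members are hypothetical
names, and the least-significant-bit-first layout is an assumption of this
example.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Toy validity bitmap: one bit per slot, a set bit means "not null".
struct ToyBitmap {
  std::vector<uint8_t> bits;
  int32_t length = 0;
  int32_t null_count = 0;

  // valid_bytes holds one byte per slot (0 means null); nullptr means every
  // slot is valid, as in the removed else-branch of PrimitiveBuilder::Append.
  void AppendValidBytes(const uint8_t* valid_bytes, int32_t n) {
    assert(n >= 0);
    bits.resize((length + n + 7) / 8, 0);
    for (int32_t i = 0; i < n; ++i) {
      const bool valid = (valid_bytes == nullptr) || (valid_bytes[i] != 0);
      if (valid) {
        bits[(length + i) / 8] |= static_cast<uint8_t>(1u << ((length + i) % 8));
      } else {
        ++null_count;
      }
    }
    length += n;  // the bitmap append also advances the length
  }

  bool IsValid(int32_t i) const { return ((bits[i / 8] >> (i % 8)) & 1) != 0; }
};
```

Keeping the length update inside the bitmap append is why the callers in the
patch no longer do `length_ += length` themselves.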

http://git-wip-us.apache.org/repos/asf/arrow/blob/0b472d86/cpp/src/arrow/types/string.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/types/string.h b/cpp/src/arrow/types/string.h
index c5cbe10..d2d3c5b 100644
--- a/cpp/src/arrow/types/string.h
+++ b/cpp/src/arrow/types/string.h
@@ -89,11 +89,11 @@ class StringArray : public ListArray {
   const uint8_t* raw_bytes_;
 };
 
-// Array builder
+// String builder
 class StringBuilder : public ListBuilder {
  public:
   explicit StringBuilder(MemoryPool* pool, const TypePtr& type)
-      : ListBuilder(pool, type, std::make_shared<UInt8Builder>(pool, value_type_)) {
+      : ListBuilder(pool, std::make_shared<UInt8Builder>(pool, value_type_), type) {
     byte_builder_ = static_cast<UInt8Builder*>(value_builder_.get());
   }
 
@@ -110,7 +110,6 @@ class StringBuilder : public ListBuilder {
   }
 
  protected:
-  std::shared_ptr<ListBuilder> list_builder_;
   UInt8Builder* byte_builder_;
 
   static TypePtr value_type_;

http://git-wip-us.apache.org/repos/asf/arrow/blob/0b472d86/cpp/src/arrow/util/buffer.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/util/buffer.h b/cpp/src/arrow/util/buffer.h
index 56532be..5ef0076 100644
--- a/cpp/src/arrow/util/buffer.h
+++ b/cpp/src/arrow/util/buffer.h
@@ -23,6 +23,7 @@
 #include <cstring>
 #include <memory>
 
+#include "arrow/util/bit-util.h"
 #include "arrow/util/macros.h"
 #include "arrow/util/status.h"
 
@@ -137,26 +138,64 @@ class BufferBuilder {
  public:
   explicit BufferBuilder(MemoryPool* pool) : pool_(pool), capacity_(0), size_(0) {}
 
+  Status Resize(int32_t elements) {
+    if (capacity_ == 0) { buffer_ = std::make_shared<PoolBuffer>(pool_); }
+    capacity_ = elements;
+    RETURN_NOT_OK(buffer_->Resize(capacity_));
+    data_ = buffer_->mutable_data();
+    return Status::OK();
+  }
+
   Status Append(const uint8_t* data, int length) {
-    if (capacity_ < length + size_) {
-      if (capacity_ == 0) { buffer_ = std::make_shared<PoolBuffer>(pool_); }
-      capacity_ = std::max(MIN_BUFFER_CAPACITY, capacity_);
-      while (capacity_ < length + size_) {
-        capacity_ *= 2;
-      }
-      RETURN_NOT_OK(buffer_->Resize(capacity_));
-      data_ = buffer_->mutable_data();
-    }
+    if (capacity_ < length + size_) { RETURN_NOT_OK(Resize(length + size_)); }
+    UnsafeAppend(data, length);
+    return Status::OK();
+  }
+
+  template <typename T>
+  Status Append(T arithmetic_value) {
+    static_assert(std::is_arithmetic<T>::value,
+        "Convenience buffer append only supports arithmetic types");
+    return Append(reinterpret_cast<uint8_t*>(&arithmetic_value), sizeof(T));
+  }
+
+  template <typename T>
+  Status Append(const T* arithmetic_values, int num_elements) {
+    static_assert(std::is_arithmetic<T>::value,
+        "Convenience buffer append only supports arithmetic types");
+    return Append(
+        reinterpret_cast<const uint8_t*>(arithmetic_values), num_elements * sizeof(T));
+  }
+
+  // Unsafe methods don't check existing size
+  void UnsafeAppend(const uint8_t* data, int length) {
     memcpy(data_ + size_, data, length);
     size_ += length;
-    return Status::OK();
+  }
+
+  template <typename T>
+  void UnsafeAppend(T arithmetic_value) {
+    static_assert(std::is_arithmetic<T>::value,
+        "Convenience buffer append only supports arithmetic types");
+    UnsafeAppend(reinterpret_cast<uint8_t*>(&arithmetic_value), sizeof(T));
+  }
+
+  template <typename T>
+  void UnsafeAppend(const T* arithmetic_values, int num_elements) {
+    static_assert(std::is_arithmetic<T>::value,
+        "Convenience buffer append only supports arithmetic types");
+    UnsafeAppend(
+        reinterpret_cast<const uint8_t*>(arithmetic_values), num_elements * sizeof(T));
   }
 
   std::shared_ptr<Buffer> Finish() {
     auto result = buffer_;
     buffer_ = nullptr;
+    capacity_ = size_ = 0;
     return result;
   }
+  int capacity() { return capacity_; }
+  int length() { return size_; }
 
  private:
   std::shared_ptr<PoolBuffer> buffer_;
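
The typed Append overloads added to BufferBuilder above copy the raw bytes of
arithmetic values into the buffer. A minimal standalone sketch of that idea,
using std::vector instead of PoolBuffer, might look like this. The class and
method names (ToyBufferBuilder, AppendBytes, AppendValue, AppendValues,
ReadAt) are hypothetical and are not the names used in the patch.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <type_traits>
#include <vector>

class ToyBufferBuilder {
 public:
  // Raw byte append; std::vector grows as needed.
  void AppendBytes(const uint8_t* data, size_t length) {
    bytes_.insert(bytes_.end(), data, data + length);
  }

  // Append one arithmetic value as its raw bytes.
  template <typename T>
  void AppendValue(T value) {
    static_assert(std::is_arithmetic<T>::value, "only arithmetic types are supported");
    AppendBytes(reinterpret_cast<const uint8_t*>(&value), sizeof(T));
  }

  // Append a contiguous run of arithmetic values.
  template <typename T>
  void AppendValues(const T* values, size_t num_elements) {
    static_assert(std::is_arithmetic<T>::value, "only arithmetic types are supported");
    AppendBytes(reinterpret_cast<const uint8_t*>(values), num_elements * sizeof(T));
  }

  // Read back the index-th value of type T (memcpy avoids alignment issues).
  template <typename T>
  T ReadAt(size_t index) const {
    assert((index + 1) * sizeof(T) <= bytes_.size());
    T out;
    std::memcpy(&out, bytes_.data() + index * sizeof(T), sizeof(T));
    return out;
  }

  size_t length() const { return bytes_.size(); }

 private:
  std::vector<uint8_t> bytes_;
};
```

Note that length() counts bytes, not elements, which is also how size_ is
used in the patched BufferBuilder.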

http://git-wip-us.apache.org/repos/asf/arrow/blob/0b472d86/cpp/src/arrow/util/logging.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/util/logging.h b/cpp/src/arrow/util/logging.h
index 527ce42..fccc5e3 100644
--- a/cpp/src/arrow/util/logging.h
+++ b/cpp/src/arrow/util/logging.h
@@ -18,8 +18,8 @@
 #ifndef ARROW_UTIL_LOGGING_H
 #define ARROW_UTIL_LOGGING_H
 
-#include <iostream>
 #include <cstdlib>
+#include <iostream>
 
 namespace arrow {
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/0b472d86/cpp/src/arrow/util/memory-pool.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/util/memory-pool.cc b/cpp/src/arrow/util/memory-pool.cc
index fb417e7..961554f 100644
--- a/cpp/src/arrow/util/memory-pool.cc
+++ b/cpp/src/arrow/util/memory-pool.cc
@@ -18,8 +18,8 @@
 #include "arrow/util/memory-pool.h"
 
 #include <cstdlib>
-#include <sstream>
 #include <mutex>
+#include <sstream>
 
 #include "arrow/util/status.h"
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/0b472d86/python/pyarrow/tests/test_array.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_array.py b/python/pyarrow/tests/test_array.py
index d608f81..bf5a220 100644
--- a/python/pyarrow/tests/test_array.py
+++ b/python/pyarrow/tests/test_array.py
@@ -31,14 +31,15 @@ class TestArrayAPI(unittest.TestCase):
         assert arr[1] is pyarrow.NA
 
     def test_list_format(self):
-        arr = pyarrow.from_pylist([[1], None, [2, 3]])
+        arr = pyarrow.from_pylist([[1], None, [2, 3, None]])
         result = fmt.array_format(arr)
         expected = """\
 [
   [1],
   NA,
   [2,
-   3]
+   3,
+   NA]
 ]"""
         assert result == expected