You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2017/03/09 19:00:59 UTC

arrow git commit: ARROW-605: [C++] Refactor IPC adapter code into generic ArrayLoader class. Add Date32Type

Repository: arrow
Updated Branches:
  refs/heads/master b109a246f -> 6b3ae2aec


ARROW-605: [C++] Refactor IPC adapter code into generic ArrayLoader class. Add Date32Type

These are various changes introduced to support the Feather merge in ARROW-452 (#361).

Author: Wes McKinney <we...@twosigma.com>

Closes #365 from wesm/array-loader and squashes the following commits:

bc22872 [Wes McKinney] Revert Array::type_id to type_enum since Parquet uses this API
344e6b1 [Wes McKinney] fix compiler warning
997b7a2 [Wes McKinney] Refactor IPC adapter code into generic ArrayLoader class. Add Date32Type


Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/6b3ae2ae
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/6b3ae2ae
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/6b3ae2ae

Branch: refs/heads/master
Commit: 6b3ae2aecc8cd31425035a021fa04b9ed3385a8d
Parents: b109a24
Author: Wes McKinney <we...@twosigma.com>
Authored: Thu Mar 9 14:00:48 2017 -0500
Committer: Wes McKinney <we...@twosigma.com>
Committed: Thu Mar 9 14:00:48 2017 -0500

----------------------------------------------------------------------
 cpp/CMakeLists.txt                          |   1 +
 cpp/src/arrow/CMakeLists.txt                |   5 +-
 cpp/src/arrow/array.cc                      |  52 +----
 cpp/src/arrow/array.h                       |  17 +-
 cpp/src/arrow/builder.cc                    |   1 +
 cpp/src/arrow/builder.h                     |   1 +
 cpp/src/arrow/column.cc                     |   3 +
 cpp/src/arrow/column.h                      |   3 +
 cpp/src/arrow/compare.cc                    |   8 +-
 cpp/src/arrow/io/memory.cc                  |  19 +-
 cpp/src/arrow/io/memory.h                   |   6 +
 cpp/src/arrow/ipc/adapter.cc                | 252 ++------------------
 cpp/src/arrow/ipc/adapter.h                 |   8 +-
 cpp/src/arrow/ipc/metadata.cc               |   1 +
 cpp/src/arrow/ipc/metadata.h                |   7 +-
 cpp/src/arrow/loader.cc                     | 285 +++++++++++++++++++++++
 cpp/src/arrow/loader.h                      |  89 +++++++
 cpp/src/arrow/pretty_print.cc               |   6 +-
 cpp/src/arrow/type.cc                       |  22 +-
 cpp/src/arrow/type.h                        |  38 ++-
 cpp/src/arrow/type_fwd.h                    |   5 +
 cpp/src/arrow/type_traits.h                 |  12 +
 python/pyarrow/array.pyx                    |  31 ++-
 python/pyarrow/table.pyx                    |   3 +-
 python/pyarrow/tests/test_convert_pandas.py |   6 +-
 python/src/pyarrow/adapters/pandas.cc       |  17 +-
 26 files changed, 558 insertions(+), 340 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/arrow/blob/6b3ae2ae/cpp/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt
index 22c6e9a..294c439 100644
--- a/cpp/CMakeLists.txt
+++ b/cpp/CMakeLists.txt
@@ -799,6 +799,7 @@ set(ARROW_SRCS
   src/arrow/builder.cc
   src/arrow/column.cc
   src/arrow/compare.cc
+  src/arrow/loader.cc
   src/arrow/memory_pool.cc
   src/arrow/pretty_print.cc
   src/arrow/schema.cc

http://git-wip-us.apache.org/repos/asf/arrow/blob/6b3ae2ae/cpp/src/arrow/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt
index d1efa02..ddeb81c 100644
--- a/cpp/src/arrow/CMakeLists.txt
+++ b/cpp/src/arrow/CMakeLists.txt
@@ -19,10 +19,11 @@
 install(FILES
   api.h
   array.h
-  column.h
-  compare.h
   buffer.h
   builder.h
+  column.h
+  compare.h
+  loader.h
   memory_pool.h
   pretty_print.h
   schema.h

http://git-wip-us.apache.org/repos/asf/arrow/blob/6b3ae2ae/cpp/src/arrow/array.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/array.cc b/cpp/src/arrow/array.cc
index 284bb57..49da6bb 100644
--- a/cpp/src/arrow/array.cc
+++ b/cpp/src/arrow/array.cc
@@ -165,6 +165,7 @@ template class NumericArray<Int32Type>;
 template class NumericArray<Int64Type>;
 template class NumericArray<TimestampType>;
 template class NumericArray<DateType>;
+template class NumericArray<Date32Type>;
 template class NumericArray<TimeType>;
 template class NumericArray<HalfFloatType>;
 template class NumericArray<FloatType>;
@@ -193,7 +194,7 @@ std::shared_ptr<Array> BooleanArray::Slice(int64_t offset, int64_t length) const
 
 Status ListArray::Validate() const {
   if (length_ < 0) { return Status::Invalid("Length was negative"); }
-  if (!value_offsets_) { return Status::Invalid("value_offsets_ was null"); }
+  if (length_ && !value_offsets_) { return Status::Invalid("value_offsets_ was null"); }
   if (value_offsets_->size() / static_cast<int>(sizeof(int32_t)) < length_) {
     std::stringstream ss;
     ss << "offset buffer size (bytes): " << value_offsets_->size()
@@ -425,20 +426,6 @@ std::shared_ptr<Array> UnionArray::Slice(int64_t offset, int64_t length) const {
 // ----------------------------------------------------------------------
 // DictionaryArray
 
-Status DictionaryArray::FromBuffer(const std::shared_ptr<DataType>& type, int64_t length,
-    const std::shared_ptr<Buffer>& indices, const std::shared_ptr<Buffer>& null_bitmap,
-    int64_t null_count, int64_t offset, std::shared_ptr<DictionaryArray>* out) {
-  DCHECK_EQ(type->type, Type::DICTIONARY);
-  const auto& dict_type = static_cast<const DictionaryType*>(type.get());
-
-  std::shared_ptr<Array> boxed_indices;
-  RETURN_NOT_OK(MakePrimitiveArray(dict_type->index_type(), length, indices, null_bitmap,
-      null_count, offset, &boxed_indices));
-
-  *out = std::make_shared<DictionaryArray>(type, boxed_indices);
-  return Status::OK();
-}
-
 DictionaryArray::DictionaryArray(
     const std::shared_ptr<DataType>& type, const std::shared_ptr<Array>& indices)
     : Array(type, indices->length(), indices->null_bitmap(), indices->null_count(),
@@ -470,40 +457,6 @@ std::shared_ptr<Array> DictionaryArray::Slice(int64_t offset, int64_t length) co
 }
 
 // ----------------------------------------------------------------------
-
-#define MAKE_PRIMITIVE_ARRAY_CASE(ENUM, ArrayType)                                  \
-  case Type::ENUM:                                                                  \
-    out->reset(new ArrayType(type, length, data, null_bitmap, null_count, offset)); \
-    break;
-
-Status MakePrimitiveArray(const std::shared_ptr<DataType>& type, int64_t length,
-    const std::shared_ptr<Buffer>& data, const std::shared_ptr<Buffer>& null_bitmap,
-    int64_t null_count, int64_t offset, std::shared_ptr<Array>* out) {
-  switch (type->type) {
-    MAKE_PRIMITIVE_ARRAY_CASE(BOOL, BooleanArray);
-    MAKE_PRIMITIVE_ARRAY_CASE(UINT8, UInt8Array);
-    MAKE_PRIMITIVE_ARRAY_CASE(INT8, Int8Array);
-    MAKE_PRIMITIVE_ARRAY_CASE(UINT16, UInt16Array);
-    MAKE_PRIMITIVE_ARRAY_CASE(INT16, Int16Array);
-    MAKE_PRIMITIVE_ARRAY_CASE(UINT32, UInt32Array);
-    MAKE_PRIMITIVE_ARRAY_CASE(INT32, Int32Array);
-    MAKE_PRIMITIVE_ARRAY_CASE(UINT64, UInt64Array);
-    MAKE_PRIMITIVE_ARRAY_CASE(INT64, Int64Array);
-    MAKE_PRIMITIVE_ARRAY_CASE(FLOAT, FloatArray);
-    MAKE_PRIMITIVE_ARRAY_CASE(DOUBLE, DoubleArray);
-    MAKE_PRIMITIVE_ARRAY_CASE(TIME, Int64Array);
-    MAKE_PRIMITIVE_ARRAY_CASE(TIMESTAMP, TimestampArray);
-    default:
-      return Status::NotImplemented(type->ToString());
-  }
-#ifdef NDEBUG
-  return Status::OK();
-#else
-  return (*out)->Validate();
-#endif
-}
-
-// ----------------------------------------------------------------------
 // Default implementations of ArrayVisitor methods
 
 #define ARRAY_VISITOR_DEFAULT(ARRAY_CLASS)                   \
@@ -527,6 +480,7 @@ ARRAY_VISITOR_DEFAULT(DoubleArray);
 ARRAY_VISITOR_DEFAULT(StringArray);
 ARRAY_VISITOR_DEFAULT(BinaryArray);
 ARRAY_VISITOR_DEFAULT(DateArray);
+ARRAY_VISITOR_DEFAULT(Date32Array);
 ARRAY_VISITOR_DEFAULT(TimeArray);
 ARRAY_VISITOR_DEFAULT(TimestampArray);
 ARRAY_VISITOR_DEFAULT(IntervalArray);

http://git-wip-us.apache.org/repos/asf/arrow/blob/6b3ae2ae/cpp/src/arrow/array.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/array.h b/cpp/src/arrow/array.h
index f20f212..f111609 100644
--- a/cpp/src/arrow/array.h
+++ b/cpp/src/arrow/array.h
@@ -58,6 +58,7 @@ class ARROW_EXPORT ArrayVisitor {
   virtual Status Visit(const StringArray& array);
   virtual Status Visit(const BinaryArray& array);
   virtual Status Visit(const DateArray& array);
+  virtual Status Visit(const Date32Array& array);
   virtual Status Visit(const TimeArray& array);
   virtual Status Visit(const TimestampArray& array);
   virtual Status Visit(const IntervalArray& array);
@@ -485,12 +486,6 @@ class ARROW_EXPORT DictionaryArray : public Array {
   DictionaryArray(
       const std::shared_ptr<DataType>& type, const std::shared_ptr<Array>& indices);
 
-  // Alternate ctor; other attributes (like null count) are inherited from the
-  // passed indices array
-  static Status FromBuffer(const std::shared_ptr<DataType>& type, int64_t length,
-      const std::shared_ptr<Buffer>& indices, const std::shared_ptr<Buffer>& null_bitmap,
-      int64_t null_count, int64_t offset, std::shared_ptr<DictionaryArray>* out);
-
   Status Validate() const override;
 
   std::shared_ptr<Array> indices() const { return indices_; }
@@ -531,21 +526,13 @@ extern template class ARROW_EXPORT NumericArray<FloatType>;
 extern template class ARROW_EXPORT NumericArray<DoubleType>;
 extern template class ARROW_EXPORT NumericArray<TimestampType>;
 extern template class ARROW_EXPORT NumericArray<DateType>;
+extern template class ARROW_EXPORT NumericArray<Date32Type>;
 extern template class ARROW_EXPORT NumericArray<TimeType>;
 
 #if defined(__GNUC__) && !defined(__clang__)
 #pragma GCC diagnostic pop
 #endif
 
-// ----------------------------------------------------------------------
-// Helper functions
-
-// Create new arrays for logical types that are backed by primitive arrays.
-Status ARROW_EXPORT MakePrimitiveArray(const std::shared_ptr<DataType>& type,
-    int64_t length, const std::shared_ptr<Buffer>& data,
-    const std::shared_ptr<Buffer>& null_bitmap, int64_t null_count, int64_t offset,
-    std::shared_ptr<Array>* out);
-
 }  // namespace arrow
 
 #endif

http://git-wip-us.apache.org/repos/asf/arrow/blob/6b3ae2ae/cpp/src/arrow/builder.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/builder.cc b/cpp/src/arrow/builder.cc
index 9086598..4372925 100644
--- a/cpp/src/arrow/builder.cc
+++ b/cpp/src/arrow/builder.cc
@@ -238,6 +238,7 @@ template class PrimitiveBuilder<Int16Type>;
 template class PrimitiveBuilder<Int32Type>;
 template class PrimitiveBuilder<Int64Type>;
 template class PrimitiveBuilder<DateType>;
+template class PrimitiveBuilder<Date32Type>;
 template class PrimitiveBuilder<TimestampType>;
 template class PrimitiveBuilder<TimeType>;
 template class PrimitiveBuilder<HalfFloatType>;

http://git-wip-us.apache.org/repos/asf/arrow/blob/6b3ae2ae/cpp/src/arrow/builder.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/builder.h b/cpp/src/arrow/builder.h
index e642d3c..ebc683a 100644
--- a/cpp/src/arrow/builder.h
+++ b/cpp/src/arrow/builder.h
@@ -233,6 +233,7 @@ using Int64Builder = NumericBuilder<Int64Type>;
 using TimestampBuilder = NumericBuilder<TimestampType>;
 using TimeBuilder = NumericBuilder<TimeType>;
 using DateBuilder = NumericBuilder<DateType>;
+using Date32Builder = NumericBuilder<Date32Type>;
 
 using HalfFloatBuilder = NumericBuilder<HalfFloatType>;
 using FloatBuilder = NumericBuilder<FloatType>;

http://git-wip-us.apache.org/repos/asf/arrow/blob/6b3ae2ae/cpp/src/arrow/column.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/column.cc b/cpp/src/arrow/column.cc
index 1822870..78501f9 100644
--- a/cpp/src/arrow/column.cc
+++ b/cpp/src/arrow/column.cc
@@ -97,6 +97,9 @@ Column::Column(const std::shared_ptr<Field>& field, const std::shared_ptr<Array>
   }
 }
 
+Column::Column(const std::string& name, const std::shared_ptr<Array>& data)
+    : Column(::arrow::field(name, data->type()), data) {}
+
 Column::Column(
     const std::shared_ptr<Field>& field, const std::shared_ptr<ChunkedArray>& data)
     : field_(field), data_(data) {}

http://git-wip-us.apache.org/repos/asf/arrow/blob/6b3ae2ae/cpp/src/arrow/column.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/column.h b/cpp/src/arrow/column.h
index 93a34c7..bfcfd8e 100644
--- a/cpp/src/arrow/column.h
+++ b/cpp/src/arrow/column.h
@@ -69,6 +69,9 @@ class ARROW_EXPORT Column {
 
   Column(const std::shared_ptr<Field>& field, const std::shared_ptr<Array>& data);
 
+  /// Construct from name and array
+  Column(const std::string& name, const std::shared_ptr<Array>& data);
+
   int64_t length() const { return data_->length(); }
 
   int64_t null_count() const { return data_->null_count(); }

http://git-wip-us.apache.org/repos/asf/arrow/blob/6b3ae2ae/cpp/src/arrow/compare.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/compare.cc b/cpp/src/arrow/compare.cc
index f38f8d6..17b8833 100644
--- a/cpp/src/arrow/compare.cc
+++ b/cpp/src/arrow/compare.cc
@@ -145,6 +145,10 @@ class RangeEqualsVisitor : public ArrayVisitor {
 
   Status Visit(const DateArray& left) override { return CompareValues<DateArray>(left); }
 
+  Status Visit(const Date32Array& left) override {
+    return CompareValues<Date32Array>(left);
+  }
+
   Status Visit(const TimeArray& left) override { return CompareValues<TimeArray>(left); }
 
   Status Visit(const TimestampArray& left) override {
@@ -381,6 +385,8 @@ class ArrayEqualsVisitor : public RangeEqualsVisitor {
 
   Status Visit(const DateArray& left) override { return ComparePrimitive(left); }
 
+  Status Visit(const Date32Array& left) override { return ComparePrimitive(left); }
+
   Status Visit(const TimeArray& left) override { return ComparePrimitive(left); }
 
   Status Visit(const TimestampArray& left) override { return ComparePrimitive(left); }
@@ -622,7 +628,7 @@ class TypeEqualsVisitor : public TypeVisitor {
 
   Status Visit(const TimestampType& left) override {
     const auto& right = static_cast<const TimestampType&>(right_);
-    result_ = left.unit == right.unit;
+    result_ = left.unit == right.unit && left.timezone == right.timezone;
     return Status::OK();
   }
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/6b3ae2ae/cpp/src/arrow/io/memory.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/memory.cc b/cpp/src/arrow/io/memory.cc
index 1339a99..5b5c864 100644
--- a/cpp/src/arrow/io/memory.cc
+++ b/cpp/src/arrow/io/memory.cc
@@ -28,6 +28,7 @@
 #include "arrow/buffer.h"
 #include "arrow/io/interfaces.h"
 #include "arrow/status.h"
+#include "arrow/util/logging.h"
 
 namespace arrow {
 namespace io {
@@ -43,9 +44,17 @@ BufferOutputStream::BufferOutputStream(const std::shared_ptr<ResizableBuffer>& b
       position_(0),
       mutable_data_(buffer->mutable_data()) {}
 
+Status BufferOutputStream::Create(int64_t initial_capacity, MemoryPool* pool,
+    std::shared_ptr<BufferOutputStream>* out) {
+  std::shared_ptr<ResizableBuffer> buffer;
+  RETURN_NOT_OK(AllocateResizableBuffer(pool, initial_capacity, &buffer));
+  *out = std::make_shared<BufferOutputStream>(buffer);
+  return Status::OK();
+}
+
 BufferOutputStream::~BufferOutputStream() {
   // This can fail, better to explicitly call close
-  Close();
+  if (buffer_) { Close(); }
 }
 
 Status BufferOutputStream::Close() {
@@ -56,12 +65,20 @@ Status BufferOutputStream::Close() {
   }
 }
 
+Status BufferOutputStream::Finish(std::shared_ptr<Buffer>* result) {
+  RETURN_NOT_OK(Close());
+  *result = buffer_;
+  buffer_ = nullptr;
+  return Status::OK();
+}
+
 Status BufferOutputStream::Tell(int64_t* position) {
   *position = position_;
   return Status::OK();
 }
 
 Status BufferOutputStream::Write(const uint8_t* data, int64_t nbytes) {
+  DCHECK(buffer_);
   RETURN_NOT_OK(Reserve(nbytes));
   std::memcpy(mutable_data_ + position_, data, nbytes);
   position_ += nbytes;

http://git-wip-us.apache.org/repos/asf/arrow/blob/6b3ae2ae/cpp/src/arrow/io/memory.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/memory.h b/cpp/src/arrow/io/memory.h
index 2d3df42..8280750 100644
--- a/cpp/src/arrow/io/memory.h
+++ b/cpp/src/arrow/io/memory.h
@@ -43,6 +43,9 @@ class ARROW_EXPORT BufferOutputStream : public OutputStream {
  public:
   explicit BufferOutputStream(const std::shared_ptr<ResizableBuffer>& buffer);
 
+  static Status Create(int64_t initial_capacity, MemoryPool* pool,
+      std::shared_ptr<BufferOutputStream>* out);
+
   ~BufferOutputStream();
 
   // Implement the OutputStream interface
@@ -50,6 +53,9 @@ class ARROW_EXPORT BufferOutputStream : public OutputStream {
   Status Tell(int64_t* position) override;
   Status Write(const uint8_t* data, int64_t nbytes) override;
 
+  /// Close the stream and return the buffer
+  Status Finish(std::shared_ptr<Buffer>* result);
+
  private:
   // Ensures there is sufficient space available to write nbytes
   Status Reserve(int64_t nbytes);

http://git-wip-us.apache.org/repos/asf/arrow/blob/6b3ae2ae/cpp/src/arrow/ipc/adapter.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/adapter.cc b/cpp/src/arrow/ipc/adapter.cc
index f11c88a..78d5810 100644
--- a/cpp/src/arrow/ipc/adapter.cc
+++ b/cpp/src/arrow/ipc/adapter.cc
@@ -32,6 +32,7 @@
 #include "arrow/ipc/metadata-internal.h"
 #include "arrow/ipc/metadata.h"
 #include "arrow/ipc/util.h"
+#include "arrow/loader.h"
 #include "arrow/memory_pool.h"
 #include "arrow/schema.h"
 #include "arrow/status.h"
@@ -531,12 +532,12 @@ Status WriteRecordBatch(const RecordBatch& batch, int64_t buffer_start_offset,
 Status WriteDictionary(int64_t dictionary_id, const std::shared_ptr<Array>& dictionary,
     int64_t buffer_start_offset, io::OutputStream* dst, int32_t* metadata_length,
     int64_t* body_length, MemoryPool* pool) {
-  DictionaryWriter writer(pool, buffer_start_offset, kMaxIpcRecursionDepth);
+  DictionaryWriter writer(pool, buffer_start_offset, kMaxNestingDepth);
   return writer.Write(dictionary_id, dictionary, dst, metadata_length, body_length);
 }
 
 Status GetRecordBatchSize(const RecordBatch& batch, int64_t* size) {
-  RecordBatchWriter writer(default_memory_pool(), 0, kMaxIpcRecursionDepth);
+  RecordBatchWriter writer(default_memory_pool(), 0, kMaxNestingDepth);
   RETURN_NOT_OK(writer.GetTotalSize(batch, size));
   return Status::OK();
 }
@@ -544,235 +545,33 @@ Status GetRecordBatchSize(const RecordBatch& batch, int64_t* size) {
 // ----------------------------------------------------------------------
 // Record batch read path
 
-struct RecordBatchContext {
-  const RecordBatchMetadata* metadata;
-  int buffer_index;
-  int field_index;
-  int max_recursion_depth;
-};
-
-// Traverse the flattened record batch metadata and reassemble the
-// corresponding array containers
-class ArrayLoader : public TypeVisitor {
+class IpcComponentSource : public ArrayComponentSource {
  public:
-  ArrayLoader(
-      const Field& field, RecordBatchContext* context, io::ReadableFileInterface* file)
-      : field_(field), context_(context), file_(file) {}
-
-  Status Load(std::shared_ptr<Array>* out) {
-    if (context_->max_recursion_depth <= 0) {
-      return Status::Invalid("Max recursion depth reached");
-    }
-
-    // Load the array
-    RETURN_NOT_OK(field_.type->Accept(this));
+  IpcComponentSource(const RecordBatchMetadata& metadata, io::ReadableFileInterface* file)
+      : metadata_(metadata), file_(file) {}
 
-    *out = std::move(result_);
-    return Status::OK();
-  }
-
- private:
-  const Field& field_;
-  RecordBatchContext* context_;
-  io::ReadableFileInterface* file_;
-
-  // Used in visitor pattern
-  std::shared_ptr<Array> result_;
-
-  Status LoadChild(const Field& field, std::shared_ptr<Array>* out) {
-    ArrayLoader loader(field, context_, file_);
-    --context_->max_recursion_depth;
-    RETURN_NOT_OK(loader.Load(out));
-    ++context_->max_recursion_depth;
-    return Status::OK();
-  }
-
-  Status GetBuffer(int buffer_index, std::shared_ptr<Buffer>* out) {
-    BufferMetadata metadata = context_->metadata->buffer(buffer_index);
-
-    if (metadata.length == 0) {
+  Status GetBuffer(int buffer_index, std::shared_ptr<Buffer>* out) override {
+    BufferMetadata buffer_meta = metadata_.buffer(buffer_index);
+    if (buffer_meta.length == 0) {
       *out = nullptr;
       return Status::OK();
     } else {
-      return file_->ReadAt(metadata.offset, metadata.length, out);
+      return file_->ReadAt(buffer_meta.offset, buffer_meta.length, out);
     }
   }
 
-  Status LoadCommon(FieldMetadata* field_meta, std::shared_ptr<Buffer>* null_bitmap) {
+  Status GetFieldMetadata(int field_index, FieldMetadata* metadata) override {
     // pop off a field
-    if (context_->field_index >= context_->metadata->num_fields()) {
+    if (field_index >= metadata_.num_fields()) {
       return Status::Invalid("Ran out of field metadata, likely malformed");
     }
-
-    // This only contains the length and null count, which we need to figure
-    // out what to do with the buffers. For example, if null_count == 0, then
-    // we can skip that buffer without reading from shared memory
-    *field_meta = context_->metadata->field(context_->field_index++);
-
-    // extract null_bitmap which is common to all arrays
-    if (field_meta->null_count == 0) {
-      *null_bitmap = nullptr;
-    } else {
-      RETURN_NOT_OK(GetBuffer(context_->buffer_index, null_bitmap));
-    }
-    context_->buffer_index++;
-    return Status::OK();
-  }
-
-  Status LoadPrimitive(const DataType& type) {
-    FieldMetadata field_meta;
-    std::shared_ptr<Buffer> null_bitmap, data;
-
-    RETURN_NOT_OK(LoadCommon(&field_meta, &null_bitmap));
-    if (field_meta.length > 0) {
-      RETURN_NOT_OK(GetBuffer(context_->buffer_index++, &data));
-    } else {
-      context_->buffer_index++;
-      data.reset(new Buffer(nullptr, 0));
-    }
-    return MakePrimitiveArray(field_.type, field_meta.length, data, null_bitmap,
-        field_meta.null_count, 0, &result_);
-  }
-
-  template <typename CONTAINER>
-  Status LoadBinary() {
-    FieldMetadata field_meta;
-    std::shared_ptr<Buffer> null_bitmap, offsets, values;
-
-    RETURN_NOT_OK(LoadCommon(&field_meta, &null_bitmap));
-    if (field_meta.length > 0) {
-      RETURN_NOT_OK(GetBuffer(context_->buffer_index++, &offsets));
-      RETURN_NOT_OK(GetBuffer(context_->buffer_index++, &values));
-    } else {
-      context_->buffer_index += 2;
-      offsets = values = nullptr;
-    }
-
-    result_ = std::make_shared<CONTAINER>(
-        field_meta.length, offsets, values, null_bitmap, field_meta.null_count);
-    return Status::OK();
-  }
-
-  Status Visit(const BooleanType& type) override { return LoadPrimitive(type); }
-
-  Status Visit(const Int8Type& type) override { return LoadPrimitive(type); }
-
-  Status Visit(const Int16Type& type) override { return LoadPrimitive(type); }
-
-  Status Visit(const Int32Type& type) override { return LoadPrimitive(type); }
-
-  Status Visit(const Int64Type& type) override { return LoadPrimitive(type); }
-
-  Status Visit(const UInt8Type& type) override { return LoadPrimitive(type); }
-
-  Status Visit(const UInt16Type& type) override { return LoadPrimitive(type); }
-
-  Status Visit(const UInt32Type& type) override { return LoadPrimitive(type); }
-
-  Status Visit(const UInt64Type& type) override { return LoadPrimitive(type); }
-
-  Status Visit(const HalfFloatType& type) override { return LoadPrimitive(type); }
-
-  Status Visit(const FloatType& type) override { return LoadPrimitive(type); }
-
-  Status Visit(const DoubleType& type) override { return LoadPrimitive(type); }
-
-  Status Visit(const StringType& type) override { return LoadBinary<StringArray>(); }
-
-  Status Visit(const BinaryType& type) override { return LoadBinary<BinaryArray>(); }
-
-  Status Visit(const DateType& type) override { return LoadPrimitive(type); }
-
-  Status Visit(const TimeType& type) override { return LoadPrimitive(type); }
-
-  Status Visit(const TimestampType& type) override { return LoadPrimitive(type); }
-
-  Status Visit(const ListType& type) override {
-    FieldMetadata field_meta;
-    std::shared_ptr<Buffer> null_bitmap, offsets;
-
-    RETURN_NOT_OK(LoadCommon(&field_meta, &null_bitmap));
-    if (field_meta.length > 0) {
-      RETURN_NOT_OK(GetBuffer(context_->buffer_index, &offsets));
-    } else {
-      offsets = nullptr;
-    }
-    ++context_->buffer_index;
-
-    const int num_children = type.num_children();
-    if (num_children != 1) {
-      std::stringstream ss;
-      ss << "Wrong number of children: " << num_children;
-      return Status::Invalid(ss.str());
-    }
-    std::shared_ptr<Array> values_array;
-
-    RETURN_NOT_OK(LoadChild(*type.child(0).get(), &values_array));
-
-    result_ = std::make_shared<ListArray>(field_.type, field_meta.length, offsets,
-        values_array, null_bitmap, field_meta.null_count);
-    return Status::OK();
-  }
-
-  Status LoadChildren(std::vector<std::shared_ptr<Field>> child_fields,
-      std::vector<std::shared_ptr<Array>>* arrays) {
-    arrays->reserve(static_cast<int>(child_fields.size()));
-
-    for (const auto& child_field : child_fields) {
-      std::shared_ptr<Array> field_array;
-      RETURN_NOT_OK(LoadChild(*child_field.get(), &field_array));
-      arrays->emplace_back(field_array);
-    }
+    *metadata = metadata_.field(field_index);
     return Status::OK();
   }
 
-  Status Visit(const StructType& type) override {
-    FieldMetadata field_meta;
-    std::shared_ptr<Buffer> null_bitmap;
-    RETURN_NOT_OK(LoadCommon(&field_meta, &null_bitmap));
-
-    std::vector<std::shared_ptr<Array>> fields;
-    RETURN_NOT_OK(LoadChildren(type.children(), &fields));
-
-    result_ = std::make_shared<StructArray>(
-        field_.type, field_meta.length, fields, null_bitmap, field_meta.null_count);
-    return Status::OK();
-  }
-
-  Status Visit(const UnionType& type) override {
-    FieldMetadata field_meta;
-    std::shared_ptr<Buffer> null_bitmap, type_ids, offsets;
-
-    RETURN_NOT_OK(LoadCommon(&field_meta, &null_bitmap));
-    if (field_meta.length > 0) {
-      RETURN_NOT_OK(GetBuffer(context_->buffer_index, &type_ids));
-      if (type.mode == UnionMode::DENSE) {
-        RETURN_NOT_OK(GetBuffer(context_->buffer_index + 1, &offsets));
-      }
-    }
-    context_->buffer_index += type.mode == UnionMode::DENSE ? 2 : 1;
-
-    std::vector<std::shared_ptr<Array>> fields;
-    RETURN_NOT_OK(LoadChildren(type.children(), &fields));
-
-    result_ = std::make_shared<UnionArray>(field_.type, field_meta.length, fields,
-        type_ids, offsets, null_bitmap, field_meta.null_count);
-    return Status::OK();
-  }
-
-  Status Visit(const DictionaryType& type) override {
-    FieldMetadata field_meta;
-    std::shared_ptr<Buffer> null_bitmap, indices_data;
-    RETURN_NOT_OK(LoadCommon(&field_meta, &null_bitmap));
-    RETURN_NOT_OK(GetBuffer(context_->buffer_index++, &indices_data));
-
-    std::shared_ptr<Array> indices;
-    RETURN_NOT_OK(MakePrimitiveArray(type.index_type(), field_meta.length, indices_data,
-        null_bitmap, field_meta.null_count, 0, &indices));
-
-    result_ = std::make_shared<DictionaryArray>(field_.type, indices);
-    return Status::OK();
-  };
+ private:
+  const RecordBatchMetadata& metadata_;
+  io::ReadableFileInterface* file_;
 };
 
 class RecordBatchReader {
@@ -788,17 +587,15 @@ class RecordBatchReader {
   Status Read(std::shared_ptr<RecordBatch>* out) {
     std::vector<std::shared_ptr<Array>> arrays(schema_->num_fields());
 
-    // The field_index and buffer_index are incremented in the ArrayLoader
-    // based on how much of the batch is "consumed" (through nested data
-    // reconstruction, for example)
-    context_.metadata = &metadata_;
-    context_.field_index = 0;
-    context_.buffer_index = 0;
-    context_.max_recursion_depth = max_recursion_depth_;
+    IpcComponentSource source(metadata_, file_);
+    ArrayLoaderContext context;
+    context.source = &source;
+    context.field_index = 0;
+    context.buffer_index = 0;
+    context.max_recursion_depth = max_recursion_depth_;
 
     for (int i = 0; i < schema_->num_fields(); ++i) {
-      ArrayLoader loader(*schema_->field(i).get(), &context_, file_);
-      RETURN_NOT_OK(loader.Load(&arrays[i]));
+      RETURN_NOT_OK(LoadArray(schema_->field(i)->type, &context, &arrays[i]));
     }
 
     *out = std::make_shared<RecordBatch>(schema_, metadata_.length(), arrays);
@@ -806,7 +603,6 @@ class RecordBatchReader {
   }
 
  private:
-  RecordBatchContext context_;
   const RecordBatchMetadata& metadata_;
   std::shared_ptr<Schema> schema_;
   int max_recursion_depth_;
@@ -816,7 +612,7 @@ class RecordBatchReader {
 Status ReadRecordBatch(const RecordBatchMetadata& metadata,
     const std::shared_ptr<Schema>& schema, io::ReadableFileInterface* file,
     std::shared_ptr<RecordBatch>* out) {
-  return ReadRecordBatch(metadata, schema, kMaxIpcRecursionDepth, file, out);
+  return ReadRecordBatch(metadata, schema, kMaxNestingDepth, file, out);
 }
 
 Status ReadRecordBatch(const RecordBatchMetadata& metadata,

http://git-wip-us.apache.org/repos/asf/arrow/blob/6b3ae2ae/cpp/src/arrow/ipc/adapter.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/adapter.h b/cpp/src/arrow/ipc/adapter.h
index 933d3a4..21d814d 100644
--- a/cpp/src/arrow/ipc/adapter.h
+++ b/cpp/src/arrow/ipc/adapter.h
@@ -26,6 +26,7 @@
 #include <vector>
 
 #include "arrow/ipc/metadata.h"
+#include "arrow/loader.h"
 #include "arrow/util/visibility.h"
 
 namespace arrow {
@@ -47,11 +48,6 @@ namespace ipc {
 
 // ----------------------------------------------------------------------
 // Write path
-//
-// ARROW-109: We set this number arbitrarily to help catch user mistakes. For
-// deeply nested schemas, it is expected the user will indicate explicitly the
-// maximum allowed recursion depth
-constexpr int kMaxIpcRecursionDepth = 64;
 
 // Write the RecordBatch (collection of equal-length Arrow arrays) to the
 // output stream in a contiguous block. The record batch metadata is written as
@@ -75,7 +71,7 @@ constexpr int kMaxIpcRecursionDepth = 64;
 // padding bytes
 Status WriteRecordBatch(const RecordBatch& batch, int64_t buffer_start_offset,
     io::OutputStream* dst, int32_t* metadata_length, int64_t* body_length,
-    MemoryPool* pool, int max_recursion_depth = kMaxIpcRecursionDepth);
+    MemoryPool* pool, int max_recursion_depth = kMaxNestingDepth);
 
 // Write Array as a DictionaryBatch message
 Status WriteDictionary(int64_t dictionary_id, const std::shared_ptr<Array>& dictionary,

http://git-wip-us.apache.org/repos/asf/arrow/blob/6b3ae2ae/cpp/src/arrow/ipc/metadata.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/metadata.cc b/cpp/src/arrow/ipc/metadata.cc
index 2ba44ac..695e788 100644
--- a/cpp/src/arrow/ipc/metadata.cc
+++ b/cpp/src/arrow/ipc/metadata.cc
@@ -289,6 +289,7 @@ FieldMetadata RecordBatchMetadata::field(int i) const {
   FieldMetadata result;
   result.length = node->length();
   result.null_count = node->null_count();
+  result.offset = 0;
   return result;
 }
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/6b3ae2ae/cpp/src/arrow/ipc/metadata.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/metadata.h b/cpp/src/arrow/ipc/metadata.h
index f12529b..f6a0a3a 100644
--- a/cpp/src/arrow/ipc/metadata.h
+++ b/cpp/src/arrow/ipc/metadata.h
@@ -25,6 +25,7 @@
 #include <unordered_map>
 #include <vector>
 
+#include "arrow/loader.h"
 #include "arrow/util/macros.h"
 #include "arrow/util/visibility.h"
 
@@ -135,12 +136,6 @@ class ARROW_EXPORT SchemaMetadata {
   DISALLOW_COPY_AND_ASSIGN(SchemaMetadata);
 };
 
-// Field metadata
-struct ARROW_EXPORT FieldMetadata {
-  int32_t length;
-  int32_t null_count;
-};
-
 struct ARROW_EXPORT BufferMetadata {
   int32_t page;
   int64_t offset;

http://git-wip-us.apache.org/repos/asf/arrow/blob/6b3ae2ae/cpp/src/arrow/loader.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/loader.cc b/cpp/src/arrow/loader.cc
new file mode 100644
index 0000000..3cb51ae
--- /dev/null
+++ b/cpp/src/arrow/loader.cc
@@ -0,0 +1,285 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/loader.h"
+
+#include <cstdint>
+#include <memory>
+#include <sstream>
+#include <vector>
+
+#include "arrow/array.h"
+#include "arrow/buffer.h"
+#include "arrow/type.h"
+#include "arrow/type_traits.h"
+#include "arrow/util/logging.h"
+#include "arrow/util/visibility.h"
+
+namespace arrow {
+
+class Array;
+struct DataType;
+class Status;
+
+/// Visitor that assembles one Array of type `type_` from the buffers and
+/// field metadata supplied by `context_->source`.
+///
+/// The Visit() overloads read their inputs by position, advancing
+/// `context_->field_index` and `context_->buffer_index` as they go, and leave
+/// the finished array in `result_`. Nested types load their children through
+/// the same shared context, so the positions keep advancing across children.
+class ArrayLoader : public TypeVisitor {
+ public:
+  ArrayLoader(const std::shared_ptr<DataType>& type, ArrayLoaderContext* context)
+      : type_(type), context_(context) {}
+
+  /// Load the array described by type_ into *out.
+  ///
+  /// Returns Status::Invalid when the context's nesting budget is used up,
+  /// otherwise the status of the type-specific Visit() overload.
+  Status Load(std::shared_ptr<Array>* out) {
+    if (context_->max_recursion_depth <= 0) {
+      return Status::Invalid("Max recursion depth reached");
+    }
+
+    // Dispatch to the Visit() overload matching type_
+    RETURN_NOT_OK(type_->Accept(this));
+
+    *out = std::move(result_);
+    return Status::OK();
+  }
+
+  /// Fetch the buffer at buffer_index from the context's source.
+  Status GetBuffer(int buffer_index, std::shared_ptr<Buffer>* out) {
+    return context_->source->GetBuffer(buffer_index, out);
+  }
+
+  /// Read the next field metadata entry and the validity bitmap.
+  ///
+  /// Always advances field_index by one and buffer_index by one, whether or
+  /// not the bitmap buffer is actually fetched.
+  Status LoadCommon(FieldMetadata* field_meta, std::shared_ptr<Buffer>* null_bitmap) {
+    // This only contains the length and null count, which we need to figure
+    // out what to do with the buffers. For example, if null_count == 0, then
+    // we can skip that buffer without reading from shared memory
+    RETURN_NOT_OK(
+        context_->source->GetFieldMetadata(context_->field_index++, field_meta));
+
+    // extract null_bitmap which is common to all arrays
+    if (field_meta->null_count == 0) {
+      *null_bitmap = nullptr;
+    } else {
+      RETURN_NOT_OK(GetBuffer(context_->buffer_index, null_bitmap));
+    }
+    context_->buffer_index++;
+    return Status::OK();
+  }
+
+  /// Load an array laid out as [validity bitmap, data].
+  template <typename TYPE>
+  Status LoadPrimitive() {
+    using ArrayType = typename TypeTraits<TYPE>::ArrayType;
+
+    FieldMetadata field_meta;
+    std::shared_ptr<Buffer> null_bitmap, data;
+
+    RETURN_NOT_OK(LoadCommon(&field_meta, &null_bitmap));
+    if (field_meta.length > 0) {
+      RETURN_NOT_OK(GetBuffer(context_->buffer_index++, &data));
+    } else {
+      // Zero-length arrays get an empty buffer instead of asking the source,
+      // but the data slot is still skipped
+      context_->buffer_index++;
+      data.reset(new Buffer(nullptr, 0));
+    }
+    result_ = std::make_shared<ArrayType>(type_, field_meta.length, data, null_bitmap,
+        field_meta.null_count, field_meta.offset);
+    return Status::OK();
+  }
+
+  /// Load an array laid out as [validity bitmap, offsets, values].
+  template <typename CONTAINER>
+  Status LoadBinary() {
+    FieldMetadata field_meta;
+    std::shared_ptr<Buffer> null_bitmap, offsets, values;
+
+    RETURN_NOT_OK(LoadCommon(&field_meta, &null_bitmap));
+    if (field_meta.length > 0) {
+      RETURN_NOT_OK(GetBuffer(context_->buffer_index++, &offsets));
+      RETURN_NOT_OK(GetBuffer(context_->buffer_index++, &values));
+    } else {
+      // Zero-length: skip both slots and leave the buffers null
+      context_->buffer_index += 2;
+      offsets = values = nullptr;
+    }
+
+    result_ = std::make_shared<CONTAINER>(
+        field_meta.length, offsets, values, null_bitmap, field_meta.null_count);
+    return Status::OK();
+  }
+
+  /// Load the array for one child field, one nesting level deeper.
+  Status LoadChild(const Field& field, std::shared_ptr<Array>* out) {
+    ArrayLoader loader(field.type, context_);
+    --context_->max_recursion_depth;
+    Status status = loader.Load(out);
+    // Give the nesting level back even when the child fails, so the
+    // caller-owned context is not left with a depth that is one too low
+    ++context_->max_recursion_depth;
+    return status;
+  }
+
+  /// Load every child field in order, appending the results to *arrays.
+  /// Stops at, and returns, the first failing child's status.
+  Status LoadChildren(const std::vector<std::shared_ptr<Field>>& child_fields,
+      std::vector<std::shared_ptr<Array>>* arrays) {
+    arrays->reserve(child_fields.size());
+
+    for (const auto& child_field : child_fields) {
+      std::shared_ptr<Array> field_array;
+      RETURN_NOT_OK(LoadChild(*child_field, &field_array));
+      arrays->emplace_back(std::move(field_array));
+    }
+    return Status::OK();
+  }
+
+// Every type listed below is loaded through LoadPrimitive<TYPE>()
+#define VISIT_PRIMITIVE(TYPE) \
+  Status Visit(const TYPE& type) override { return LoadPrimitive<TYPE>(); }
+
+  VISIT_PRIMITIVE(BooleanType);
+  VISIT_PRIMITIVE(Int8Type);
+  VISIT_PRIMITIVE(Int16Type);
+  VISIT_PRIMITIVE(Int32Type);
+  VISIT_PRIMITIVE(Int64Type);
+  VISIT_PRIMITIVE(UInt8Type);
+  VISIT_PRIMITIVE(UInt16Type);
+  VISIT_PRIMITIVE(UInt32Type);
+  VISIT_PRIMITIVE(UInt64Type);
+  VISIT_PRIMITIVE(HalfFloatType);
+  VISIT_PRIMITIVE(FloatType);
+  VISIT_PRIMITIVE(DoubleType);
+  VISIT_PRIMITIVE(DateType);
+  VISIT_PRIMITIVE(Date32Type);
+  VISIT_PRIMITIVE(TimeType);
+  VISIT_PRIMITIVE(TimestampType);
+
+#undef VISIT_PRIMITIVE
+
+  Status Visit(const StringType& type) override { return LoadBinary<StringArray>(); }
+
+  Status Visit(const BinaryType& type) override { return LoadBinary<BinaryArray>(); }
+
+  /// List: [validity bitmap, offsets], followed by the single child array.
+  Status Visit(const ListType& type) override {
+    FieldMetadata field_meta;
+    std::shared_ptr<Buffer> null_bitmap, offsets;
+
+    RETURN_NOT_OK(LoadCommon(&field_meta, &null_bitmap));
+    if (field_meta.length > 0) {
+      RETURN_NOT_OK(GetBuffer(context_->buffer_index, &offsets));
+    } else {
+      offsets = nullptr;
+    }
+    // The offsets slot is consumed whether or not it was fetched
+    ++context_->buffer_index;
+
+    const int num_children = type.num_children();
+    if (num_children != 1) {
+      std::stringstream ss;
+      ss << "Wrong number of children: " << num_children;
+      return Status::Invalid(ss.str());
+    }
+    std::shared_ptr<Array> values_array;
+
+    RETURN_NOT_OK(LoadChild(*type.child(0).get(), &values_array));
+
+    result_ = std::make_shared<ListArray>(type_, field_meta.length, offsets, values_array,
+        null_bitmap, field_meta.null_count);
+    return Status::OK();
+  }
+
+  /// Struct: [validity bitmap], followed by one array per child field.
+  Status Visit(const StructType& type) override {
+    FieldMetadata field_meta;
+    std::shared_ptr<Buffer> null_bitmap;
+    RETURN_NOT_OK(LoadCommon(&field_meta, &null_bitmap));
+
+    std::vector<std::shared_ptr<Array>> fields;
+    RETURN_NOT_OK(LoadChildren(type.children(), &fields));
+
+    result_ = std::make_shared<StructArray>(
+        type_, field_meta.length, fields, null_bitmap, field_meta.null_count);
+    return Status::OK();
+  }
+
+  /// Union: [validity bitmap, type ids] plus an offsets buffer in DENSE mode,
+  /// followed by one array per child field.
+  Status Visit(const UnionType& type) override {
+    FieldMetadata field_meta;
+    std::shared_ptr<Buffer> null_bitmap, type_ids, offsets;
+
+    RETURN_NOT_OK(LoadCommon(&field_meta, &null_bitmap));
+    if (field_meta.length > 0) {
+      RETURN_NOT_OK(GetBuffer(context_->buffer_index, &type_ids));
+      if (type.mode == UnionMode::DENSE) {
+        RETURN_NOT_OK(GetBuffer(context_->buffer_index + 1, &offsets));
+      }
+    }
+    // Skip the slots for this mode even when nothing was fetched
+    context_->buffer_index += type.mode == UnionMode::DENSE ? 2 : 1;
+
+    std::vector<std::shared_ptr<Array>> fields;
+    RETURN_NOT_OK(LoadChildren(type.children(), &fields));
+
+    result_ = std::make_shared<UnionArray>(type_, field_meta.length, fields, type_ids,
+        offsets, null_bitmap, field_meta.null_count);
+    return Status::OK();
+  }
+
+  /// Dictionary: only the indices are loaded here, using the index type; the
+  /// result wraps them together with type_.
+  Status Visit(const DictionaryType& type) override {
+    std::shared_ptr<Array> indices;
+    RETURN_NOT_OK(LoadArray(type.index_type(), context_, &indices));
+    result_ = std::make_shared<DictionaryArray>(type_, indices);
+    return Status::OK();
+  }
+
+  std::shared_ptr<Array> result() const { return result_; }
+
+ private:
+  const std::shared_ptr<DataType> type_;
+  ArrayLoaderContext* context_;
+
+  // Used in visitor pattern
+  std::shared_ptr<Array> result_;
+};
+
+// Load an array of `type` from `source`, reading fields and buffers from
+// position 0 with a nesting budget of kMaxNestingDepth.
+Status ARROW_EXPORT LoadArray(const std::shared_ptr<DataType>& type,
+    ArrayComponentSource* source, std::shared_ptr<Array>* out) {
+  ArrayLoaderContext fresh_context;
+  fresh_context.max_recursion_depth = kMaxNestingDepth;
+  fresh_context.buffer_index = 0;
+  fresh_context.field_index = 0;
+  fresh_context.source = source;
+  return LoadArray(type, &fresh_context, out);
+}
+
+// Load an array of `type`, continuing from the positions held in `context`.
+Status ARROW_EXPORT LoadArray(const std::shared_ptr<DataType>& type,
+    ArrayLoaderContext* context, std::shared_ptr<Array>* out) {
+  ArrayLoader array_loader(type, context);
+  return array_loader.Load(out);
+}
+
+/// ArrayComponentSource backed by caller-owned vectors of field metadata and
+/// buffers.
+///
+/// Only references to the vectors are stored, so both must stay alive for as
+/// long as this source is in use.
+class InMemorySource : public ArrayComponentSource {
+ public:
+  InMemorySource(const std::vector<FieldMetadata>& fields,
+      const std::vector<std::shared_ptr<Buffer>>& buffers)
+      : fields_(fields), buffers_(buffers) {}
+
+  /// Copy the buffer pointer at buffer_index into *out.
+  ///
+  /// Returns Status::Invalid if buffer_index is not a valid position in the
+  /// buffers vector.
+  Status GetBuffer(int buffer_index, std::shared_ptr<Buffer>* out) override {
+    // Reported through Status rather than a DCHECK so that a type asking for
+    // more buffers than were supplied surfaces as an error to the caller
+    if (buffer_index < 0 || buffer_index >= static_cast<int>(buffers_.size())) {
+      return Status::Invalid("Buffer index out of range");
+    }
+    *out = buffers_[buffer_index];
+    return Status::OK();
+  }
+
+  /// Copy the field metadata at field_index into *metadata.
+  ///
+  /// Returns Status::Invalid if field_index is not a valid position in the
+  /// fields vector.
+  Status GetFieldMetadata(int field_index, FieldMetadata* metadata) override {
+    if (field_index < 0 || field_index >= static_cast<int>(fields_.size())) {
+      return Status::Invalid("Field metadata index out of range");
+    }
+    *metadata = fields_[field_index];
+    return Status::OK();
+  }
+
+ private:
+  const std::vector<FieldMetadata>& fields_;
+  const std::vector<std::shared_ptr<Buffer>>& buffers_;
+};
+
+// Load an array of `type` from vectors of field metadata and buffers by
+// wrapping them in an InMemorySource.
+Status ARROW_EXPORT LoadArray(const std::shared_ptr<DataType>& type,
+    const std::vector<FieldMetadata>& fields,
+    const std::vector<std::shared_ptr<Buffer>>& buffers, std::shared_ptr<Array>* out) {
+  InMemorySource in_memory(fields, buffers);
+  ArrayComponentSource* component_source = &in_memory;
+  return LoadArray(type, component_source, out);
+}
+
+}  // namespace arrow

http://git-wip-us.apache.org/repos/asf/arrow/blob/6b3ae2ae/cpp/src/arrow/loader.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/loader.h b/cpp/src/arrow/loader.h
new file mode 100644
index 0000000..b4949f2
--- /dev/null
+++ b/cpp/src/arrow/loader.h
@@ -0,0 +1,89 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Functions for constructing Array objects from metadata and raw memory
+// buffers
+
+#ifndef ARROW_LOADER_H
+#define ARROW_LOADER_H
+
+#include <cstdint>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "arrow/status.h"
+#include "arrow/util/visibility.h"
+
+namespace arrow {
+
+class Array;
+class Buffer;
+struct DataType;
+
+// ARROW-109: We set this number arbitrarily to help catch user mistakes. For
+// deeply nested schemas, it is expected the user will indicate explicitly the
+// maximum allowed recursion depth
+constexpr int kMaxNestingDepth = 64;
+
+struct ARROW_EXPORT FieldMetadata {  // per-array metadata read by the loader
+  int64_t length;      // passed to the loaded array as its length
+  int64_t null_count;  // when 0 the loader does not fetch a validity bitmap
+  int64_t offset;      // forwarded to primitive array constructors
+};
+
+/// Implement this to create new types of Arrow data loaders
+class ARROW_EXPORT ArrayComponentSource {
+ public:
+  virtual ~ArrayComponentSource() = default;
+  // Each accessor looks an item up by position, writes it to its output
+  // argument, and returns a Status
+  virtual Status GetBuffer(int buffer_index, std::shared_ptr<Buffer>* out) = 0;
+  virtual Status GetFieldMetadata(int field_index, FieldMetadata* metadata) = 0;
+};
+
+/// Bookkeeping struct for loading array objects from their constituent pieces of raw data
+///
+/// The field_index and buffer_index are incremented in the ArrayLoader
+/// based on how much of the batch is "consumed" (through nested data
+/// reconstruction, for example)
+struct ArrayLoaderContext {
+  ArrayComponentSource* source;  // where buffers and field metadata come from
+  int buffer_index;              // position of the next buffer to read
+  int field_index;               // position of the next field metadata to read
+  int max_recursion_depth;       // remaining nesting budget; loading fails at <= 0
+};
+
+/// Construct an Array container from type metadata and a collection of memory
+/// buffers
+///
+/// \param[in] type the data type of the array being loaded
+/// \param[in] source an implementation of ArrayComponentSource
+/// \param[out] out the constructed array
+/// \return Status indicating success or failure
+Status ARROW_EXPORT LoadArray(const std::shared_ptr<DataType>& type,
+    ArrayComponentSource* source, std::shared_ptr<Array>* out);
+
+/// Construct an Array container using an existing loader context
+///
+/// \param[in] type the data type of the array being loaded
+/// \param[in,out] context bookkeeping state; its field and buffer indices are
+/// advanced past whatever the loaded array consumes
+/// \param[out] out the constructed array
+/// \return Status indicating success or failure
+Status ARROW_EXPORT LoadArray(const std::shared_ptr<DataType>& type,
+    ArrayLoaderContext* context, std::shared_ptr<Array>* out);
+
+/// Construct an Array container from in-memory field metadata and buffers
+///
+/// \param[in] type the data type of the array being loaded
+/// \param[in] fields field metadata entries, read by position starting at 0
+/// \param[in] buffers memory buffers, read by position starting at 0
+/// \param[out] out the constructed array
+/// \return Status indicating success or failure
+Status ARROW_EXPORT LoadArray(const std::shared_ptr<DataType>& type,
+    const std::vector<FieldMetadata>& fields,
+    const std::vector<std::shared_ptr<Buffer>>& buffers, std::shared_ptr<Array>* out);
+
+}  // namespace arrow
+
+#endif  // ARROW_LOADER_H

http://git-wip-us.apache.org/repos/asf/arrow/blob/6b3ae2ae/cpp/src/arrow/pretty_print.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/pretty_print.cc b/cpp/src/arrow/pretty_print.cc
index 7e69e42..2508fa5 100644
--- a/cpp/src/arrow/pretty_print.cc
+++ b/cpp/src/arrow/pretty_print.cc
@@ -145,9 +145,11 @@ class ArrayPrinter : public ArrayVisitor {
 
   Status Visit(const BinaryArray& array) override { return WriteVarBytes(array); }
 
-  Status Visit(const DateArray& array) override { return Status::NotImplemented("date"); }
+  Status Visit(const DateArray& array) override { return WritePrimitive(array); }
 
-  Status Visit(const TimeArray& array) override { return Status::NotImplemented("time"); }
+  Status Visit(const Date32Array& array) override { return WritePrimitive(array); }
+
+  Status Visit(const TimeArray& array) override { return WritePrimitive(array); }
 
   Status Visit(const TimestampArray& array) override {
     return Status::NotImplemented("timestamp");

http://git-wip-us.apache.org/repos/asf/arrow/blob/6b3ae2ae/cpp/src/arrow/type.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/type.cc b/cpp/src/arrow/type.cc
index 7e5f13a..4679a2f 100644
--- a/cpp/src/arrow/type.cc
+++ b/cpp/src/arrow/type.cc
@@ -54,9 +54,7 @@ bool DataType::Equals(const DataType& other) const {
 }
 
 bool DataType::Equals(const std::shared_ptr<DataType>& other) const {
-  if (!other) {
-    return false;
-  }
+  if (!other) { return false; }
   return Equals(*other.get());
 }
 
@@ -106,6 +104,10 @@ std::string DateType::ToString() const {
   return std::string("date");
 }
 
+std::string Date32Type::ToString() const {
+  return std::string("date32");
+}
+
 // ----------------------------------------------------------------------
 // Union type
 
@@ -135,11 +137,12 @@ std::string UnionType::ToString() const {
 // ----------------------------------------------------------------------
 // DictionaryType
 
-DictionaryType::DictionaryType(
-    const std::shared_ptr<DataType>& index_type, const std::shared_ptr<Array>& dictionary)
+DictionaryType::DictionaryType(const std::shared_ptr<DataType>& index_type,
+    const std::shared_ptr<Array>& dictionary, bool ordered)
     : FixedWidthType(Type::DICTIONARY),
       index_type_(index_type),
-      dictionary_(dictionary) {}
+      dictionary_(dictionary),
+      ordered_(ordered) {}
 
 int DictionaryType::bit_width() const {
   return static_cast<const FixedWidthType*>(index_type_.get())->bit_width();
@@ -178,6 +181,7 @@ ACCEPT_VISITOR(StructType);
 ACCEPT_VISITOR(DecimalType);
 ACCEPT_VISITOR(UnionType);
 ACCEPT_VISITOR(DateType);
+ACCEPT_VISITOR(Date32Type);
 ACCEPT_VISITOR(TimeType);
 ACCEPT_VISITOR(TimestampType);
 ACCEPT_VISITOR(IntervalType);
@@ -205,11 +209,16 @@ TYPE_FACTORY(float64, DoubleType);
 TYPE_FACTORY(utf8, StringType);
 TYPE_FACTORY(binary, BinaryType);
 TYPE_FACTORY(date, DateType);
+TYPE_FACTORY(date32, Date32Type);
 
 std::shared_ptr<DataType> timestamp(TimeUnit unit) {
   return std::make_shared<TimestampType>(unit);
 }
 
+std::shared_ptr<DataType> timestamp(const std::string& timezone, TimeUnit unit) {
+  return std::make_shared<TimestampType>(timezone, unit);
+}
+
 std::shared_ptr<DataType> time(TimeUnit unit) {
   return std::make_shared<TimeType>(unit);
 }
@@ -313,6 +322,7 @@ TYPE_VISITOR_DEFAULT(DoubleType);
 TYPE_VISITOR_DEFAULT(StringType);
 TYPE_VISITOR_DEFAULT(BinaryType);
 TYPE_VISITOR_DEFAULT(DateType);
+TYPE_VISITOR_DEFAULT(Date32Type);
 TYPE_VISITOR_DEFAULT(TimeType);
 TYPE_VISITOR_DEFAULT(TimestampType);
 TYPE_VISITOR_DEFAULT(IntervalType);

http://git-wip-us.apache.org/repos/asf/arrow/blob/6b3ae2ae/cpp/src/arrow/type.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/type.h b/cpp/src/arrow/type.h
index 9b1ab32..aa0d70e 100644
--- a/cpp/src/arrow/type.h
+++ b/cpp/src/arrow/type.h
@@ -67,9 +67,12 @@ struct Type {
     // Variable-length bytes (no guarantee of UTF8-ness)
     BINARY,
 
-    // By default, int32 days since the UNIX epoch
+    // int64_t milliseconds since the UNIX epoch
     DATE,
 
+    // int32_t days since the UNIX epoch
+    DATE32,
+
     // Exact timestamp encoded with int64 since UNIX epoch
     // Default unit millisecond
     TIMESTAMP,
@@ -132,6 +135,7 @@ class ARROW_EXPORT TypeVisitor {
   virtual Status Visit(const StringType& type);
   virtual Status Visit(const BinaryType& type);
   virtual Status Visit(const DateType& type);
+  virtual Status Visit(const Date32Type& type);
   virtual Status Visit(const TimeType& type);
   virtual Status Visit(const TimestampType& type);
   virtual Status Visit(const IntervalType& type);
@@ -425,6 +429,7 @@ struct ARROW_EXPORT UnionType : public DataType {
 // ----------------------------------------------------------------------
 // Date and time types
 
+/// Date as int64_t milliseconds since UNIX epoch
 struct ARROW_EXPORT DateType : public FixedWidthType {
   static constexpr Type::type type_id = Type::DATE;
 
@@ -439,6 +444,20 @@ struct ARROW_EXPORT DateType : public FixedWidthType {
   static std::string name() { return "date"; }
 };
 
+/// Date as int32_t days since UNIX epoch
+struct ARROW_EXPORT Date32Type : public FixedWidthType {
+  static constexpr Type::type type_id = Type::DATE32;
+
+  using c_type = int32_t;
+
+  Date32Type() : FixedWidthType(Type::DATE32) {}
+
+  int bit_width() const override { return static_cast<int>(sizeof(c_type) * 8); }
+
+  Status Accept(TypeVisitor* visitor) const override;
+  std::string ToString() const override;
+};
+
 enum class TimeUnit : char { SECOND = 0, MILLI = 1, MICRO = 2, NANO = 3 };
 
 struct ARROW_EXPORT TimeType : public FixedWidthType {
@@ -467,16 +486,20 @@ struct ARROW_EXPORT TimestampType : public FixedWidthType {
 
   int bit_width() const override { return static_cast<int>(sizeof(int64_t) * 8); }
 
-  TimeUnit unit;
-
   explicit TimestampType(TimeUnit unit = TimeUnit::MILLI)
       : FixedWidthType(Type::TIMESTAMP), unit(unit) {}
 
+  explicit TimestampType(const std::string& timezone, TimeUnit unit = TimeUnit::MILLI)
+      : FixedWidthType(Type::TIMESTAMP), unit(unit), timezone(timezone) {}
+
   TimestampType(const TimestampType& other) : TimestampType(other.unit) {}
 
   Status Accept(TypeVisitor* visitor) const override;
   std::string ToString() const override { return name(); }
   static std::string name() { return "timestamp"; }
+
+  TimeUnit unit;
+  std::string timezone;
 };
 
 struct ARROW_EXPORT IntervalType : public FixedWidthType {
@@ -507,7 +530,7 @@ class ARROW_EXPORT DictionaryType : public FixedWidthType {
   static constexpr Type::type type_id = Type::DICTIONARY;
 
   DictionaryType(const std::shared_ptr<DataType>& index_type,
-      const std::shared_ptr<Array>& dictionary);
+      const std::shared_ptr<Array>& dictionary, bool ordered = false);
 
   int bit_width() const override;
 
@@ -518,11 +541,13 @@ class ARROW_EXPORT DictionaryType : public FixedWidthType {
   Status Accept(TypeVisitor* visitor) const override;
   std::string ToString() const override;
 
+  bool ordered() const { return ordered_; }
+
  private:
   // Must be an integer type (not currently checked)
   std::shared_ptr<DataType> index_type_;
-
   std::shared_ptr<Array> dictionary_;
+  bool ordered_;
 };
 
 // ----------------------------------------------------------------------
@@ -532,6 +557,8 @@ std::shared_ptr<DataType> ARROW_EXPORT list(const std::shared_ptr<Field>& value_
 std::shared_ptr<DataType> ARROW_EXPORT list(const std::shared_ptr<DataType>& value_type);
 
 std::shared_ptr<DataType> ARROW_EXPORT timestamp(TimeUnit unit);
+std::shared_ptr<DataType> ARROW_EXPORT timestamp(
+    const std::string& timezone, TimeUnit unit);
 std::shared_ptr<DataType> ARROW_EXPORT time(TimeUnit unit);
 
 std::shared_ptr<DataType> ARROW_EXPORT struct_(
@@ -595,6 +622,7 @@ static inline bool is_primitive(Type::type type_id) {
     case Type::FLOAT:
     case Type::DOUBLE:
     case Type::DATE:
+    case Type::DATE32:
     case Type::TIMESTAMP:
     case Type::TIME:
     case Type::INTERVAL:

http://git-wip-us.apache.org/repos/asf/arrow/blob/6b3ae2ae/cpp/src/arrow/type_fwd.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/type_fwd.h b/cpp/src/arrow/type_fwd.h
index fc4ad3d..e53afe1 100644
--- a/cpp/src/arrow/type_fwd.h
+++ b/cpp/src/arrow/type_fwd.h
@@ -95,6 +95,10 @@ struct DateType;
 using DateArray = NumericArray<DateType>;
 using DateBuilder = NumericBuilder<DateType>;
 
+struct Date32Type;
+using Date32Array = NumericArray<Date32Type>;
+using Date32Builder = NumericBuilder<Date32Type>;
+
 struct TimeType;
 using TimeArray = NumericArray<TimeType>;
 using TimeBuilder = NumericBuilder<TimeType>;
@@ -125,6 +129,7 @@ std::shared_ptr<DataType> ARROW_EXPORT float64();
 std::shared_ptr<DataType> ARROW_EXPORT utf8();
 std::shared_ptr<DataType> ARROW_EXPORT binary();
 std::shared_ptr<DataType> ARROW_EXPORT date();
+std::shared_ptr<DataType> ARROW_EXPORT date32();
 
 }  // namespace arrow
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/6b3ae2ae/cpp/src/arrow/type_traits.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/type_traits.h b/cpp/src/arrow/type_traits.h
index d6687c1..2cd1420 100644
--- a/cpp/src/arrow/type_traits.h
+++ b/cpp/src/arrow/type_traits.h
@@ -131,6 +131,18 @@ struct TypeTraits<DateType> {
 };
 
 template <>
+struct TypeTraits<Date32Type> {
+  using ArrayType = Date32Array;
+  using BuilderType = Date32Builder;
+
+  static inline int64_t bytes_required(int64_t elements) {
+    return elements * sizeof(int32_t);
+  }
+  constexpr static bool is_parameter_free = true;
+  static inline std::shared_ptr<DataType> type_singleton() { return date32(); }
+};
+
+template <>
 struct TypeTraits<TimestampType> {
   using ArrayType = TimestampArray;
   // using BuilderType = TimestampBuilder;

http://git-wip-us.apache.org/repos/asf/arrow/blob/6b3ae2ae/python/pyarrow/array.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/array.pyx b/python/pyarrow/array.pyx
index 7787e95..6a6b4ba 100644
--- a/python/pyarrow/array.pyx
+++ b/python/pyarrow/array.pyx
@@ -54,7 +54,8 @@ cdef class Array:
         self.type.init(self.sp_array.get().type())
 
     @staticmethod
-    def from_pandas(obj, mask=None, timestamps_to_ms=False, Field field=None, MemoryPool memory_pool=None):
+    def from_pandas(obj, mask=None, timestamps_to_ms=False, Field field=None,
+                    MemoryPool memory_pool=None):
         """
         Convert pandas.Series to an Arrow Array.
 
@@ -75,8 +76,9 @@ cdef class Array:
 
         Notes
         -----
-        Localized timestamps will currently be returned as UTC (pandas's native representation).
-        Timezone-naive data will be implicitly interpreted as UTC.
+        Localized timestamps will currently be returned as UTC (pandas's native
+        representation).  Timezone-naive data will be implicitly interpreted as
+        UTC.
 
         Examples
         --------
@@ -119,9 +121,9 @@ cdef class Array:
         series_values = get_series_values(obj)
 
         if isinstance(series_values, pd.Categorical):
-            return DictionaryArray.from_arrays(series_values.codes,
-                                               series_values.categories.values,
-                                               mask=mask, memory_pool=memory_pool)
+            return DictionaryArray.from_arrays(
+                series_values.codes, series_values.categories.values,
+                mask=mask, memory_pool=memory_pool)
         else:
             if series_values.dtype.type == np.datetime64 and timestamps_to_ms:
                 series_values = series_values.astype('datetime64[ms]')
@@ -134,7 +136,8 @@ cdef class Array:
             return box_array(out)
 
     @staticmethod
-    def from_list(object list_obj, DataType type=None, MemoryPool memory_pool=None):
+    def from_list(object list_obj, DataType type=None,
+                  MemoryPool memory_pool=None):
         """
         Convert Python list to Arrow array
 
@@ -358,7 +361,8 @@ cdef class BinaryArray(Array):
 cdef class DictionaryArray(Array):
 
     @staticmethod
-    def from_arrays(indices, dictionary, mask=None, MemoryPool memory_pool=None):
+    def from_arrays(indices, dictionary, mask=None,
+                    MemoryPool memory_pool=None):
         """
         Construct Arrow DictionaryArray from array of indices (must be
         non-negative integers) and corresponding array of dictionary values
@@ -380,8 +384,15 @@ cdef class DictionaryArray(Array):
             shared_ptr[CDataType] c_type
             shared_ptr[CArray] c_result
 
-        arrow_indices = Array.from_pandas(indices, mask=mask, memory_pool=memory_pool)
-        arrow_dictionary = Array.from_pandas(dictionary, memory_pool=memory_pool)
+        if mask is None:
+            mask = indices == -1
+        else:
+            mask = mask | (indices == -1)
+
+        arrow_indices = Array.from_pandas(indices, mask=mask,
+                                          memory_pool=memory_pool)
+        arrow_dictionary = Array.from_pandas(dictionary,
+                                             memory_pool=memory_pool)
 
         if not isinstance(arrow_indices, IntegerArray):
             raise ValueError('Indices must be integer type')

http://git-wip-us.apache.org/repos/asf/arrow/blob/6b3ae2ae/python/pyarrow/table.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/table.pyx b/python/pyarrow/table.pyx
index 93bc6dd..ad5af1b 100644
--- a/python/pyarrow/table.pyx
+++ b/python/pyarrow/table.pyx
@@ -359,7 +359,8 @@ cdef class RecordBatch:
         """
         Number of rows
 
-        Due to the definition of a RecordBatch, all columns have the same number of rows.
+        Due to the definition of a RecordBatch, all columns have the same
+        number of rows.
 
         Returns
         -------

http://git-wip-us.apache.org/repos/asf/arrow/blob/6b3ae2ae/python/pyarrow/tests/test_convert_pandas.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_convert_pandas.py b/python/pyarrow/tests/test_convert_pandas.py
index 960653d..953fa2c 100644
--- a/python/pyarrow/tests/test_convert_pandas.py
+++ b/python/pyarrow/tests/test_convert_pandas.py
@@ -81,7 +81,11 @@ class TestPandasConversion(unittest.TestCase):
         arr = A.Array.from_pandas(values, timestamps_to_ms=timestamps_to_ms,
                                   field=field)
         result = arr.to_pandas()
-        tm.assert_series_equal(pd.Series(result), pd.Series(values), check_names=False)
+
+        assert arr.null_count == pd.isnull(values).sum()
+
+        tm.assert_series_equal(pd.Series(result), pd.Series(values),
+                               check_names=False)
 
     def test_float_no_nulls(self):
         data = {}

http://git-wip-us.apache.org/repos/asf/arrow/blob/6b3ae2ae/python/src/pyarrow/adapters/pandas.cc
----------------------------------------------------------------------
diff --git a/python/src/pyarrow/adapters/pandas.cc b/python/src/pyarrow/adapters/pandas.cc
index cadb53e..c707ada 100644
--- a/python/src/pyarrow/adapters/pandas.cc
+++ b/python/src/pyarrow/adapters/pandas.cc
@@ -34,6 +34,7 @@
 #include <unordered_map>
 
 #include "arrow/api.h"
+#include "arrow/loader.h"
 #include "arrow/status.h"
 #include "arrow/type_fwd.h"
 #include "arrow/type_traits.h"
@@ -610,6 +611,7 @@ class PandasBlock {
     DOUBLE,
     BOOL,
     DATETIME,
+    DATETIME_WITH_TZ,
     CATEGORICAL
   };
 
@@ -1157,7 +1159,7 @@ class DataFrameBlockCreator {
       }
 
       int block_placement = 0;
-      if (column_type == Type::DICTIONARY) {
+      if (output_type == PandasBlock::CATEGORICAL) {
         std::shared_ptr<PandasBlock> block;
         RETURN_NOT_OK(MakeCategoricalBlock(col->type(), table_->num_rows(), &block));
         categorical_blocks_[i] = block;
@@ -1518,15 +1520,16 @@ inline Status ArrowSerializer<TYPE>::Convert(std::shared_ptr<Array>* out) {
     null_count = ValuesToBitmap<TYPE>(PyArray_DATA(arr_), length_, null_bitmap_data_);
   }
 
-  // For readability
-  constexpr int64_t kOffset = 0;
-
   RETURN_NOT_OK(ConvertData());
   std::shared_ptr<DataType> type;
   RETURN_NOT_OK(MakeDataType(&type));
-  RETURN_NOT_OK(
-      MakePrimitiveArray(type, length_, data_, null_bitmap_, null_count, kOffset, out));
-  return Status::OK();
+
+  std::vector<arrow::FieldMetadata> fields(1);
+  fields[0].length = length_;
+  fields[0].null_count = null_count;
+  fields[0].offset = 0;
+
+  return arrow::LoadArray(type, fields, {null_bitmap_, data_}, out);
 }
 
 template <>