You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2017/07/11 05:39:26 UTC

[1/2] arrow git commit: ARROW-1199: [C++] Implement mutable POD struct for Array data

Repository: arrow
Updated Branches:
  refs/heads/master ad57ea8ec -> 845207118


http://git-wip-us.apache.org/repos/asf/arrow/blob/84520711/cpp/src/arrow/ipc/reader.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/reader.cc b/cpp/src/arrow/ipc/reader.cc
index ea16bf0..ae46207 100644
--- a/cpp/src/arrow/ipc/reader.cc
+++ b/cpp/src/arrow/ipc/reader.cc
@@ -35,6 +35,7 @@
 #include "arrow/tensor.h"
 #include "arrow/type.h"
 #include "arrow/util/logging.h"
+#include "arrow/visitor_inline.h"
 
 namespace arrow {
 
@@ -45,12 +46,13 @@ namespace ipc {
 // ----------------------------------------------------------------------
 // Record batch read path
 
-class IpcComponentSource : public ArrayComponentSource {
+/// Accessor class for flatbuffers metadata
+class IpcComponentSource {
  public:
   IpcComponentSource(const flatbuf::RecordBatch* metadata, io::RandomAccessFile* file)
       : metadata_(metadata), file_(file) {}
 
-  Status GetBuffer(int buffer_index, std::shared_ptr<Buffer>* out) override {
+  Status GetBuffer(int buffer_index, std::shared_ptr<Buffer>* out) {
     const flatbuf::Buffer* buffer = metadata_->buffers()->Get(buffer_index);
 
     if (buffer->length() == 0) {
@@ -61,7 +63,7 @@ class IpcComponentSource : public ArrayComponentSource {
     }
   }
 
-  Status GetFieldMetadata(int field_index, FieldMetadata* field) override {
+  Status GetFieldMetadata(int field_index, internal::ArrayData* out) {
     auto nodes = metadata_->nodes();
     // pop off a field
     if (field_index >= static_cast<int>(nodes->size())) {
@@ -69,9 +71,9 @@ class IpcComponentSource : public ArrayComponentSource {
     }
     const flatbuf::FieldNode* node = nodes->Get(field_index);
 
-    field->length = node->length();
-    field->null_count = node->null_count();
-    field->offset = 0;
+    out->length = node->length();
+    out->null_count = node->null_count();
+    out->offset = 0;
     return Status::OK();
   }
 
@@ -80,26 +82,204 @@ class IpcComponentSource : public ArrayComponentSource {
   io::RandomAccessFile* file_;
 };
 
+/// Bookkeeping struct for loading array objects from their constituent pieces of raw data
+///
+/// The field_index and buffer_index are incremented in the ArrayLoader
+/// based on how much of the batch is "consumed" (through nested data
+/// reconstruction, for example)
+struct ArrayLoaderContext {
+  IpcComponentSource* source;  ///< Where field metadata and buffers are read from
+  int buffer_index;  ///< Index of the next buffer to consume from source
+  int field_index;  ///< Index of the next field node to consume from source
+  int max_recursion_depth;  ///< Remaining nesting budget; loading fails at <= 0
+};
+
+static Status LoadArray(const std::shared_ptr<DataType>& type,
+    ArrayLoaderContext* context, internal::ArrayData* out);
+
+/// Reconstructs one internal::ArrayData (and, recursively, its children) from
+/// the field nodes and buffers exposed by an IpcComponentSource.
+///
+/// The loader is driven by VisitTypeInline: the Visit overload selected for the
+/// array's type sizes out->buffers and pulls the matching buffers from the
+/// source, advancing the shared ArrayLoaderContext indices as it goes.
+class ArrayLoader {
+ public:
+  /// \param[in] type the data type of the array being loaded
+  /// \param[out] out the ArrayData to populate
+  /// \param[in,out] context shared field/buffer cursor and recursion budget
+  ArrayLoader(const std::shared_ptr<DataType>& type, internal::ArrayData* out,
+      ArrayLoaderContext* context)
+      : type_(type), context_(context), out_(out) {}
+
+  /// Populate the output ArrayData for this loader's type.
+  ///
+  /// \return Status::Invalid if the context's recursion budget is exhausted,
+  /// otherwise whatever the type-specific Visit overload returns
+  Status Load() {
+    if (context_->max_recursion_depth <= 0) {
+      return Status::Invalid("Max recursion depth reached");
+    }
+
+    out_->type = type_;
+
+    return VisitTypeInline(*type_, this);
+  }
+
+  /// Fetch the buffer at buffer_index from the context's source
+  Status GetBuffer(int buffer_index, std::shared_ptr<Buffer>* out) {
+    return context_->source->GetBuffer(buffer_index, out);
+  }
+
+  /// Read length/null count for the next field node and load the validity
+  /// bitmap into buffers[0]. out_->buffers must already have at least one slot.
+  Status LoadCommon() {
+    // This only contains the length and null count, which we need to figure
+    // out what to do with the buffers. For example, if null_count == 0, then
+    // we can skip that buffer without reading from shared memory
+    RETURN_NOT_OK(context_->source->GetFieldMetadata(context_->field_index++, out_));
+
+    // extract null_bitmap which is common to all arrays
+    if (out_->null_count == 0) {
+      out_->buffers[0] = nullptr;
+    } else {
+      RETURN_NOT_OK(GetBuffer(context_->buffer_index, &out_->buffers[0]));
+    }
+    // The bitmap slot is consumed whether or not it was actually read
+    context_->buffer_index++;
+    return Status::OK();
+  }
+
+  /// Load a two-buffer array: validity bitmap plus a single data buffer
+  template <typename TYPE>
+  Status LoadPrimitive() {
+    out_->buffers.resize(2);
+
+    RETURN_NOT_OK(LoadCommon());
+    if (out_->length > 0) {
+      RETURN_NOT_OK(GetBuffer(context_->buffer_index++, &out_->buffers[1]));
+    } else {
+      // Zero-length array: skip the data slot and substitute an empty buffer
+      context_->buffer_index++;
+      out_->buffers[1].reset(new Buffer(nullptr, 0));
+    }
+    return Status::OK();
+  }
+
+  /// Load a three-buffer array: validity bitmap, then the next two buffers
+  template <typename TYPE>
+  Status LoadBinary() {
+    out_->buffers.resize(3);
+
+    RETURN_NOT_OK(LoadCommon());
+    RETURN_NOT_OK(GetBuffer(context_->buffer_index++, &out_->buffers[1]));
+    return GetBuffer(context_->buffer_index++, &out_->buffers[2]);
+  }
+
+  /// Load one child array, charging one level against the recursion budget
+  /// for the duration of the load.
+  Status LoadChild(const Field& field, internal::ArrayData* out) {
+    ArrayLoader loader(field.type(), out, context_);
+    --context_->max_recursion_depth;
+    Status status = loader.Load();
+    // Restore the budget on failure too, so the shared context stays balanced
+    ++context_->max_recursion_depth;
+    return status;
+  }
+
+  /// Load every child field in order and append the results to child_data
+  Status LoadChildren(const std::vector<std::shared_ptr<Field>>& child_fields) {
+    out_->child_data.reserve(child_fields.size());
+
+    for (const auto& child_field : child_fields) {
+      auto field_array = std::make_shared<internal::ArrayData>();
+      RETURN_NOT_OK(LoadChild(*child_field, field_array.get()));
+      out_->child_data.emplace_back(std::move(field_array));
+    }
+    return Status::OK();
+  }
+
+  Status Visit(const NullType& type) { return Status::NotImplemented("null"); }
+
+  Status Visit(const DecimalType& type) { return Status::NotImplemented("decimal"); }
+
+  // Fixed-width types other than fixed-size binary and dictionary, which have
+  // dedicated overloads below
+  template <typename T>
+  typename std::enable_if<std::is_base_of<FixedWidthType, T>::value &&
+                              !std::is_base_of<FixedSizeBinaryType, T>::value &&
+                              !std::is_base_of<DictionaryType, T>::value,
+      Status>::type
+  Visit(const T& type) {
+    return LoadPrimitive<T>();
+  }
+
+  // BinaryType and anything derived from it
+  template <typename T>
+  typename std::enable_if<std::is_base_of<BinaryType, T>::value, Status>::type Visit(
+      const T& type) {
+    return LoadBinary<T>();
+  }
+
+  // Unlike LoadPrimitive, the data buffer is always read, even when length == 0
+  Status Visit(const FixedSizeBinaryType& type) {
+    out_->buffers.resize(2);
+    RETURN_NOT_OK(LoadCommon());
+    return GetBuffer(context_->buffer_index++, &out_->buffers[1]);
+  }
+
+  // Validity bitmap, one further buffer, then exactly one child array
+  Status Visit(const ListType& type) {
+    out_->buffers.resize(2);
+
+    RETURN_NOT_OK(LoadCommon());
+    RETURN_NOT_OK(GetBuffer(context_->buffer_index++, &out_->buffers[1]));
+
+    const int num_children = type.num_children();
+    if (num_children != 1) {
+      std::stringstream ss;
+      ss << "Wrong number of children: " << num_children;
+      return Status::Invalid(ss.str());
+    }
+
+    return LoadChildren(type.children());
+  }
+
+  // Validity bitmap only; the payload lives in the child arrays
+  Status Visit(const StructType& type) {
+    out_->buffers.resize(1);
+    RETURN_NOT_OK(LoadCommon());
+    return LoadChildren(type.children());
+  }
+
+  Status Visit(const UnionType& type) {
+    out_->buffers.resize(3);
+
+    RETURN_NOT_OK(LoadCommon());
+    if (out_->length > 0) {
+      RETURN_NOT_OK(GetBuffer(context_->buffer_index, &out_->buffers[1]));
+      if (type.mode() == UnionMode::DENSE) {
+        RETURN_NOT_OK(GetBuffer(context_->buffer_index + 1, &out_->buffers[2]));
+      }
+    }
+    // The buffer slots are consumed even when length == 0 left them unread:
+    // two for dense unions, one otherwise
+    context_->buffer_index += type.mode() == UnionMode::DENSE ? 2 : 1;
+    return LoadChildren(type.children());
+  }
+
+  // Load the indices as an array of the index type, then re-tag the result
+  // with the dictionary type (the nested load overwrote out_->type)
+  Status Visit(const DictionaryType& type) {
+    RETURN_NOT_OK(LoadArray(type.index_type(), context_, out_));
+    out_->type = type_;
+    return Status::OK();
+  }
+
+ private:
+  // Held by value so the loader stays valid even when it was constructed from
+  // a temporary shared_ptr; a reference member would dangle in that case
+  const std::shared_ptr<DataType> type_;
+  ArrayLoaderContext* context_;
+
+  // Used in visitor pattern
+  internal::ArrayData* out_;
+};
+
+/// Load the array of the given type from the context's source into out.
+static Status LoadArray(const std::shared_ptr<DataType>& type,
+    ArrayLoaderContext* context, internal::ArrayData* out) {
+  return ArrayLoader(type, out, context).Load();
+}
+
 Status ReadRecordBatch(const Message& metadata, const std::shared_ptr<Schema>& schema,
     io::RandomAccessFile* file, std::shared_ptr<RecordBatch>* out) {
   return ReadRecordBatch(metadata, schema, kMaxNestingDepth, file, out);
 }
 
+// ----------------------------------------------------------------------
+// Array loading
+
 static Status LoadRecordBatchFromSource(const std::shared_ptr<Schema>& schema,
-    int64_t num_rows, int max_recursion_depth, ArrayComponentSource* source,
+    int64_t num_rows, int max_recursion_depth, IpcComponentSource* source,
     std::shared_ptr<RecordBatch>* out) {
-  std::vector<std::shared_ptr<Array>> arrays(schema->num_fields());
-
   ArrayLoaderContext context;
   context.source = source;
   context.field_index = 0;
   context.buffer_index = 0;
   context.max_recursion_depth = max_recursion_depth;
 
+  std::vector<std::shared_ptr<internal::ArrayData>> arrays(schema->num_fields());
   for (int i = 0; i < schema->num_fields(); ++i) {
-    RETURN_NOT_OK(LoadArray(schema->field(i)->type(), &context, &arrays[i]));
-    DCHECK_EQ(num_rows, arrays[i]->length())
-        << "Array length did not match record batch length";
+    auto arr = std::make_shared<internal::ArrayData>();
+    RETURN_NOT_OK(LoadArray(schema->field(i)->type(), &context, arr.get()));
+    DCHECK_EQ(num_rows, arr->length) << "Array length did not match record batch length";
+    arrays[i] = std::move(arr);
   }
 
   *out = std::make_shared<RecordBatch>(schema, num_rows, std::move(arrays));

http://git-wip-us.apache.org/repos/asf/arrow/blob/84520711/cpp/src/arrow/ipc/test-common.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/test-common.h b/cpp/src/arrow/ipc/test-common.h
index 747aca0..6fdf1cc 100644
--- a/cpp/src/arrow/ipc/test-common.h
+++ b/cpp/src/arrow/ipc/test-common.h
@@ -86,7 +86,9 @@ static inline void CompareArraysDetailed(
 static inline void CompareBatchColumnsDetailed(
     const RecordBatch& result, const RecordBatch& expected) {
   for (int i = 0; i < expected.num_columns(); ++i) {
-    CompareArraysDetailed(i, *result.column(i), *expected.column(i));
+    auto left = result.column(i);
+    auto right = expected.column(i);
+    CompareArraysDetailed(i, *left, *right);
   }
 }
 
@@ -471,7 +473,7 @@ Status MakeDictionary(std::shared_ptr<RecordBatch>* out) {
   RETURN_NOT_OK(test::GetBitmapFromBoolVector(is_valid, &null_bitmap));
 
   std::shared_ptr<Array> a3 = std::make_shared<ListArray>(f3_type, length,
-      std::static_pointer_cast<PrimitiveArray>(offsets)->data(),
+      std::static_pointer_cast<PrimitiveArray>(offsets)->values(),
       std::make_shared<DictionaryArray>(f1_type, indices3), null_bitmap, 1);
 
   // Dictionary-encoded list of integer
@@ -487,7 +489,7 @@ Status MakeDictionary(std::shared_ptr<RecordBatch>* out) {
   ArrayFromVector<Int8Type, int8_t>(std::vector<bool>(3, true), list_values4, &values4);
 
   auto dict3 = std::make_shared<ListArray>(f4_value_type, 3,
-      std::static_pointer_cast<PrimitiveArray>(offsets4)->data(), values4);
+      std::static_pointer_cast<PrimitiveArray>(offsets4)->values(), values4);
 
   std::vector<int8_t> indices4_values = {0, 1, 2, 0, 1, 2};
   ArrayFromVector<Int8Type, int8_t>(is_valid, indices4_values, &indices4);

http://git-wip-us.apache.org/repos/asf/arrow/blob/84520711/cpp/src/arrow/ipc/writer.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/writer.cc b/cpp/src/arrow/ipc/writer.cc
index 60b1f47..592bca2 100644
--- a/cpp/src/arrow/ipc/writer.cc
+++ b/cpp/src/arrow/ipc/writer.cc
@@ -30,7 +30,6 @@
 #include "arrow/io/memory.h"
 #include "arrow/ipc/metadata.h"
 #include "arrow/ipc/util.h"
-#include "arrow/loader.h"
 #include "arrow/memory_pool.h"
 #include "arrow/status.h"
 #include "arrow/table.h"
@@ -233,7 +232,7 @@ class RecordBatchSerializer : public ArrayVisitor {
  protected:
   template <typename ArrayType>
   Status VisitFixedWidth(const ArrayType& array) {
-    std::shared_ptr<Buffer> data = array.data();
+    std::shared_ptr<Buffer> data = array.values();
 
     const auto& fw_type = static_cast<const FixedWidthType&>(*array.type());
     const int64_t type_width = fw_type.bit_width() / 8;
@@ -287,7 +286,7 @@ class RecordBatchSerializer : public ArrayVisitor {
   Status VisitBinary(const BinaryArray& array) {
     std::shared_ptr<Buffer> value_offsets;
     RETURN_NOT_OK(GetZeroBasedValueOffsets<BinaryArray>(array, &value_offsets));
-    auto data = array.data();
+    auto data = array.value_data();
 
     int64_t total_data_bytes = 0;
     if (value_offsets) {
@@ -309,7 +308,7 @@ class RecordBatchSerializer : public ArrayVisitor {
   Status Visit(const BooleanArray& array) override {
     std::shared_ptr<Buffer> data;
     RETURN_NOT_OK(
-        GetTruncatedBitmap(array.offset(), array.length(), array.data(), pool_, &data));
+        GetTruncatedBitmap(array.offset(), array.length(), array.values(), pool_, &data));
     buffers_.push_back(data);
     return Status::OK();
   }
@@ -367,7 +366,8 @@ class RecordBatchSerializer : public ArrayVisitor {
 
   Status Visit(const StructArray& array) override {
     --max_recursion_depth_;
-    for (std::shared_ptr<Array> field : array.fields()) {
+    for (int i = 0; i < array.num_fields(); ++i) {
+      std::shared_ptr<Array> field = array.field(i);
       if (array.offset() != 0 || array.length() < field->length()) {
         // If offset is non-zero, slice the child array
         field = field->Slice(array.offset(), array.length());
@@ -450,7 +450,9 @@ class RecordBatchSerializer : public ArrayVisitor {
         RETURN_NOT_OK(VisitArray(*child));
       }
     } else {
-      for (std::shared_ptr<Array> child : array.children()) {
+      for (int i = 0; i < array.num_fields(); ++i) {
+        std::shared_ptr<Array> child = array.child(i);
+
         // Sparse union, slicing is simpler
         if (offset != 0 || length < child->length()) {
           // If offset is non-zero, slice the child array

http://git-wip-us.apache.org/repos/asf/arrow/blob/84520711/cpp/src/arrow/loader.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/loader.cc b/cpp/src/arrow/loader.cc
deleted file mode 100644
index e4e1ba4..0000000
--- a/cpp/src/arrow/loader.cc
+++ /dev/null
@@ -1,297 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "arrow/loader.h"
-
-#include <cstdint>
-#include <memory>
-#include <sstream>
-#include <vector>
-
-#include "arrow/array.h"
-#include "arrow/buffer.h"
-#include "arrow/status.h"
-#include "arrow/type.h"
-#include "arrow/type_traits.h"
-#include "arrow/util/logging.h"
-#include "arrow/util/visibility.h"
-#include "arrow/visitor_inline.h"
-
-namespace arrow {
-
-class ArrayLoader {
- public:
-  ArrayLoader(const std::shared_ptr<DataType>& type, ArrayLoaderContext* context)
-      : type_(type), context_(context) {}
-
-  Status Load(std::shared_ptr<Array>* out) {
-    if (context_->max_recursion_depth <= 0) {
-      return Status::Invalid("Max recursion depth reached");
-    }
-
-    RETURN_NOT_OK(VisitTypeInline(*type_, this));
-
-    *out = std::move(result_);
-    return Status::OK();
-  }
-
-  Status GetBuffer(int buffer_index, std::shared_ptr<Buffer>* out) {
-    return context_->source->GetBuffer(buffer_index, out);
-  }
-
-  Status LoadCommon(FieldMetadata* field_meta, std::shared_ptr<Buffer>* null_bitmap) {
-    // This only contains the length and null count, which we need to figure
-    // out what to do with the buffers. For example, if null_count == 0, then
-    // we can skip that buffer without reading from shared memory
-    RETURN_NOT_OK(
-        context_->source->GetFieldMetadata(context_->field_index++, field_meta));
-
-    // extract null_bitmap which is common to all arrays
-    if (field_meta->null_count == 0) {
-      *null_bitmap = nullptr;
-    } else {
-      RETURN_NOT_OK(GetBuffer(context_->buffer_index, null_bitmap));
-    }
-    context_->buffer_index++;
-    return Status::OK();
-  }
-
-  template <typename TYPE>
-  Status LoadPrimitive() {
-    using ArrayType = typename TypeTraits<TYPE>::ArrayType;
-
-    FieldMetadata field_meta;
-    std::shared_ptr<Buffer> null_bitmap, data;
-
-    RETURN_NOT_OK(LoadCommon(&field_meta, &null_bitmap));
-    if (field_meta.length > 0) {
-      RETURN_NOT_OK(GetBuffer(context_->buffer_index++, &data));
-    } else {
-      context_->buffer_index++;
-      data.reset(new Buffer(nullptr, 0));
-    }
-    result_ = std::make_shared<ArrayType>(type_, field_meta.length, data, null_bitmap,
-        field_meta.null_count, field_meta.offset);
-    return Status::OK();
-  }
-
-  template <typename TYPE>
-  Status LoadBinary() {
-    using CONTAINER = typename TypeTraits<TYPE>::ArrayType;
-
-    FieldMetadata field_meta;
-    std::shared_ptr<Buffer> null_bitmap, offsets, values;
-
-    RETURN_NOT_OK(LoadCommon(&field_meta, &null_bitmap));
-    RETURN_NOT_OK(GetBuffer(context_->buffer_index++, &offsets));
-    RETURN_NOT_OK(GetBuffer(context_->buffer_index++, &values));
-
-    result_ = std::make_shared<CONTAINER>(
-        field_meta.length, offsets, values, null_bitmap, field_meta.null_count);
-    return Status::OK();
-  }
-
-  Status LoadChild(const Field& field, std::shared_ptr<Array>* out) {
-    ArrayLoader loader(field.type(), context_);
-    --context_->max_recursion_depth;
-    RETURN_NOT_OK(loader.Load(out));
-    ++context_->max_recursion_depth;
-    return Status::OK();
-  }
-
-  Status LoadChildren(std::vector<std::shared_ptr<Field>> child_fields,
-      std::vector<std::shared_ptr<Array>>* arrays) {
-    arrays->reserve(static_cast<int>(child_fields.size()));
-
-    for (const auto& child_field : child_fields) {
-      std::shared_ptr<Array> field_array;
-      RETURN_NOT_OK(LoadChild(*child_field.get(), &field_array));
-      arrays->emplace_back(field_array);
-    }
-    return Status::OK();
-  }
-
-  Status Visit(const NullType& type) { return Status::NotImplemented("null"); }
-
-  Status Visit(const DecimalType& type) { return Status::NotImplemented("decimal"); }
-
-  template <typename T>
-  typename std::enable_if<std::is_base_of<FixedWidthType, T>::value &&
-                              !std::is_base_of<FixedSizeBinaryType, T>::value &&
-                              !std::is_base_of<DictionaryType, T>::value,
-      Status>::type
-  Visit(const T& type) {
-    return LoadPrimitive<T>();
-  }
-
-  template <typename T>
-  typename std::enable_if<std::is_base_of<BinaryType, T>::value, Status>::type Visit(
-      const T& type) {
-    return LoadBinary<T>();
-  }
-
-  Status Visit(const FixedSizeBinaryType& type) {
-    FieldMetadata field_meta;
-    std::shared_ptr<Buffer> null_bitmap, data;
-
-    RETURN_NOT_OK(LoadCommon(&field_meta, &null_bitmap));
-    RETURN_NOT_OK(GetBuffer(context_->buffer_index++, &data));
-
-    result_ = std::make_shared<FixedSizeBinaryArray>(
-        type_, field_meta.length, data, null_bitmap, field_meta.null_count);
-    return Status::OK();
-  }
-
-  Status Visit(const ListType& type) {
-    FieldMetadata field_meta;
-    std::shared_ptr<Buffer> null_bitmap, offsets;
-
-    RETURN_NOT_OK(LoadCommon(&field_meta, &null_bitmap));
-    RETURN_NOT_OK(GetBuffer(context_->buffer_index++, &offsets));
-
-    const int num_children = type.num_children();
-    if (num_children != 1) {
-      std::stringstream ss;
-      ss << "Wrong number of children: " << num_children;
-      return Status::Invalid(ss.str());
-    }
-    std::shared_ptr<Array> values_array;
-
-    RETURN_NOT_OK(LoadChild(*type.child(0).get(), &values_array));
-
-    result_ = std::make_shared<ListArray>(type_, field_meta.length, offsets, values_array,
-        null_bitmap, field_meta.null_count);
-    return Status::OK();
-  }
-
-  Status Visit(const StructType& type) {
-    FieldMetadata field_meta;
-    std::shared_ptr<Buffer> null_bitmap;
-    RETURN_NOT_OK(LoadCommon(&field_meta, &null_bitmap));
-
-    std::vector<std::shared_ptr<Array>> fields;
-    RETURN_NOT_OK(LoadChildren(type.children(), &fields));
-
-    result_ = std::make_shared<StructArray>(
-        type_, field_meta.length, fields, null_bitmap, field_meta.null_count);
-    return Status::OK();
-  }
-
-  Status Visit(const UnionType& type) {
-    FieldMetadata field_meta;
-    std::shared_ptr<Buffer> null_bitmap, type_ids, offsets;
-
-    RETURN_NOT_OK(LoadCommon(&field_meta, &null_bitmap));
-    if (field_meta.length > 0) {
-      RETURN_NOT_OK(GetBuffer(context_->buffer_index, &type_ids));
-      if (type.mode() == UnionMode::DENSE) {
-        RETURN_NOT_OK(GetBuffer(context_->buffer_index + 1, &offsets));
-      }
-    }
-    context_->buffer_index += type.mode() == UnionMode::DENSE ? 2 : 1;
-
-    std::vector<std::shared_ptr<Array>> fields;
-    RETURN_NOT_OK(LoadChildren(type.children(), &fields));
-
-    result_ = std::make_shared<UnionArray>(type_, field_meta.length, fields, type_ids,
-        offsets, null_bitmap, field_meta.null_count);
-    return Status::OK();
-  }
-
-  Status Visit(const DictionaryType& type) {
-    std::shared_ptr<Array> indices;
-    RETURN_NOT_OK(LoadArray(type.index_type(), context_, &indices));
-    result_ = std::make_shared<DictionaryArray>(type_, indices);
-    return Status::OK();
-  }
-
-  std::shared_ptr<Array> result() const { return result_; }
-
- private:
-  const std::shared_ptr<DataType> type_;
-  ArrayLoaderContext* context_;
-
-  // Used in visitor pattern
-  std::shared_ptr<Array> result_;
-};
-
-Status LoadArray(const std::shared_ptr<DataType>& type, ArrayComponentSource* source,
-    std::shared_ptr<Array>* out) {
-  ArrayLoaderContext context;
-  context.source = source;
-  context.field_index = context.buffer_index = 0;
-  context.max_recursion_depth = kMaxNestingDepth;
-  return LoadArray(type, &context, out);
-}
-
-Status LoadArray(const std::shared_ptr<DataType>& type, ArrayLoaderContext* context,
-    std::shared_ptr<Array>* out) {
-  ArrayLoader loader(type, context);
-  RETURN_NOT_OK(loader.Load(out));
-
-  return Status::OK();
-}
-
-class InMemorySource : public ArrayComponentSource {
- public:
-  InMemorySource(const std::vector<FieldMetadata>& fields,
-      const std::vector<std::shared_ptr<Buffer>>& buffers)
-      : fields_(fields), buffers_(buffers) {}
-
-  Status GetBuffer(int buffer_index, std::shared_ptr<Buffer>* out) {
-    DCHECK(buffer_index < static_cast<int>(buffers_.size()));
-    *out = buffers_[buffer_index];
-    return Status::OK();
-  }
-
-  Status GetFieldMetadata(int field_index, FieldMetadata* metadata) {
-    DCHECK(field_index < static_cast<int>(fields_.size()));
-    *metadata = fields_[field_index];
-    return Status::OK();
-  }
-
- private:
-  const std::vector<FieldMetadata>& fields_;
-  const std::vector<std::shared_ptr<Buffer>>& buffers_;
-};
-
-Status LoadArray(const std::shared_ptr<DataType>& type,
-    const std::vector<FieldMetadata>& fields,
-    const std::vector<std::shared_ptr<Buffer>>& buffers, std::shared_ptr<Array>* out) {
-  InMemorySource source(fields, buffers);
-  return LoadArray(type, &source, out);
-}
-
-Status MakePrimitiveArray(const std::shared_ptr<DataType>& type, int64_t length,
-    const std::shared_ptr<Buffer>& data, const std::shared_ptr<Buffer>& null_bitmap,
-    int64_t null_count, int64_t offset, std::shared_ptr<Array>* out) {
-  std::vector<std::shared_ptr<Buffer>> buffers = {null_bitmap, data};
-  return MakePrimitiveArray(type, buffers, length, null_count, offset, out);
-}
-
-Status MakePrimitiveArray(const std::shared_ptr<DataType>& type,
-    const std::vector<std::shared_ptr<Buffer>>& buffers, int64_t length,
-    int64_t null_count, int64_t offset, std::shared_ptr<Array>* out) {
-  std::vector<FieldMetadata> fields(1);
-  fields[0].length = length;
-  fields[0].null_count = null_count;
-  fields[0].offset = offset;
-
-  return LoadArray(type, fields, buffers, out);
-}
-
-}  // namespace arrow

http://git-wip-us.apache.org/repos/asf/arrow/blob/84520711/cpp/src/arrow/loader.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/loader.h b/cpp/src/arrow/loader.h
deleted file mode 100644
index f5e3995..0000000
--- a/cpp/src/arrow/loader.h
+++ /dev/null
@@ -1,124 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-// Function for constructing Array array objects from metadata and raw memory
-// buffers
-
-#ifndef ARROW_LOADER_H
-#define ARROW_LOADER_H
-
-#include <cstdint>
-#include <memory>
-#include <string>
-#include <vector>
-
-#include "arrow/status.h"
-#include "arrow/util/visibility.h"
-
-namespace arrow {
-
-class Array;
-class Buffer;
-class DataType;
-
-// ARROW-109: We set this number arbitrarily to help catch user mistakes. For
-// deeply nested schemas, it is expected the user will indicate explicitly the
-// maximum allowed recursion depth
-constexpr int kMaxNestingDepth = 64;
-
-struct ARROW_EXPORT FieldMetadata {
-  FieldMetadata() {}
-  FieldMetadata(int64_t length, int64_t null_count, int64_t offset)
-      : length(length), null_count(null_count), offset(offset) {}
-
-  FieldMetadata(const FieldMetadata& other) {
-    this->length = other.length;
-    this->null_count = other.null_count;
-    this->offset = other.offset;
-  }
-
-  int64_t length;
-  int64_t null_count;
-  int64_t offset;
-};
-
-struct ARROW_EXPORT BufferMetadata {
-  BufferMetadata() {}
-  BufferMetadata(int32_t page, int64_t offset, int64_t length)
-      : page(page), offset(offset), length(length) {}
-
-  /// The shared memory page id where to find this. Set to -1 if unused
-  int32_t page;
-
-  /// The relative offset into the memory page to the starting byte of the buffer
-  int64_t offset;
-
-  /// Absolute length in bytes of the buffer
-  int64_t length;
-};
-
-/// Implement this to create new types of Arrow data loaders
-class ARROW_EXPORT ArrayComponentSource {
- public:
-  virtual ~ArrayComponentSource() = default;
-
-  virtual Status GetBuffer(int buffer_index, std::shared_ptr<Buffer>* out) = 0;
-  virtual Status GetFieldMetadata(int field_index, FieldMetadata* metadata) = 0;
-};
-
-/// Bookkeeping struct for loading array objects from their constituent pieces of raw data
-///
-/// The field_index and buffer_index are incremented in the ArrayLoader
-/// based on how much of the batch is "consumed" (through nested data
-/// reconstruction, for example)
-struct ArrayLoaderContext {
-  ArrayComponentSource* source;
-  int buffer_index;
-  int field_index;
-  int max_recursion_depth;
-};
-
-/// Construct an Array container from type metadata and a collection of memory
-/// buffers
-///
-/// \param[in] field the data type of the array being loaded
-/// \param[in] source an implementation of ArrayComponentSource
-/// \param[out] out the constructed array
-/// \return Status indicating success or failure
-Status ARROW_EXPORT LoadArray(const std::shared_ptr<DataType>& type,
-    ArrayComponentSource* source, std::shared_ptr<Array>* out);
-
-Status ARROW_EXPORT LoadArray(const std::shared_ptr<DataType>& field,
-    ArrayLoaderContext* context, std::shared_ptr<Array>* out);
-
-Status ARROW_EXPORT LoadArray(const std::shared_ptr<DataType>& type,
-    const std::vector<FieldMetadata>& fields,
-    const std::vector<std::shared_ptr<Buffer>>& buffers, std::shared_ptr<Array>* out);
-
-/// Create new arrays for logical types that are backed by primitive arrays.
-Status ARROW_EXPORT MakePrimitiveArray(const std::shared_ptr<DataType>& type,
-    int64_t length, const std::shared_ptr<Buffer>& data,
-    const std::shared_ptr<Buffer>& null_bitmap, int64_t null_count, int64_t offset,
-    std::shared_ptr<Array>* out);
-
-Status ARROW_EXPORT MakePrimitiveArray(const std::shared_ptr<DataType>& type,
-    const std::vector<std::shared_ptr<Buffer>>& buffers, int64_t length,
-    int64_t null_count, int64_t offset, std::shared_ptr<Array>* out);
-
-}  // namespace arrow
-
-#endif  // ARROW_LOADER_H

http://git-wip-us.apache.org/repos/asf/arrow/blob/84520711/cpp/src/arrow/pretty_print.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/pretty_print.cc b/cpp/src/arrow/pretty_print.cc
index 1f4bfa9..93f6ff0 100644
--- a/cpp/src/arrow/pretty_print.cc
+++ b/cpp/src/arrow/pretty_print.cc
@@ -26,6 +26,7 @@
 #include "arrow/table.h"
 #include "arrow/type.h"
 #include "arrow/type_traits.h"
+#include "arrow/util/logging.h"
 #include "arrow/util/string.h"
 #include "arrow/visitor_inline.h"
 
@@ -39,7 +40,7 @@ class ArrayPrinter {
   template <typename T>
   inline typename std::enable_if<IsInteger<T>::value, void>::type WriteDataValues(
       const T& array) {
-    const auto data = array.raw_data();
+    const auto data = array.raw_values();
     for (int i = 0; i < array.length(); ++i) {
       if (i > 0) { (*sink_) << ", "; }
       if (array.IsNull(i)) {
@@ -53,7 +54,7 @@ class ArrayPrinter {
   template <typename T>
   inline typename std::enable_if<IsFloatingPoint<T>::value, void>::type WriteDataValues(
       const T& array) {
-    const auto data = array.raw_data();
+    const auto data = array.raw_values();
     for (int i = 0; i < array.length(); ++i) {
       if (i > 0) { (*sink_) << ", "; }
       if (array.IsNull(i)) {
@@ -187,7 +188,12 @@ class ArrayPrinter {
 
   Status Visit(const StructArray& array) {
     RETURN_NOT_OK(WriteValidityBitmap(array));
-    return PrintChildren(array.fields(), array.offset(), array.length());
+    std::vector<std::shared_ptr<Array>> children;
+    children.reserve(array.num_fields());
+    for (int i = 0; i < array.num_fields(); ++i) {
+      children.emplace_back(array.field(i));
+    }
+    return PrintChildren(children, array.offset(), array.length());
   }
 
   Status Visit(const UnionArray& array) {
@@ -207,7 +213,12 @@ class ArrayPrinter {
     }
 
     // Print the children without any offset, because the type ids are absolute
-    return PrintChildren(array.children(), 0, array.length() + array.offset());
+    std::vector<std::shared_ptr<Array>> children;
+    children.reserve(array.num_fields());
+    for (int i = 0; i < array.num_fields(); ++i) {
+      children.emplace_back(array.child(i));
+    }
+    return PrintChildren(children, 0, array.length() + array.offset());
   }
 
   Status Visit(const DictionaryArray& array) {
@@ -286,4 +297,8 @@ Status PrettyPrint(const RecordBatch& batch, int indent, std::ostream* sink) {
   return Status::OK();
 }
 
+Status ARROW_EXPORT DebugPrint(const Array& arr, int indent) {
+  return PrettyPrint(arr, indent, &std::cout);
+}
+
 }  // namespace arrow

http://git-wip-us.apache.org/repos/asf/arrow/blob/84520711/cpp/src/arrow/pretty_print.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/pretty_print.h b/cpp/src/arrow/pretty_print.h
index f508aa0..a45c8a8 100644
--- a/cpp/src/arrow/pretty_print.h
+++ b/cpp/src/arrow/pretty_print.h
@@ -25,6 +25,7 @@
 
 namespace arrow {
 
+class Array;
 class Status;
 
 struct PrettyPrintOptions {
@@ -34,6 +35,8 @@ struct PrettyPrintOptions {
 Status ARROW_EXPORT PrettyPrint(const RecordBatch& batch, int indent, std::ostream* sink);
 Status ARROW_EXPORT PrettyPrint(const Array& arr, int indent, std::ostream* sink);
 
+Status ARROW_EXPORT DebugPrint(const Array& arr, int indent);
+
 }  // namespace arrow
 
 #endif  // ARROW_PRETTY_PRINT_H

http://git-wip-us.apache.org/repos/asf/arrow/blob/84520711/cpp/src/arrow/python/pandas_convert.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/python/pandas_convert.cc b/cpp/src/arrow/python/pandas_convert.cc
index 2364f13..cdd3f58 100644
--- a/cpp/src/arrow/python/pandas_convert.cc
+++ b/cpp/src/arrow/python/pandas_convert.cc
@@ -34,7 +34,6 @@
 #include <vector>
 
 #include "arrow/array.h"
-#include "arrow/loader.h"
 #include "arrow/status.h"
 #include "arrow/table.h"
 #include "arrow/type_fwd.h"
@@ -340,12 +339,10 @@ class PandasConverter {
       null_count = ValuesToBitmap<traits::npy_type>(arr_, null_bitmap_data_);
     }
 
-    std::vector<FieldMetadata> fields(1);
-    fields[0].length = length_;
-    fields[0].null_count = null_count;
-    fields[0].offset = 0;
-
-    return LoadArray(type_, fields, {null_bitmap_, data}, &out_);
+    BufferVector buffers = {null_bitmap_, data};
+    auto array_data = std::make_shared<internal::ArrayData>(
+        type_, length_, std::move(buffers), null_count, 0);
+    return internal::MakeArray(array_data, &out_);
   }
 
   template <typename T>
@@ -617,9 +614,9 @@ Status PandasConverter::ConvertObjectStrings() {
   RETURN_NOT_OK(builder.Finish(&out_));
 
   if (have_bytes) {
-    const auto& arr = static_cast<const StringArray&>(*out_);
-    out_ = std::make_shared<BinaryArray>(arr.length(), arr.value_offsets(), arr.data(),
-        arr.null_bitmap(), arr.null_count());
+    auto binary_data = out_->data()->ShallowCopy();
+    binary_data->type = ::arrow::binary();
+    out_ = std::make_shared<BinaryArray>(binary_data);
   }
   return Status::OK();
 }
@@ -1223,7 +1220,7 @@ inline void ConvertIntegerWithNulls(const ChunkedArray& data, double* out_values
   for (int c = 0; c < data.num_chunks(); c++) {
     const std::shared_ptr<Array> arr = data.chunk(c);
     auto prim_arr = static_cast<PrimitiveArray*>(arr.get());
-    auto in_values = reinterpret_cast<const T*>(prim_arr->data()->data());
+    auto in_values = reinterpret_cast<const T*>(prim_arr->raw_values());
     // Upcast to double, set NaN as appropriate
 
     for (int i = 0; i < arr->length(); ++i) {
@@ -1237,7 +1234,7 @@ inline void ConvertIntegerNoNullsSameType(const ChunkedArray& data, T* out_value
   for (int c = 0; c < data.num_chunks(); c++) {
     const std::shared_ptr<Array> arr = data.chunk(c);
     auto prim_arr = static_cast<PrimitiveArray*>(arr.get());
-    auto in_values = reinterpret_cast<const T*>(prim_arr->data()->data());
+    auto in_values = reinterpret_cast<const T*>(prim_arr->raw_values());
     memcpy(out_values, in_values, sizeof(T) * arr->length());
     out_values += arr->length();
   }
@@ -1248,7 +1245,7 @@ inline void ConvertIntegerNoNullsCast(const ChunkedArray& data, OutType* out_val
   for (int c = 0; c < data.num_chunks(); c++) {
     const std::shared_ptr<Array> arr = data.chunk(c);
     auto prim_arr = static_cast<PrimitiveArray*>(arr.get());
-    auto in_values = reinterpret_cast<const InType*>(prim_arr->data()->data());
+    auto in_values = reinterpret_cast<const InType*>(prim_arr->raw_values());
     for (int64_t i = 0; i < arr->length(); ++i) {
       *out_values = in_values[i];
     }
@@ -1371,14 +1368,14 @@ inline Status ConvertStruct(const ChunkedArray& data, PyObject** out_values) {
   // ChunkedArray has at least one chunk
   auto arr = static_cast<const StructArray*>(data.chunk(0).get());
   // Use it to cache the struct type and number of fields for all chunks
-  auto num_fields = arr->fields().size();
+  int32_t num_fields = arr->num_fields();
   auto array_type = arr->type();
   std::vector<OwnedRef> fields_data(num_fields);
   OwnedRef dict_item;
   for (int c = 0; c < data.num_chunks(); c++) {
     auto arr = static_cast<const StructArray*>(data.chunk(c).get());
     // Convert the struct arrays first
-    for (size_t i = 0; i < num_fields; i++) {
+    for (int32_t i = 0; i < num_fields; i++) {
       PyObject* numpy_array;
       RETURN_NOT_OK(
           ConvertArrayToPandas(arr->field(static_cast<int>(i)), nullptr, &numpy_array));
@@ -1395,7 +1392,7 @@ inline Status ConvertStruct(const ChunkedArray& data, PyObject** out_values) {
         // Build the new dict object for the row
         dict_item.reset(PyDict_New());
         RETURN_IF_PYERROR();
-        for (size_t field_idx = 0; field_idx < num_fields; ++field_idx) {
+        for (int32_t field_idx = 0; field_idx < num_fields; ++field_idx) {
           OwnedRef field_value;
           auto name = array_type->child(static_cast<int>(field_idx))->name();
           if (!arr->field(static_cast<int>(field_idx))->IsNull(i)) {
@@ -1475,7 +1472,7 @@ inline void ConvertNumericNullable(const ChunkedArray& data, T na_value, T* out_
   for (int c = 0; c < data.num_chunks(); c++) {
     const std::shared_ptr<Array> arr = data.chunk(c);
     auto prim_arr = static_cast<PrimitiveArray*>(arr.get());
-    auto in_values = reinterpret_cast<const T*>(prim_arr->data()->data());
+    auto in_values = reinterpret_cast<const T*>(prim_arr->raw_values());
 
     const uint8_t* valid_bits = arr->null_bitmap_data();
 
@@ -1496,7 +1493,7 @@ inline void ConvertNumericNullableCast(
   for (int c = 0; c < data.num_chunks(); c++) {
     const std::shared_ptr<Array> arr = data.chunk(c);
     auto prim_arr = static_cast<PrimitiveArray*>(arr.get());
-    auto in_values = reinterpret_cast<const InType*>(prim_arr->data()->data());
+    auto in_values = reinterpret_cast<const InType*>(prim_arr->raw_values());
 
     for (int64_t i = 0; i < arr->length(); ++i) {
       *out_values++ = arr->IsNull(i) ? na_value : static_cast<OutType>(in_values[i]);
@@ -1509,7 +1506,7 @@ inline void ConvertDatetimeNanos(const ChunkedArray& data, int64_t* out_values)
   for (int c = 0; c < data.num_chunks(); c++) {
     const std::shared_ptr<Array> arr = data.chunk(c);
     auto prim_arr = static_cast<PrimitiveArray*>(arr.get());
-    auto in_values = reinterpret_cast<const InType*>(prim_arr->data()->data());
+    auto in_values = reinterpret_cast<const InType*>(prim_arr->raw_values());
 
     for (int64_t i = 0; i < arr->length(); ++i) {
       *out_values++ = arr->IsNull(i) ? kPandasTimestampNull
@@ -1838,7 +1835,7 @@ class CategoricalBlock : public PandasBlock {
       const std::shared_ptr<Array> arr = data.chunk(c);
       const auto& dict_arr = static_cast<const DictionaryArray&>(*arr);
       const auto& indices = static_cast<const PrimitiveArray&>(*dict_arr.indices());
-      auto in_values = reinterpret_cast<const T*>(indices.data()->data());
+      auto in_values = reinterpret_cast<const T*>(indices.raw_values());
 
       // Null is -1 in CategoricalBlock
       for (int i = 0; i < arr->length(); ++i) {
@@ -2214,7 +2211,7 @@ class ArrowDeserializer {
     typedef typename arrow_traits<TYPE>::T T;
 
     auto prim_arr = static_cast<PrimitiveArray*>(arr.get());
-    auto in_values = reinterpret_cast<const T*>(prim_arr->data()->data());
+    auto in_values = reinterpret_cast<const T*>(prim_arr->raw_values());
 
     // Zero-Copy. We can pass the data pointer directly to NumPy.
     void* data = const_cast<T*>(in_values);
@@ -2290,7 +2287,7 @@ class ArrowDeserializer {
     for (int c = 0; c < data_.num_chunks(); c++) {
       const std::shared_ptr<Array> arr = data_.chunk(c);
       auto prim_arr = static_cast<PrimitiveArray*>(arr.get());
-      auto in_values = reinterpret_cast<const T*>(prim_arr->data()->data());
+      auto in_values = reinterpret_cast<const T*>(prim_arr->raw_values());
 
       for (int64_t i = 0; i < arr->length(); ++i) {
         *out_values++ = arr->IsNull(i) ? na_value : in_values[i] / kShift;

http://git-wip-us.apache.org/repos/asf/arrow/blob/84520711/cpp/src/arrow/table.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/table.cc b/cpp/src/arrow/table.cc
index c110ec1..aa04243 100644
--- a/cpp/src/arrow/table.cc
+++ b/cpp/src/arrow/table.cc
@@ -146,12 +146,30 @@ void AssertBatchValid(const RecordBatch& batch) {
 
 RecordBatch::RecordBatch(const std::shared_ptr<Schema>& schema, int64_t num_rows,
     const std::vector<std::shared_ptr<Array>>& columns)
-    : schema_(schema), num_rows_(num_rows), columns_(columns) {}
+    : schema_(schema), num_rows_(num_rows), columns_(columns.size()) {
+  for (size_t i = 0; i < columns.size(); ++i) {
+    columns_[i] = columns[i]->data();
+  }
+}
 
 RecordBatch::RecordBatch(const std::shared_ptr<Schema>& schema, int64_t num_rows,
     std::vector<std::shared_ptr<Array>>&& columns)
+    : schema_(schema), num_rows_(num_rows), columns_(columns.size()) {
+  for (size_t i = 0; i < columns.size(); ++i) {
+    columns_[i] = columns[i]->data();
+  }
+}
+
+RecordBatch::RecordBatch(const std::shared_ptr<Schema>& schema, int64_t num_rows,
+    std::vector<std::shared_ptr<internal::ArrayData>>&& columns)
     : schema_(schema), num_rows_(num_rows), columns_(std::move(columns)) {}
 
+std::shared_ptr<Array> RecordBatch::column(int i) const {
+  std::shared_ptr<Array> result;
+  DCHECK(MakeArray(columns_[i], &result).ok());
+  return result;
+}
+
 const std::string& RecordBatch::column_name(int i) const {
   return schema_->field(i)->name();
 }
@@ -185,30 +203,36 @@ std::shared_ptr<RecordBatch> RecordBatch::Slice(int64_t offset) const {
 }
 
 std::shared_ptr<RecordBatch> RecordBatch::Slice(int64_t offset, int64_t length) const {
-  std::vector<std::shared_ptr<Array>> arrays;
+  std::vector<std::shared_ptr<internal::ArrayData>> arrays;
   arrays.reserve(num_columns());
   for (const auto& field : columns_) {
-    arrays.emplace_back(field->Slice(offset, length));
+    int64_t col_length = std::min(field->length - offset, length);
+    int64_t col_offset = field->offset + offset;
+
+    auto new_data = std::make_shared<internal::ArrayData>(*field);
+    new_data->length = col_length;
+    new_data->offset = col_offset;
+    new_data->null_count = kUnknownNullCount;
+    arrays.emplace_back(new_data);
   }
-
   int64_t num_rows = std::min(num_rows_ - offset, length);
-  return std::make_shared<RecordBatch>(schema_, num_rows, arrays);
+  return std::make_shared<RecordBatch>(schema_, num_rows, std::move(arrays));
 }
 
 Status RecordBatch::Validate() const {
   for (int i = 0; i < num_columns(); ++i) {
-    const Array& arr = *columns_[i];
-    if (arr.length() != num_rows_) {
+    const internal::ArrayData& arr = *columns_[i];
+    if (arr.length != num_rows_) {
       std::stringstream ss;
-      ss << "Number of rows in column " << i << " did not match batch: " << arr.length()
+      ss << "Number of rows in column " << i << " did not match batch: " << arr.length
          << " vs " << num_rows_;
       return Status::Invalid(ss.str());
     }
     const auto& schema_type = *schema_->field(i)->type();
-    if (!arr.type()->Equals(schema_type)) {
+    if (!arr.type->Equals(schema_type)) {
       std::stringstream ss;
-      ss << "Column " << i << " type not match schema: " << arr.type()->ToString()
-         << " vs " << schema_type.ToString();
+      ss << "Column " << i << " type not match schema: " << arr.type->ToString() << " vs "
+         << schema_type.ToString();
       return Status::Invalid(ss.str());
     }
   }

http://git-wip-us.apache.org/repos/asf/arrow/blob/84520711/cpp/src/arrow/table.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/table.h b/cpp/src/arrow/table.h
index 67710a8..18315f3 100644
--- a/cpp/src/arrow/table.h
+++ b/cpp/src/arrow/table.h
@@ -28,6 +28,12 @@
 
 namespace arrow {
 
+namespace internal {
+
+struct ArrayData;
+
+}  // namespace internal
+
 class Array;
 class Column;
 class Schema;
@@ -106,15 +112,29 @@ class ARROW_EXPORT Column {
 // corresponding sequence of equal-length Arrow arrays
 class ARROW_EXPORT RecordBatch {
  public:
-  // num_rows is a parameter to allow for record batches of a particular size not
-  // having any materialized columns. Each array should have the same length as
-  // num_rows
+  /// num_rows is a parameter to allow for record batches of a particular size not
+  /// having any materialized columns. Each array should have the same length as
+  /// num_rows
+
   RecordBatch(const std::shared_ptr<Schema>& schema, int64_t num_rows,
       const std::vector<std::shared_ptr<Array>>& columns);
 
+  /// \brief Deprecated move constructor for a vector of Array instances
   RecordBatch(const std::shared_ptr<Schema>& schema, int64_t num_rows,
       std::vector<std::shared_ptr<Array>>&& columns);
 
+  /// \brief Construct record batch from vector of internal data structures
+  ///
+  /// This constructor is only provided with an rvalue-reference for the input data,
+  /// and is intended for internal use, or advanced users.
+  ///
+  /// \param schema the record batch schema
+  /// \param num_rows the number of semantic rows in the record batch. This
+  /// should be equal to the length of each field
+  /// \param columns the data for the batch's columns
+  RecordBatch(const std::shared_ptr<Schema>& schema, int64_t num_rows,
+      std::vector<std::shared_ptr<internal::ArrayData>>&& columns);
+
   bool Equals(const RecordBatch& other) const;
 
   bool ApproxEquals(const RecordBatch& other) const;
@@ -124,9 +144,9 @@ class ARROW_EXPORT RecordBatch {
 
   // @returns: the i-th column
   // Note: Does not boundscheck
-  std::shared_ptr<Array> column(int i) const { return columns_[i]; }
+  std::shared_ptr<Array> column(int i) const;
 
-  const std::vector<std::shared_ptr<Array>>& columns() const { return columns_; }
+  std::shared_ptr<internal::ArrayData> column_data(int i) const { return columns_[i]; }
 
   const std::string& column_name(int i) const;
 
@@ -147,7 +167,7 @@ class ARROW_EXPORT RecordBatch {
  private:
   std::shared_ptr<Schema> schema_;
   int64_t num_rows_;
-  std::vector<std::shared_ptr<Array>> columns_;
+  std::vector<std::shared_ptr<internal::ArrayData>> columns_;
 };
 
 // Immutable container of fixed-length columns conforming to a particular schema

http://git-wip-us.apache.org/repos/asf/arrow/blob/84520711/python/doc/source/development.rst
----------------------------------------------------------------------
diff --git a/python/doc/source/development.rst b/python/doc/source/development.rst
index 8a70180..b5aba6c 100644
--- a/python/doc/source/development.rst
+++ b/python/doc/source/development.rst
@@ -267,6 +267,7 @@ Now, we build and install Arrow C++ libraries
          -DCMAKE_INSTALL_PREFIX=%ARROW_HOME% ^
          -DCMAKE_BUILD_TYPE=Release ^
          -DARROW_BUILD_TESTS=off ^
+         -DARROW_ZLIB_VENDORED=off ^
          -DARROW_PYTHON=on ..
    cmake --build . --target INSTALL --config Release
    cd ..\..
@@ -282,7 +283,6 @@ Now, we build parquet-cpp and install the result in the same place:
    cmake -G "Visual Studio 14 2015 Win64" ^
          -DCMAKE_INSTALL_PREFIX=%PARQUET_HOME% ^
          -DCMAKE_BUILD_TYPE=Release ^
-         -DPARQUET_ZLIB_VENDORED=off ^
          -DPARQUET_BUILD_TESTS=off ..
    cmake --build . --target INSTALL --config Release
    popd

http://git-wip-us.apache.org/repos/asf/arrow/blob/84520711/python/pyarrow/array.pxi
----------------------------------------------------------------------
diff --git a/python/pyarrow/array.pxi b/python/pyarrow/array.pxi
index bf87173..ae9ff88 100644
--- a/python/pyarrow/array.pxi
+++ b/python/pyarrow/array.pxi
@@ -1084,8 +1084,8 @@ cdef class StructValue(ArrayValue):
             CStructArray* ap
             vector[shared_ptr[CField]] child_fields = self.type.type.children()
         ap = <CStructArray*> self.sp_array.get()
-        child_arrays = ap.fields()
-        wrapped_arrays = (pyarrow_wrap_array(child) for child in child_arrays)
+        wrapped_arrays = (pyarrow_wrap_array(ap.field(i))
+                          for i in range(ap.num_fields()))
         child_names = (child.get().name() for child in child_fields)
         # Return the struct as a dict
         return {
@@ -1214,6 +1214,9 @@ cdef class Array:
         self.ap = sp_array.get()
         self.type = pyarrow_wrap_data_type(self.sp_array.get().type())
 
+    def _debug_print(self):
+        check_status(DebugPrint(deref(self.ap), 0))
+
     @staticmethod
     def from_pandas(obj, mask=None, DataType type=None,
                     timestamps_to_ms=False,

http://git-wip-us.apache.org/repos/asf/arrow/blob/84520711/python/pyarrow/includes/libarrow.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd
index cc46c76..2db1dd1 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -91,12 +91,16 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil:
         int64_t null_count()
         Type type_id()
 
+        int num_fields()
+
         c_bool Equals(const CArray& arr)
         c_bool IsNull(int i)
 
         shared_ptr[CArray] Slice(int64_t offset)
         shared_ptr[CArray] Slice(int64_t offset, int64_t length)
 
+    CStatus DebugPrint(const CArray& arr, int indent)
+
     cdef cppclass CFixedWidthType" arrow::FixedWidthType"(CDataType):
         int bit_width()
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/84520711/python/pyarrow/tests/test_convert_pandas.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_convert_pandas.py b/python/pyarrow/tests/test_convert_pandas.py
index ac4ad82..4c51d71 100644
--- a/python/pyarrow/tests/test_convert_pandas.py
+++ b/python/pyarrow/tests/test_convert_pandas.py
@@ -477,6 +477,23 @@ class TestPandasConversion(unittest.TestCase):
             field = schema.field_by_name(column)
             self._check_array_roundtrip(df[column], type=field.type)
 
+    def test_column_of_arrays_to_py(self):
+        # Test regression in ARROW-1199 not caught in above test
+        dtype = 'i1'
+        arr = np.array([
+            np.arange(10, dtype=dtype),
+            np.arange(5, dtype=dtype),
+            None,
+            np.arange(1, dtype=dtype)
+        ])
+        type_ = pa.list_(pa.int8())
+        parr = pa.Array.from_pandas(arr, type=type_)
+
+        assert parr[0].as_py() == list(range(10))
+        assert parr[1].as_py() == list(range(5))
+        assert parr[2].as_py() is None
+        assert parr[3].as_py() == [0]
+
     def test_column_of_lists(self):
         df, schema = dataframe_with_lists()
         self._check_pandas_roundtrip(df, schema=schema, expected_schema=schema)


[2/2] arrow git commit: ARROW-1199: [C++] Implement mutable POD struct for Array data

Posted by we...@apache.org.
ARROW-1199: [C++] Implement mutable POD struct for Array data

This patch provides a new internal data structure, `internal::ArrayData`, that is a self-contained representation of the memory and metadata inside an Arrow array.

This class is designed for easy internal data manipulation, analytical data processing, and data transport to and from IPC messages. For example, we could cast from int64 to float64 like so:

```c++
Int64Array arr = GetMyData();
std::shared_ptr<internal::ArrayData> new_data = arr.data()->ShallowCopy();
new_data->type = arrow::float64();
Float64Array double_arr(new_data);
```

This object is also useful in an analytics setting where memory may be reused. For example, if we had a group of operations all returning doubles, say:

```
Log(Sqrt(Expr(arr)))
```

Then the low-level implementation of each of these functions could have a signature like:

void Log(const ArrayData& values, ArrayData* out);

As another example, a function may consume one or more memory buffers in an input array and replace them with newly-allocated data, changing the output data type as well.

I did quite a bit of refactoring and code simplification that was enabled by this patch. I note that performance in IPC loading of very wide record batches is about 15% slower, but in smaller record batches it is about the same in microbenchmarks. This code path could possibly be made faster with some performance analysis work.

Author: Wes McKinney <we...@twosigma.com>

Closes #824 from wesm/array-data-internals and squashes the following commits:

f1acbae1 [Wes McKinney] MSVC fixes
dcdf2b29 [Wes McKinney] Fix glib per C++ API changes
d0a8ee2b [Wes McKinney] Fix logic error in UnsafeSetNotNull
d17f886c [Wes McKinney] Construct dictionary indices in ctor
bba42530 [Wes McKinney] Set correct type when creating BinaryArray
ba3b2992 [Wes McKinney] Various fixes, Python fixes, add Array operator<< to std::ostream for debugging
0b8af24a [Wes McKinney] Write field metadata directly into output object
05058638 [Wes McKinney] Fix up cmake
75bc6b4f [Wes McKinney] Delete cruft from array/loader.h and consolidate in arrow/ipc
24df1b97 [Wes McKinney] Review comments, add some doxygen comments
6e2e5720 [Wes McKinney] Preallocate vector of shared_ptr
05b806b2 [Wes McKinney] Tests passing again
5bdd6a99 [Wes McKinney] bug fixes
7894496e [Wes McKinney] Some fixes
bf91a75a [Wes McKinney] Refactor to use shared_ptr, not yet working
130f0c1a [Wes McKinney] Use std::move instead of std::forward
a9b4031b [Wes McKinney] Add move constructors to reduce unnecessary copying
475a3db6 [Wes McKinney] Bug fixes, test suite passing again
16918279 [Wes McKinney] Array internals refactoring to use POD struct for all buffers, auxiliary metadata


Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/84520711
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/84520711
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/84520711

Branch: refs/heads/master
Commit: 8452071180c075d7d829d9c0a49376adb45971e0
Parents: ad57ea8
Author: Wes McKinney <we...@twosigma.com>
Authored: Tue Jul 11 01:39:20 2017 -0400
Committer: Wes McKinney <we...@twosigma.com>
Committed: Tue Jul 11 01:39:20 2017 -0400

----------------------------------------------------------------------
 c_glib/arrow-glib/array.cpp                 |   9 +-
 c_glib/arrow-glib/record-batch.cpp          |   3 +-
 cpp/CMakeLists.txt                          |   1 -
 cpp/cmake_modules/FindLz4.cmake             |  35 +--
 cpp/cmake_modules/FindZSTD.cmake            |  35 +--
 cpp/src/arrow/CMakeLists.txt                |   1 -
 cpp/src/arrow/api.h                         |   1 -
 cpp/src/arrow/array-test.cc                 |  71 +++--
 cpp/src/arrow/array.cc                      | 375 ++++++++++++++++-------
 cpp/src/arrow/array.h                       | 345 ++++++++++++++++-----
 cpp/src/arrow/builder.cc                    |  13 +-
 cpp/src/arrow/compare.cc                    | 155 +++++++---
 cpp/src/arrow/ipc/feather-test.cc           |  19 +-
 cpp/src/arrow/ipc/feather.cc                |   5 +-
 cpp/src/arrow/ipc/json-internal.cc          |  20 +-
 cpp/src/arrow/ipc/metadata.h                |  37 ++-
 cpp/src/arrow/ipc/reader.cc                 | 204 +++++++++++-
 cpp/src/arrow/ipc/test-common.h             |   8 +-
 cpp/src/arrow/ipc/writer.cc                 |  14 +-
 cpp/src/arrow/loader.cc                     | 297 ------------------
 cpp/src/arrow/loader.h                      | 124 --------
 cpp/src/arrow/pretty_print.cc               |  23 +-
 cpp/src/arrow/pretty_print.h                |   3 +
 cpp/src/arrow/python/pandas_convert.cc      |  41 ++-
 cpp/src/arrow/table.cc                      |  46 ++-
 cpp/src/arrow/table.h                       |  32 +-
 python/doc/source/development.rst           |   2 +-
 python/pyarrow/array.pxi                    |   7 +-
 python/pyarrow/includes/libarrow.pxd        |   4 +
 python/pyarrow/tests/test_convert_pandas.py |  17 +
 30 files changed, 1101 insertions(+), 846 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/arrow/blob/84520711/c_glib/arrow-glib/array.cpp
----------------------------------------------------------------------
diff --git a/c_glib/arrow-glib/array.cpp b/c_glib/arrow-glib/array.cpp
index 92a748d..ab62bba 100644
--- a/c_glib/arrow-glib/array.cpp
+++ b/c_glib/arrow-glib/array.cpp
@@ -38,7 +38,7 @@ garrow_array_get_values_raw(std::shared_ptr<arrow::Array> arrow_array,
   auto arrow_specific_array =
     std::static_pointer_cast<typename arrow::TypeTraits<T>::ArrayType>(arrow_array);
   *length = arrow_specific_array->length();
-  return arrow_specific_array->raw_data();
+  return arrow_specific_array->raw_values();
 };
 
 G_BEGIN_DECLS
@@ -490,7 +490,7 @@ garrow_primitive_array_get_buffer(GArrowPrimitiveArray *array)
   auto arrow_array = garrow_array_get_raw(GARROW_ARRAY(array));
   auto arrow_primitive_array =
     static_cast<arrow::PrimitiveArray *>(arrow_array.get());
-  auto arrow_data = arrow_primitive_array->data();
+  auto arrow_data = arrow_primitive_array->values();
   return garrow_buffer_new_raw(&arrow_data);
 }
 
@@ -1425,7 +1425,7 @@ garrow_binary_array_get_buffer(GArrowBinaryArray *array)
   auto arrow_array = garrow_array_get_raw(GARROW_ARRAY(array));
   auto arrow_binary_array =
     static_cast<arrow::BinaryArray *>(arrow_array.get());
-  auto arrow_data = arrow_binary_array->data();
+  auto arrow_data = arrow_binary_array->value_data();
   return garrow_buffer_new_raw(&arrow_data);
 }
 
@@ -1681,7 +1681,8 @@ garrow_struct_array_get_fields(GArrowStructArray *array)
     static_cast<const arrow::StructArray *>(arrow_array.get());
 
   GList *fields = NULL;
-  for (auto arrow_field : arrow_struct_array->fields()) {
+  for (int i = 0; i < arrow_struct_array->num_fields(); ++i) {
+    auto arrow_field = arrow_struct_array->field(i);
     GArrowArray *field = garrow_array_new_raw(&arrow_field);
     fields = g_list_prepend(fields, field);
   }

http://git-wip-us.apache.org/repos/asf/arrow/blob/84520711/c_glib/arrow-glib/record-batch.cpp
----------------------------------------------------------------------
diff --git a/c_glib/arrow-glib/record-batch.cpp b/c_glib/arrow-glib/record-batch.cpp
index cd030de..f381af0 100644
--- a/c_glib/arrow-glib/record-batch.cpp
+++ b/c_glib/arrow-glib/record-batch.cpp
@@ -219,7 +219,8 @@ garrow_record_batch_get_columns(GArrowRecordBatch *record_batch)
   const auto arrow_record_batch = garrow_record_batch_get_raw(record_batch);
 
   GList *columns = NULL;
-  for (auto arrow_column : arrow_record_batch->columns()) {
+  for (int i = 0; i < arrow_record_batch->num_columns(); ++i) {
+    auto arrow_column = arrow_record_batch->column(i);
     GArrowArray *column = garrow_array_new_raw(&arrow_column);
     columns = g_list_prepend(columns, column);
   }

http://git-wip-us.apache.org/repos/asf/arrow/blob/84520711/cpp/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt
index 002a07e..e67c7f6 100644
--- a/cpp/CMakeLists.txt
+++ b/cpp/CMakeLists.txt
@@ -650,7 +650,6 @@ set(ARROW_SRCS
   src/arrow/buffer.cc
   src/arrow/builder.cc
   src/arrow/compare.cc
-  src/arrow/loader.cc
   src/arrow/memory_pool.cc
   src/arrow/pretty_print.cc
   src/arrow/status.cc

http://git-wip-us.apache.org/repos/asf/arrow/blob/84520711/cpp/cmake_modules/FindLz4.cmake
----------------------------------------------------------------------
diff --git a/cpp/cmake_modules/FindLz4.cmake b/cpp/cmake_modules/FindLz4.cmake
index e25b013..07707cf 100644
--- a/cpp/cmake_modules/FindLz4.cmake
+++ b/cpp/cmake_modules/FindLz4.cmake
@@ -39,32 +39,15 @@ set(LZ4_STATIC_LIB_SUFFIX
 set(LZ4_STATIC_LIB_NAME
   ${CMAKE_STATIC_LIBRARY_PREFIX}lz4${LZ4_STATIC_LIB_SUFFIX})
 
-if ( _lz4_roots )
-  find_path(LZ4_INCLUDE_DIR NAMES lz4.h
-    PATHS ${_lz4_roots}
-    NO_DEFAULT_PATH
-    PATH_SUFFIXES "include" )
-  find_library(LZ4_SHARED_LIB NAMES lz4
-    PATHS ${_lz4_roots}
-    NO_DEFAULT_PATH
-    PATH_SUFFIXES "lib" )
-  find_library(LZ4_STATIC_LIB NAMES ${LZ4_STATIC_LIB_NAME}
-    PATHS ${_lz4_roots}
-    NO_DEFAULT_PATH
-    PATH_SUFFIXES "lib" )
-else()
-  find_path(LZ4_INCLUDE_DIR lz4.h
-    # make sure we don't accidentally pick up a different version
-    NO_CMAKE_SYSTEM_PATH
-    NO_SYSTEM_ENVIRONMENT_PATH)
-  find_library(LZ4_SHARED_LIB lz4
-    NO_CMAKE_SYSTEM_PATH
-    NO_SYSTEM_ENVIRONMENT_PATH)
-  find_library(LZ4_STATIC_LIB ${LZ4_STATIC_LIB_NAME}
-    NO_CMAKE_SYSTEM_PATH
-    NO_SYSTEM_ENVIRONMENT_PATH)
-endif()
+find_path(LZ4_INCLUDE_DIR NAMES lz4.h
+  PATHS ${_lz4_roots}
+  NO_DEFAULT_PATH
+  PATH_SUFFIXES "include" )
+find_library(LZ4_STATIC_LIB NAMES ${LZ4_STATIC_LIB_NAME} lib${LZ4_STATIC_LIB_NAME}
+  PATHS ${_lz4_roots}
+  NO_DEFAULT_PATH
+  PATH_SUFFIXES "lib" )
 
 include(FindPackageHandleStandardArgs)
 find_package_handle_standard_args(LZ4 REQUIRED_VARS
-  LZ4_SHARED_LIB LZ4_STATIC_LIB LZ4_INCLUDE_DIR)
+  LZ4_STATIC_LIB LZ4_INCLUDE_DIR)

http://git-wip-us.apache.org/repos/asf/arrow/blob/84520711/cpp/cmake_modules/FindZSTD.cmake
----------------------------------------------------------------------
diff --git a/cpp/cmake_modules/FindZSTD.cmake b/cpp/cmake_modules/FindZSTD.cmake
index 1fda29e..02a0c39 100644
--- a/cpp/cmake_modules/FindZSTD.cmake
+++ b/cpp/cmake_modules/FindZSTD.cmake
@@ -39,32 +39,15 @@ set(ZSTD_STATIC_LIB_SUFFIX
 set(ZSTD_STATIC_LIB_NAME
   ${CMAKE_STATIC_LIBRARY_PREFIX}zstd${ZSTD_STATIC_LIB_SUFFIX})
 
-if ( _zstd_roots )
-  find_path(ZSTD_INCLUDE_DIR NAMES zstd.h
-    PATHS ${_zstd_roots}
-    NO_DEFAULT_PATH
-    PATH_SUFFIXES "include" )
-  find_library(ZSTD_SHARED_LIB NAMES zstd
-    PATHS ${_zstd_roots}
-    NO_DEFAULT_PATH
-    PATH_SUFFIXES "lib" )
-  find_library(ZSTD_STATIC_LIB NAMES ${ZSTD_STATIC_LIB_NAME}
-    PATHS ${_zstd_roots}
-    NO_DEFAULT_PATH
-    PATH_SUFFIXES "lib" )
-else()
-  find_path(ZSTD_INCLUDE_DIR zstd.h
-    # make sure we don't accidentally pick up a different version
-    NO_CMAKE_SYSTEM_PATH
-    NO_SYSTEM_ENVIRONMENT_PATH)
-  find_library(ZSTD_SHARED_LIB zstd
-    NO_CMAKE_SYSTEM_PATH
-    NO_SYSTEM_ENVIRONMENT_PATH)
-  find_library(ZSTD_STATIC_LIB ${ZSTD_STATIC_LIB_NAME}
-    NO_CMAKE_SYSTEM_PATH
-    NO_SYSTEM_ENVIRONMENT_PATH)
-endif()
+find_path(ZSTD_INCLUDE_DIR NAMES zstd.h
+  PATHS ${_zstd_roots}
+  NO_DEFAULT_PATH
+  PATH_SUFFIXES "include" )
+find_library(ZSTD_STATIC_LIB NAMES ${ZSTD_STATIC_LIB_NAME} lib${ZSTD_STATIC_LIB_NAME}
+  PATHS ${_zstd_roots}
+  NO_DEFAULT_PATH
+  PATH_SUFFIXES "lib" )
 
 include(FindPackageHandleStandardArgs)
 find_package_handle_standard_args(ZSTD REQUIRED_VARS
-  ZSTD_SHARED_LIB ZSTD_STATIC_LIB ZSTD_INCLUDE_DIR)
+  ZSTD_STATIC_LIB ZSTD_INCLUDE_DIR)

http://git-wip-us.apache.org/repos/asf/arrow/blob/84520711/cpp/src/arrow/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt
index cb5282c..55fab2d 100644
--- a/cpp/src/arrow/CMakeLists.txt
+++ b/cpp/src/arrow/CMakeLists.txt
@@ -23,7 +23,6 @@ install(FILES
   buffer.h
   builder.h
   compare.h
-  loader.h
   memory_pool.h
   pretty_print.h
   status.h

http://git-wip-us.apache.org/repos/asf/arrow/blob/84520711/cpp/src/arrow/api.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/api.h b/cpp/src/arrow/api.h
index aa0da75..731f239 100644
--- a/cpp/src/arrow/api.h
+++ b/cpp/src/arrow/api.h
@@ -24,7 +24,6 @@
 #include "arrow/buffer.h"
 #include "arrow/builder.h"
 #include "arrow/compare.h"
-#include "arrow/loader.h"
 #include "arrow/memory_pool.h"
 #include "arrow/pretty_print.h"
 #include "arrow/status.h"

http://git-wip-us.apache.org/repos/asf/arrow/blob/84520711/cpp/src/arrow/array-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/array-test.cc b/cpp/src/arrow/array-test.cc
index 7ae03cf..bfdb923 100644
--- a/cpp/src/arrow/array-test.cc
+++ b/cpp/src/arrow/array-test.cc
@@ -351,7 +351,7 @@ void TestPrimitiveBuilder<PBoolean>::Check(
 
   for (int64_t i = 0; i < result->length(); ++i) {
     if (nullable) { ASSERT_EQ(valid_bytes_[i] == 0, result->IsNull(i)) << i; }
-    bool actual = BitUtil::GetBit(result->data()->data(), i);
+    bool actual = BitUtil::GetBit(result->values()->data(), i);
     ASSERT_EQ(draws_[i] != 0, actual) << i;
   }
   ASSERT_TRUE(result->Equals(*expected));
@@ -778,8 +778,8 @@ TEST_F(TestStringArray, CompareNullByteSlots) {
 
   // The validity bitmaps are the same, the data is different, but the unequal
   // portion is masked out
-  StringArray equal_array(3, a1.value_offsets(), a1.data(), a2.null_bitmap(), 1);
-  StringArray equal_array2(3, a3.value_offsets(), a3.data(), a2.null_bitmap(), 1);
+  StringArray equal_array(3, a1.value_offsets(), a1.value_data(), a2.null_bitmap(), 1);
+  StringArray equal_array2(3, a3.value_offsets(), a3.value_data(), a2.null_bitmap(), 1);
 
   ASSERT_TRUE(equal_array.Equals(equal_array2));
   ASSERT_TRUE(a2.RangeEquals(equal_array2, 0, 3, 0));
@@ -846,7 +846,7 @@ TEST_F(TestStringBuilder, TestScalarAppend) {
 
   ASSERT_EQ(reps * N, result_->length());
   ASSERT_EQ(reps, result_->null_count());
-  ASSERT_EQ(reps * 6, result_->data()->size());
+  ASSERT_EQ(reps * 6, result_->value_data()->size());
 
   int32_t length;
   int32_t pos = 0;
@@ -1011,7 +1011,7 @@ TEST_F(TestBinaryBuilder, TestScalarAppend) {
   ASSERT_OK(ValidateArray(*result_));
   ASSERT_EQ(reps * N, result_->length());
   ASSERT_EQ(reps, result_->null_count());
-  ASSERT_EQ(reps * 6, result_->data()->size());
+  ASSERT_EQ(reps * 6, result_->value_data()->size());
 
   int32_t length;
   for (int i = 0; i < N * reps; ++i) {
@@ -1200,8 +1200,8 @@ TEST_F(TestFWBinaryArray, EqualsRangeEquals) {
   const auto& a1 = static_cast<const FixedSizeBinaryArray&>(*array1);
   const auto& a2 = static_cast<const FixedSizeBinaryArray&>(*array2);
 
-  FixedSizeBinaryArray equal1(type, 2, a1.data(), a1.null_bitmap(), 1);
-  FixedSizeBinaryArray equal2(type, 2, a2.data(), a1.null_bitmap(), 1);
+  FixedSizeBinaryArray equal1(type, 2, a1.values(), a1.null_bitmap(), 1);
+  FixedSizeBinaryArray equal2(type, 2, a2.values(), a1.null_bitmap(), 1);
 
   ASSERT_TRUE(equal1.Equals(equal2));
   ASSERT_TRUE(equal1.RangeEquals(equal2, 0, 2, 0));
@@ -1224,7 +1224,7 @@ TEST_F(TestFWBinaryArray, ZeroSize) {
   const auto& fw_array = static_cast<const FixedSizeBinaryArray&>(*array);
 
   // data is never allocated
-  ASSERT_TRUE(fw_array.data() == nullptr);
+  ASSERT_TRUE(fw_array.values() == nullptr);
   ASSERT_EQ(0, fw_array.byte_width());
 
   ASSERT_EQ(6, array->length());
@@ -1524,8 +1524,7 @@ TYPED_TEST(TestDictionaryBuilder, Basic) {
   ASSERT_OK(dict_builder.Append(static_cast<typename TypeParam::c_type>(2)));
   std::shared_ptr<Array> dict_array;
   ASSERT_OK(dict_builder.Finish(&dict_array));
-  auto dtype =
-      std::make_shared<DictionaryType>(std::make_shared<TypeParam>(), dict_array);
+  auto dtype = std::make_shared<DictionaryType>(uint8(), dict_array);
 
   UInt8Builder int_builder(default_memory_pool());
   ASSERT_OK(int_builder.Append(0));
@@ -1558,8 +1557,7 @@ TYPED_TEST(TestDictionaryBuilder, ArrayConversion) {
   ASSERT_OK(dict_builder.Append(static_cast<typename TypeParam::c_type>(2)));
   std::shared_ptr<Array> dict_array;
   ASSERT_OK(dict_builder.Finish(&dict_array));
-  auto dtype =
-      std::make_shared<DictionaryType>(std::make_shared<TypeParam>(), dict_array);
+  auto dtype = std::make_shared<DictionaryType>(uint8(), dict_array);
 
   UInt8Builder int_builder(default_memory_pool());
   ASSERT_OK(int_builder.Append(0));
@@ -1601,8 +1599,7 @@ TYPED_TEST(TestDictionaryBuilder, DoubleTableSize) {
     // Finalize expected data
     std::shared_ptr<Array> dict_array;
     ASSERT_OK(dict_builder.Finish(&dict_array));
-    auto dtype =
-        std::make_shared<DictionaryType>(std::make_shared<TypeParam>(), dict_array);
+    auto dtype = std::make_shared<DictionaryType>(uint16(), dict_array);
     std::shared_ptr<Array> int_array;
     ASSERT_OK(int_builder.Finish(&int_array));
 
@@ -1627,7 +1624,7 @@ TEST(TestStringDictionaryBuilder, Basic) {
   ASSERT_OK(str_builder.Append("test2"));
   std::shared_ptr<Array> str_array;
   ASSERT_OK(str_builder.Finish(&str_array));
-  auto dtype = std::make_shared<DictionaryType>(utf8(), str_array);
+  auto dtype = std::make_shared<DictionaryType>(uint8(), str_array);
 
   UInt8Builder int_builder(default_memory_pool());
   ASSERT_OK(int_builder.Append(0));
@@ -1668,7 +1665,7 @@ TEST(TestStringDictionaryBuilder, DoubleTableSize) {
   // Finalize expected data
   std::shared_ptr<Array> str_array;
   ASSERT_OK(str_builder.Finish(&str_array));
-  auto dtype = std::make_shared<DictionaryType>(utf8(), str_array);
+  auto dtype = std::make_shared<DictionaryType>(uint16(), str_array);
   std::shared_ptr<Array> int_array;
   ASSERT_OK(int_builder.Finish(&int_array));
 
@@ -1781,7 +1778,7 @@ TEST_F(TestListBuilder, TestAppendNull) {
   ASSERT_EQ(0, result_->value_offset(1));
   ASSERT_EQ(0, result_->value_offset(2));
 
-  Int32Array* values = static_cast<Int32Array*>(result_->values().get());
+  auto values = result_->values();
   ASSERT_EQ(0, values->length());
 }
 
@@ -1802,7 +1799,7 @@ void ValidateBasicListArray(const ListArray* result, const vector<int32_t>& valu
   }
 
   ASSERT_EQ(7, result->values()->length());
-  Int32Array* varr = static_cast<Int32Array*>(result->values().get());
+  auto varr = std::dynamic_pointer_cast<Int32Array>(result->values());
 
   for (size_t i = 0; i < values.size(); ++i) {
     ASSERT_EQ(values[i], varr->Value(i));
@@ -1972,25 +1969,27 @@ TEST(TestDictionary, Validate) {
   std::shared_ptr<DataType> dict_type = dictionary(int16(), dict);
 
   std::shared_ptr<Array> indices;
-  vector<uint8_t> indices_values = {1, 2, 0, 0, 2, 0};
-  ArrayFromVector<UInt8Type, uint8_t>(is_valid, indices_values, &indices);
-
-  std::shared_ptr<Array> indices2;
-  vector<float> indices2_values = {1., 2., 0., 0., 2., 0.};
-  ArrayFromVector<FloatType, float>(is_valid, indices2_values, &indices2);
-
-  std::shared_ptr<Array> indices3;
-  vector<int64_t> indices3_values = {1, 2, 0, 0, 2, 0};
-  ArrayFromVector<Int64Type, int64_t>(is_valid, indices3_values, &indices3);
+  vector<int16_t> indices_values = {1, 2, 0, 0, 2, 0};
+  ArrayFromVector<Int16Type, int16_t>(is_valid, indices_values, &indices);
 
   std::shared_ptr<Array> arr = std::make_shared<DictionaryArray>(dict_type, indices);
-  std::shared_ptr<Array> arr2 = std::make_shared<DictionaryArray>(dict_type, indices2);
-  std::shared_ptr<Array> arr3 = std::make_shared<DictionaryArray>(dict_type, indices3);
 
   // Only checking index type for now
   ASSERT_OK(ValidateArray(*arr));
-  ASSERT_RAISES(Invalid, ValidateArray(*arr2));
-  ASSERT_OK(ValidateArray(*arr3));
+
+  // TODO(wesm) In ARROW-1199, there is now a DCHECK to compare the indices
+  // type with the dict_type. How can we test for this?
+
+  // std::shared_ptr<Array> indices2;
+  // vector<float> indices2_values = {1., 2., 0., 0., 2., 0.};
+  // ArrayFromVector<FloatType, float>(is_valid, indices2_values, &indices2);
+
+  // std::shared_ptr<Array> indices3;
+  // vector<int64_t> indices3_values = {1, 2, 0, 0, 2, 0};
+  // ArrayFromVector<Int64Type, int64_t>(is_valid, indices3_values, &indices3);
+  // std::shared_ptr<Array> arr2 = std::make_shared<DictionaryArray>(dict_type, indices2);
+  // std::shared_ptr<Array> arr3 = std::make_shared<DictionaryArray>(dict_type, indices3);
+  // ASSERT_OK(ValidateArray(*arr3));
 }
 
 // ----------------------------------------------------------------------
@@ -2003,9 +2002,9 @@ void ValidateBasicStructArray(const StructArray* result,
   ASSERT_EQ(4, result->length());
   ASSERT_OK(ValidateArray(*result));
 
-  auto list_char_arr = static_cast<ListArray*>(result->field(0).get());
-  auto char_arr = static_cast<Int8Array*>(list_char_arr->values().get());
-  auto int32_arr = static_cast<Int32Array*>(result->field(1).get());
+  auto list_char_arr = std::dynamic_pointer_cast<ListArray>(result->field(0));
+  auto char_arr = std::dynamic_pointer_cast<Int8Array>(list_char_arr->values());
+  auto int32_arr = std::dynamic_pointer_cast<Int32Array>(result->field(1));
 
   ASSERT_EQ(0, result->null_count());
   ASSERT_EQ(1, list_char_arr->null_count());
@@ -2086,7 +2085,7 @@ TEST_F(TestStructBuilder, TestAppendNull) {
 
   ASSERT_OK(ValidateArray(*result_));
 
-  ASSERT_EQ(2, static_cast<int>(result_->fields().size()));
+  ASSERT_EQ(2, static_cast<int>(result_->num_fields()));
   ASSERT_EQ(2, result_->length());
   ASSERT_EQ(2, result_->field(0)->length());
   ASSERT_EQ(2, result_->field(1)->length());

http://git-wip-us.apache.org/repos/asf/arrow/blob/84520711/cpp/src/arrow/array.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/array.cc b/cpp/src/arrow/array.cc
index c5acf3e..48a3bd5 100644
--- a/cpp/src/arrow/array.cc
+++ b/cpp/src/arrow/array.cc
@@ -24,46 +24,34 @@
 
 #include "arrow/buffer.h"
 #include "arrow/compare.h"
+#include "arrow/pretty_print.h"
 #include "arrow/status.h"
 #include "arrow/type_traits.h"
 #include "arrow/util/bit-util.h"
 #include "arrow/util/decimal.h"
 #include "arrow/util/logging.h"
+#include "arrow/util/macros.h"
 #include "arrow/visitor.h"
 #include "arrow/visitor_inline.h"
 
 namespace arrow {
 
-// When slicing, we do not know the null count of the sliced range without
-// doing some computation. To avoid doing this eagerly, we set the null count
-// to -1 (any negative number will do). When Array::null_count is called the
-// first time, the null count will be computed. See ARROW-33
-constexpr int64_t kUnknownNullCount = -1;
+using internal::ArrayData;
 
 // ----------------------------------------------------------------------
 // Base array class
 
-Array::Array(const std::shared_ptr<DataType>& type, int64_t length,
-    const std::shared_ptr<Buffer>& null_bitmap, int64_t null_count, int64_t offset)
-    : type_(type),
-      length_(length),
-      offset_(offset),
-      null_count_(null_count),
-      null_bitmap_(null_bitmap),
-      null_bitmap_data_(nullptr) {
-  if (null_count_ == 0) { null_bitmap_ = nullptr; }
-  if (null_bitmap_) { null_bitmap_data_ = null_bitmap_->data(); }
-}
-
 int64_t Array::null_count() const {
-  if (null_count_ < 0) {
-    if (null_bitmap_) {
-      null_count_ = length_ - CountSetBits(null_bitmap_data_, offset_, length_);
+  if (ARROW_PREDICT_FALSE(data_->null_count < 0)) {
+    if (data_->buffers[0]) {
+      data_->null_count =
+          data_->length - CountSetBits(null_bitmap_data_, data_->offset, data_->length);
+
     } else {
-      null_count_ = 0;
+      data_->null_count = 0;
     }
   }
-  return null_count_;
+  return data_->null_count;
 }
 
 bool Array::Equals(const Array& arr) const {
@@ -115,15 +103,34 @@ static inline void ConformSliceParams(
 }
 
 std::shared_ptr<Array> Array::Slice(int64_t offset) const {
-  int64_t slice_length = length_ - offset;
+  int64_t slice_length = data_->length - offset;
   return Slice(offset, slice_length);
 }
 
-NullArray::NullArray(int64_t length) : Array(null(), length, nullptr, length) {}
+std::ostream& operator<<(std::ostream& os, const Array& x) {
+  DCHECK(PrettyPrint(x, 0, &os).ok());
+  return os;
+}
+
+static inline std::shared_ptr<ArrayData> SliceData(
+    const ArrayData& data, int64_t offset, int64_t length) {
+  ConformSliceParams(data.offset, data.length, &offset, &length);
+
+  auto new_data = data.ShallowCopy();
+  new_data->length = length;
+  new_data->offset = offset;
+  new_data->null_count = kUnknownNullCount;
+  return new_data;
+}
+
+NullArray::NullArray(int64_t length) {
+  BufferVector buffers = {nullptr};
+  SetData(std::make_shared<ArrayData>(null(), length, std::move(buffers), length));
+}
 
 std::shared_ptr<Array> NullArray::Slice(int64_t offset, int64_t length) const {
-  DCHECK_LE(offset, length_);
-  length = std::min(length_ - offset, length);
+  DCHECK_LE(offset, data_->length);
+  length = std::min(data_->length - offset, length);
   return std::make_shared<NullArray>(length);
 }
 
@@ -132,40 +139,78 @@ std::shared_ptr<Array> NullArray::Slice(int64_t offset, int64_t length) const {
 
 PrimitiveArray::PrimitiveArray(const std::shared_ptr<DataType>& type, int64_t length,
     const std::shared_ptr<Buffer>& data, const std::shared_ptr<Buffer>& null_bitmap,
-    int64_t null_count, int64_t offset)
-    : Array(type, length, null_bitmap, null_count, offset) {
-  data_ = data;
-  raw_data_ = data == nullptr ? nullptr : data_->data();
+    int64_t null_count, int64_t offset) {
+  BufferVector buffers = {null_bitmap, data};
+  SetData(
+      std::make_shared<ArrayData>(type, length, std::move(buffers), null_count, offset));
+}
+
+template <typename T>
+NumericArray<T>::NumericArray(const std::shared_ptr<internal::ArrayData>& data)
+    : PrimitiveArray(data) {
+  DCHECK_EQ(data->type->id(), T::type_id);
 }
 
 template <typename T>
 std::shared_ptr<Array> NumericArray<T>::Slice(int64_t offset, int64_t length) const {
-  ConformSliceParams(offset_, length_, &offset, &length);
-  return std::make_shared<NumericArray<T>>(
-      type_, length, data_, null_bitmap_, kUnknownNullCount, offset);
+  return std::make_shared<NumericArray<T>>(SliceData(*data_, offset, length));
 }
 
 // ----------------------------------------------------------------------
 // BooleanArray
 
+BooleanArray::BooleanArray(const std::shared_ptr<internal::ArrayData>& data)
+    : PrimitiveArray(data) {
+  DCHECK_EQ(data->type->id(), Type::BOOL);
+}
+
 BooleanArray::BooleanArray(int64_t length, const std::shared_ptr<Buffer>& data,
     const std::shared_ptr<Buffer>& null_bitmap, int64_t null_count, int64_t offset)
-    : PrimitiveArray(std::make_shared<BooleanType>(), length, data, null_bitmap,
-          null_count, offset) {}
+    : PrimitiveArray(boolean(), length, data, null_bitmap, null_count, offset) {}
 
 std::shared_ptr<Array> BooleanArray::Slice(int64_t offset, int64_t length) const {
-  ConformSliceParams(offset_, length_, &offset, &length);
-  return std::make_shared<BooleanArray>(
-      length, data_, null_bitmap_, kUnknownNullCount, offset);
+  return std::make_shared<BooleanArray>(SliceData(*data_, offset, length));
 }
 
 // ----------------------------------------------------------------------
 // ListArray
 
+ListArray::ListArray(const std::shared_ptr<ArrayData>& data) {
+  DCHECK_EQ(data->type->id(), Type::LIST);
+  SetData(data);
+}
+
+ListArray::ListArray(const std::shared_ptr<DataType>& type, int64_t length,
+    const std::shared_ptr<Buffer>& value_offsets, const std::shared_ptr<Array>& values,
+    const std::shared_ptr<Buffer>& null_bitmap, int64_t null_count, int64_t offset) {
+  BufferVector buffers = {null_bitmap, value_offsets};
+  auto internal_data =
+      std::make_shared<ArrayData>(type, length, std::move(buffers), null_count, offset);
+  internal_data->child_data.emplace_back(values->data());
+  SetData(internal_data);
+}
+
+void ListArray::SetData(const std::shared_ptr<ArrayData>& data) {
+  this->Array::SetData(data);
+  auto value_offsets = data->buffers[1];
+  raw_value_offsets_ = value_offsets == nullptr
+                           ? nullptr
+                           : reinterpret_cast<const int32_t*>(value_offsets->data());
+  DCHECK(internal::MakeArray(data_->child_data[0], &values_).ok());
+}
+
+std::shared_ptr<DataType> ListArray::value_type() const {
+  return static_cast<const ListType&>(*type()).value_type();
+}
+
+std::shared_ptr<Array> ListArray::values() const {
+  return values_;
+}
+
 std::shared_ptr<Array> ListArray::Slice(int64_t offset, int64_t length) const {
-  ConformSliceParams(offset_, length_, &offset, &length);
-  return std::make_shared<ListArray>(
-      type_, length, value_offsets_, values_, null_bitmap_, kUnknownNullCount, offset);
+  ConformSliceParams(data_->offset, data_->length, &offset, &length);
+  return std::make_shared<ListArray>(type(), length, value_offsets(), values(),
+      null_bitmap(), kUnknownNullCount, offset);
 }
 
 // ----------------------------------------------------------------------
@@ -174,6 +219,21 @@ std::shared_ptr<Array> ListArray::Slice(int64_t offset, int64_t length) const {
 static std::shared_ptr<DataType> kBinary = std::make_shared<BinaryType>();
 static std::shared_ptr<DataType> kString = std::make_shared<StringType>();
 
+BinaryArray::BinaryArray(const std::shared_ptr<internal::ArrayData>& data) {
+  DCHECK_EQ(data->type->id(), Type::BINARY);
+  SetData(data);
+}
+
+void BinaryArray::SetData(const std::shared_ptr<ArrayData>& data) {
+  auto value_offsets = data->buffers[1];
+  auto value_data = data->buffers[2];
+  this->Array::SetData(data);
+  raw_data_ = value_data == nullptr ? nullptr : value_data->data();
+  raw_value_offsets_ = value_offsets == nullptr
+                           ? nullptr
+                           : reinterpret_cast<const int32_t*>(value_offsets->data());
+}
+
 BinaryArray::BinaryArray(int64_t length, const std::shared_ptr<Buffer>& value_offsets,
     const std::shared_ptr<Buffer>& data, const std::shared_ptr<Buffer>& null_bitmap,
     int64_t null_count, int64_t offset)
@@ -182,22 +242,19 @@ BinaryArray::BinaryArray(int64_t length, const std::shared_ptr<Buffer>& value_of
 
 BinaryArray::BinaryArray(const std::shared_ptr<DataType>& type, int64_t length,
     const std::shared_ptr<Buffer>& value_offsets, const std::shared_ptr<Buffer>& data,
-    const std::shared_ptr<Buffer>& null_bitmap, int64_t null_count, int64_t offset)
-    : Array(type, length, null_bitmap, null_count, offset),
-      value_offsets_(value_offsets),
-      raw_value_offsets_(nullptr),
-      data_(data),
-      raw_data_(nullptr) {
-  if (value_offsets_ != nullptr) {
-    raw_value_offsets_ = reinterpret_cast<const int32_t*>(value_offsets_->data());
-  }
-  if (data_ != nullptr) { raw_data_ = data_->data(); }
+    const std::shared_ptr<Buffer>& null_bitmap, int64_t null_count, int64_t offset) {
+  BufferVector buffers = {null_bitmap, value_offsets, data};
+  SetData(
+      std::make_shared<ArrayData>(type, length, std::move(buffers), null_count, offset));
 }
 
 std::shared_ptr<Array> BinaryArray::Slice(int64_t offset, int64_t length) const {
-  ConformSliceParams(offset_, length_, &offset, &length);
-  return std::make_shared<BinaryArray>(
-      length, value_offsets_, data_, null_bitmap_, kUnknownNullCount, offset);
+  return std::make_shared<BinaryArray>(SliceData(*data_, offset, length));
+}
+
+StringArray::StringArray(const std::shared_ptr<internal::ArrayData>& data) {
+  DCHECK_EQ(data->type->id(), Type::STRING);
+  SetData(data);
 }
 
 StringArray::StringArray(int64_t length, const std::shared_ptr<Buffer>& value_offsets,
@@ -207,14 +264,18 @@ StringArray::StringArray(int64_t length, const std::shared_ptr<Buffer>& value_of
 }
 
 std::shared_ptr<Array> StringArray::Slice(int64_t offset, int64_t length) const {
-  ConformSliceParams(offset_, length_, &offset, &length);
-  return std::make_shared<StringArray>(
-      length, value_offsets_, data_, null_bitmap_, kUnknownNullCount, offset);
+  return std::make_shared<StringArray>(SliceData(*data_, offset, length));
 }
 
 // ----------------------------------------------------------------------
 // Fixed width binary
 
+FixedSizeBinaryArray::FixedSizeBinaryArray(
+    const std::shared_ptr<internal::ArrayData>& data) {
+  DCHECK_EQ(data->type->id(), Type::FIXED_SIZE_BINARY);
+  SetData(data);
+}
+
 FixedSizeBinaryArray::FixedSizeBinaryArray(const std::shared_ptr<DataType>& type,
     int64_t length, const std::shared_ptr<Buffer>& data,
     const std::shared_ptr<Buffer>& null_bitmap, int64_t null_count, int64_t offset)
@@ -222,34 +283,52 @@ FixedSizeBinaryArray::FixedSizeBinaryArray(const std::shared_ptr<DataType>& type
       byte_width_(static_cast<const FixedSizeBinaryType&>(*type).byte_width()) {}
 
 std::shared_ptr<Array> FixedSizeBinaryArray::Slice(int64_t offset, int64_t length) const {
-  ConformSliceParams(offset_, length_, &offset, &length);
-  return std::make_shared<FixedSizeBinaryArray>(
-      type_, length, data_, null_bitmap_, kUnknownNullCount, offset);
+  return std::make_shared<FixedSizeBinaryArray>(SliceData(*data_, offset, length));
 }
 
 const uint8_t* FixedSizeBinaryArray::GetValue(int64_t i) const {
-  return raw_data_ + (i + offset_) * byte_width_;
+  return raw_values_ + (i + data_->offset) * byte_width_;
 }
 
 // ----------------------------------------------------------------------
 // Decimal
+
+DecimalArray::DecimalArray(const std::shared_ptr<internal::ArrayData>& data) {
+  DCHECK_EQ(data->type->id(), Type::DECIMAL);
+  SetData(data);
+}
+
+void DecimalArray::SetData(const std::shared_ptr<ArrayData>& data) {
+  auto fixed_size_data = data->buffers[1];
+  auto sign_bitmap = data->buffers[2];
+  this->Array::SetData(data);
+
+  raw_values_ = fixed_size_data != nullptr ? fixed_size_data->data() : nullptr;
+  sign_bitmap_data_ = sign_bitmap != nullptr ? sign_bitmap->data() : nullptr;
+}
+
 DecimalArray::DecimalArray(const std::shared_ptr<DataType>& type, int64_t length,
     const std::shared_ptr<Buffer>& data, const std::shared_ptr<Buffer>& null_bitmap,
-    int64_t null_count, int64_t offset, const std::shared_ptr<Buffer>& sign_bitmap)
-    : FixedSizeBinaryArray(type, length, data, null_bitmap, null_count, offset),
-      sign_bitmap_(sign_bitmap),
-      sign_bitmap_data_(sign_bitmap != nullptr ? sign_bitmap->data() : nullptr) {}
+    int64_t null_count, int64_t offset, const std::shared_ptr<Buffer>& sign_bitmap) {
+  BufferVector buffers = {null_bitmap, data, sign_bitmap};
+  SetData(
+      std::make_shared<ArrayData>(type, length, std::move(buffers), null_count, offset));
+}
 
 bool DecimalArray::IsNegative(int64_t i) const {
   return sign_bitmap_data_ != nullptr ? BitUtil::GetBit(sign_bitmap_data_, i) : false;
 }
 
+const uint8_t* DecimalArray::GetValue(int64_t i) const {
+  return raw_values_ + (i + data_->offset) * byte_width();
+}
+
 std::string DecimalArray::FormatValue(int64_t i) const {
-  const auto type_ = std::dynamic_pointer_cast<DecimalType>(type());
-  const int precision = type_->precision();
-  const int scale = type_->scale();
-  const int byte_width = byte_width_;
-  const uint8_t* bytes = GetValue(i);
+  const auto& type_ = static_cast<const DecimalType&>(*type());
+  const int precision = type_.precision();
+  const int scale = type_.scale();
+  const int byte_width = type_.byte_width();
+  const uint8_t* bytes = raw_values_ + (i + data_->offset) * byte_width;
   switch (byte_width) {
     case 4: {
       decimal::Decimal32 value;
@@ -274,73 +353,110 @@ std::string DecimalArray::FormatValue(int64_t i) const {
 }
 
 std::shared_ptr<Array> DecimalArray::Slice(int64_t offset, int64_t length) const {
-  ConformSliceParams(offset_, length_, &offset, &length);
-  return std::make_shared<DecimalArray>(
-      type_, length, data_, null_bitmap_, kUnknownNullCount, offset, sign_bitmap_);
+  return std::make_shared<DecimalArray>(SliceData(*data_, offset, length));
 }
 
 // ----------------------------------------------------------------------
 // Struct
 
+StructArray::StructArray(const std::shared_ptr<ArrayData>& data) {
+  DCHECK_EQ(data->type->id(), Type::STRUCT);
+  SetData(data);
+}
+
 StructArray::StructArray(const std::shared_ptr<DataType>& type, int64_t length,
     const std::vector<std::shared_ptr<Array>>& children,
-    std::shared_ptr<Buffer> null_bitmap, int64_t null_count, int64_t offset)
-    : Array(type, length, null_bitmap, null_count, offset) {
-  type_ = type;
-  children_ = children;
+    std::shared_ptr<Buffer> null_bitmap, int64_t null_count, int64_t offset) {
+  BufferVector buffers = {null_bitmap};
+  SetData(
+      std::make_shared<ArrayData>(type, length, std::move(buffers), null_count, offset));
+  for (const auto& child : children) {
+    data_->child_data.push_back(child->data());
+  }
 }
 
 std::shared_ptr<Array> StructArray::field(int pos) const {
-  DCHECK_GT(children_.size(), 0);
-  return children_[pos];
+  std::shared_ptr<Array> result;
+  DCHECK(internal::MakeArray(data_->child_data[pos], &result).ok());
+  return result;
 }
 
 std::shared_ptr<Array> StructArray::Slice(int64_t offset, int64_t length) const {
-  ConformSliceParams(offset_, length_, &offset, &length);
-  return std::make_shared<StructArray>(
-      type_, length, children_, null_bitmap_, kUnknownNullCount, offset);
+  return std::make_shared<StructArray>(SliceData(*data_, offset, length));
 }
 
 // ----------------------------------------------------------------------
 // UnionArray
 
+void UnionArray::SetData(const std::shared_ptr<ArrayData>& data) {
+  this->Array::SetData(data);
+
+  auto type_ids = data_->buffers[1];
+  auto value_offsets = data_->buffers[2];
+  raw_type_ids_ =
+      type_ids == nullptr ? nullptr : reinterpret_cast<const uint8_t*>(type_ids->data());
+  raw_value_offsets_ = value_offsets == nullptr
+                           ? nullptr
+                           : reinterpret_cast<const int32_t*>(value_offsets->data());
+}
+
+UnionArray::UnionArray(const std::shared_ptr<ArrayData>& data) {
+  DCHECK_EQ(data->type->id(), Type::UNION);
+  SetData(data);
+}
+
 UnionArray::UnionArray(const std::shared_ptr<DataType>& type, int64_t length,
     const std::vector<std::shared_ptr<Array>>& children,
     const std::shared_ptr<Buffer>& type_ids, const std::shared_ptr<Buffer>& value_offsets,
-    const std::shared_ptr<Buffer>& null_bitmap, int64_t null_count, int64_t offset)
-    : Array(type, length, null_bitmap, null_count, offset),
-      children_(children),
-      type_ids_(type_ids),
-      raw_type_ids_(nullptr),
-      value_offsets_(value_offsets),
-      raw_value_offsets_(nullptr) {
-  if (type_ids) { raw_type_ids_ = reinterpret_cast<const uint8_t*>(type_ids->data()); }
-  if (value_offsets) {
-    raw_value_offsets_ = reinterpret_cast<const int32_t*>(value_offsets->data());
+    const std::shared_ptr<Buffer>& null_bitmap, int64_t null_count, int64_t offset) {
+  BufferVector buffers = {null_bitmap, type_ids, value_offsets};
+  auto internal_data =
+      std::make_shared<ArrayData>(type, length, std::move(buffers), null_count, offset);
+  for (const auto& child : children) {
+    internal_data->child_data.push_back(child->data());
   }
+  SetData(internal_data);
 }
 
 std::shared_ptr<Array> UnionArray::child(int pos) const {
-  DCHECK_GT(children_.size(), 0);
-  return children_[pos];
+  std::shared_ptr<Array> result;
+  DCHECK(internal::MakeArray(data_->child_data[pos], &result).ok());
+  return result;
 }
 
 std::shared_ptr<Array> UnionArray::Slice(int64_t offset, int64_t length) const {
-  ConformSliceParams(offset_, length_, &offset, &length);
-  return std::make_shared<UnionArray>(type_, length, children_, type_ids_, value_offsets_,
-      null_bitmap_, kUnknownNullCount, offset);
+  return std::make_shared<UnionArray>(SliceData(*data_, offset, length));
 }
 
 // ----------------------------------------------------------------------
 // DictionaryArray
 
+DictionaryArray::DictionaryArray(const std::shared_ptr<ArrayData>& data)
+    : dict_type_(static_cast<const DictionaryType*>(data->type.get())) {
+  DCHECK_EQ(data->type->id(), Type::DICTIONARY);
+  SetData(data);
+}
+
 DictionaryArray::DictionaryArray(
     const std::shared_ptr<DataType>& type, const std::shared_ptr<Array>& indices)
-    : Array(type, indices->length(), indices->null_bitmap(), indices->null_count(),
-          indices->offset()),
-      dict_type_(static_cast<const DictionaryType*>(type.get())),
-      indices_(indices) {
+    : dict_type_(static_cast<const DictionaryType*>(type.get())) {
   DCHECK_EQ(type->id(), Type::DICTIONARY);
+  DCHECK_EQ(indices->type_id(), dict_type_->index_type()->id());
+  auto data = indices->data()->ShallowCopy();
+  data->type = type;
+  SetData(data);
+}
+
+void DictionaryArray::SetData(const std::shared_ptr<ArrayData>& data) {
+  this->Array::SetData(data);
+  auto indices_data = data_->ShallowCopy();
+  indices_data->type = dict_type_->index_type();
+  std::shared_ptr<Array> result;
+  DCHECK(internal::MakeArray(indices_data, &indices_).ok());
+}
+
+std::shared_ptr<Array> DictionaryArray::indices() const {
+  return indices_;
 }
 
 std::shared_ptr<Array> DictionaryArray::dictionary() const {
@@ -348,8 +464,7 @@ std::shared_ptr<Array> DictionaryArray::dictionary() const {
 }
 
 std::shared_ptr<Array> DictionaryArray::Slice(int64_t offset, int64_t length) const {
-  std::shared_ptr<Array> sliced_indices = indices_->Slice(offset, length);
-  return std::make_shared<DictionaryArray>(type_, sliced_indices);
+  return std::make_shared<DictionaryArray>(SliceData(*data_, offset, length));
 }
 
 // ----------------------------------------------------------------------
@@ -367,6 +482,8 @@ struct ValidateVisitor {
 
   Status Visit(const PrimitiveArray& array) { return Status::OK(); }
 
+  Status Visit(const DecimalArray& array) { return Status::OK(); }
+
   Status Visit(const BinaryArray& array) {
     // TODO(wesm): what to do here?
     return Status::OK();
@@ -435,11 +552,12 @@ struct ValidateVisitor {
       return Status::Invalid("Null count exceeds the length of this struct");
     }
 
-    if (array.fields().size() > 0) {
+    if (array.num_fields() > 0) {
       // Validate fields
-      int64_t array_length = array.fields()[0]->length();
+      int64_t array_length = array.field(0)->length();
       size_t idx = 0;
-      for (auto it : array.fields()) {
+      for (int i = 0; i < array.num_fields(); ++i) {
+        auto it = array.field(i);
         if (it->length() != array_length) {
           std::stringstream ss;
           ss << "Length is not equal from field " << it->type()->ToString()
@@ -488,6 +606,51 @@ Status ValidateArray(const Array& array) {
 }
 
 // ----------------------------------------------------------------------
+// Loading from ArrayData
+
+namespace internal {
+
+class ArrayDataWrapper {
+ public:
+  ArrayDataWrapper(const std::shared_ptr<ArrayData>& data, std::shared_ptr<Array>* out)
+      : data_(data), out_(out) {}
+
+  template <typename T>
+  Status Visit(const T& type) {
+    using ArrayType = typename TypeTraits<T>::ArrayType;
+    *out_ = std::make_shared<ArrayType>(data_);
+    return Status::OK();
+  }
+
+  const std::shared_ptr<ArrayData>& data_;
+  std::shared_ptr<Array>* out_;
+};
+
+Status MakeArray(const std::shared_ptr<ArrayData>& data, std::shared_ptr<Array>* out) {
+  ArrayDataWrapper wrapper_visitor(data, out);
+  return VisitTypeInline(*data->type, &wrapper_visitor);
+}
+
+}  // namespace internal
+
+Status MakePrimitiveArray(const std::shared_ptr<DataType>& type, int64_t length,
+    const std::shared_ptr<Buffer>& data, const std::shared_ptr<Buffer>& null_bitmap,
+    int64_t null_count, int64_t offset, std::shared_ptr<Array>* out) {
+  BufferVector buffers = {null_bitmap, data};
+  auto internal_data = std::make_shared<internal::ArrayData>(
+      type, length, std::move(buffers), null_count, offset);
+  return internal::MakeArray(internal_data, out);
+}
+
+Status MakePrimitiveArray(const std::shared_ptr<DataType>& type,
+    const std::vector<std::shared_ptr<Buffer>>& buffers, int64_t length,
+    int64_t null_count, int64_t offset, std::shared_ptr<Array>* out) {
+  auto internal_data =
+      std::make_shared<internal::ArrayData>(type, length, buffers, null_count, offset);
+  return internal::MakeArray(internal_data, out);
+}
+
+// ----------------------------------------------------------------------
 // Instantiate templates
 
 template class ARROW_TEMPLATE_EXPORT NumericArray<UInt8Type>;

http://git-wip-us.apache.org/repos/asf/arrow/blob/84520711/cpp/src/arrow/array.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/array.h b/cpp/src/arrow/array.h
index 59269ad..80284cd 100644
--- a/cpp/src/arrow/array.h
+++ b/cpp/src/arrow/array.h
@@ -35,13 +35,125 @@
 
 namespace arrow {
 
+using BufferVector = std::vector<std::shared_ptr<Buffer>>;
+
+// When slicing, we do not know the null count of the sliced range without
+// doing some computation. To avoid doing this eagerly, we set the null count
+// to -1 (any negative number will do). When Array::null_count is called the
+// first time, the null count will be computed. See ARROW-33
+constexpr int64_t kUnknownNullCount = -1;
+
 class MemoryPool;
-class MutableBuffer;
 class Status;
 
 template <typename T>
 struct Decimal;
 
+// ----------------------------------------------------------------------
+// Generic array data container
+
+namespace internal {
+
+/// \brief Mutable internal container for generic Arrow array data
+///
+/// This data structure is a self-contained representation of the memory and
+/// metadata inside an Arrow array data structure (called vectors in Java). The
+/// classes arrow::Array and its subclasses provide strongly-typed accessors
+/// with support for the visitor pattern and other affordances.
+///
+/// This class is designed for easy internal data manipulation, analytical data
+/// processing, and data transport to and from IPC messages. For example, we
+/// could cast from int64 to float64 like so:
+///
+/// Int64Array arr = GetMyData();
+/// auto new_data = arr.data()->ShallowCopy();
+/// new_data->type = arrow::float64();
+/// Float64Array double_arr(new_data);
+///
+/// This object is also useful in an analytics setting where memory may be
+/// reused. For example, if we had a group of operations all returning doubles,
+/// say:
+///
+/// Log(Sqrt(Expr(arr)))
+///
+/// Then the low-level implementations of each of these functions could have
+/// the signatures
+///
+/// void Log(const ArrayData& values, ArrayData* out);
+///
+/// As another example a function may consume one or more memory buffers in an
+/// input array and replace them with newly-allocated data, changing the output
+/// data type as well.
+struct ARROW_EXPORT ArrayData {
+  /// Create an empty container. The scalar fields are zero-initialized so a
+  /// default-constructed instance never holds indeterminate values.
+  ArrayData() : length(0), null_count(0), offset(0) {}
+
+  /// \brief Construct from a copy of the given buffer vector
+  /// \param[in] type the logical type of the data
+  /// \param[in] length the number of elements
+  /// \param[in] buffers the memory buffers; the vector is copied
+  /// \param[in] null_count the number of nulls, or kUnknownNullCount
+  /// \param[in] offset relative position into the buffers (for slices)
+  ArrayData(const std::shared_ptr<DataType>& type, int64_t length,
+      const std::vector<std::shared_ptr<Buffer>>& buffers,
+      int64_t null_count = kUnknownNullCount, int64_t offset = 0)
+      : type(type),
+        length(length),
+        buffers(buffers),
+        null_count(null_count),
+        offset(offset) {}
+
+  /// \brief Same as above, but moves from the buffer vector instead of copying
+  ArrayData(const std::shared_ptr<DataType>& type, int64_t length,
+      std::vector<std::shared_ptr<Buffer>>&& buffers,
+      int64_t null_count = kUnknownNullCount, int64_t offset = 0)
+      : type(type),
+        length(length),
+        buffers(std::move(buffers)),
+        null_count(null_count),
+        offset(offset) {}
+
+  // Copying and moving are plain member-wise operations, so all four are
+  // compiler-generated. Copy assignment is declared explicitly because it
+  // would otherwise be implicitly deleted once a move operation is declared.
+  // The copy operations are not marked noexcept: copying the vectors can
+  // allocate, and a noexcept copy would turn std::bad_alloc into a terminate.
+  ArrayData(const ArrayData& other) = default;
+  ArrayData(ArrayData&& other) = default;
+  ArrayData& operator=(const ArrayData& other) = default;
+  ArrayData& operator=(ArrayData&& other) = default;
+
+  /// \brief Return a copy that shares this object's buffers and child data
+  ///
+  /// Only the shared_ptr handles are copied, not the memory they refer to.
+  std::shared_ptr<ArrayData> ShallowCopy() const {
+    return std::make_shared<ArrayData>(*this);
+  }
+
+  /// Logical type of the data
+  std::shared_ptr<DataType> type;
+  /// Number of elements
+  int64_t length;
+  /// Memory buffers; Array reads the null bitmap from buffers[0] (may be null)
+  std::vector<std::shared_ptr<Buffer>> buffers;
+  /// Number of null elements, or kUnknownNullCount if not yet computed
+  int64_t null_count;
+  /// Relative position into the buffers, to enable zero-copy slicing
+  int64_t offset;
+  /// Data of nested child arrays, if any
+  std::vector<std::shared_ptr<ArrayData>> child_data;
+};
+
+Status ARROW_EXPORT MakeArray(
+    const std::shared_ptr<ArrayData>& data, std::shared_ptr<Array>* out);
+
+}  // namespace internal
+
+// ----------------------------------------------------------------------
+// User array accessor types
+
+/// \brief Array base type
 /// Immutable data array with some logical type and some length.
 ///
 /// Any memory is owned by the respective Buffer instance (or its parents).
@@ -54,24 +166,20 @@ struct Decimal;
 /// be computed on the first call to null_count()
 class ARROW_EXPORT Array {
  public:
-  Array(const std::shared_ptr<DataType>& type, int64_t length,
-      const std::shared_ptr<Buffer>& null_bitmap = nullptr, int64_t null_count = 0,
-      int64_t offset = 0);
-
   virtual ~Array() = default;
 
   /// Determine if a slot is null. For inner loops. Does *not* boundscheck
   bool IsNull(int64_t i) const {
     return null_bitmap_data_ != nullptr &&
-           BitUtil::BitNotSet(null_bitmap_data_, i + offset_);
+           BitUtil::BitNotSet(null_bitmap_data_, i + data_->offset);
   }
 
   /// Size in the number of elements this array contains.
-  int64_t length() const { return length_; }
+  int64_t length() const { return data_->length; }
 
   /// A relative position into another array's data, to enable zero-copy
   /// slicing. This value defaults to zero
-  int64_t offset() const { return offset_; }
+  int64_t offset() const { return data_->offset; }
 
   /// The number of null entries in the array. If the null count was not known
   /// at time of construction (and set to a negative value), then the null
@@ -79,14 +187,14 @@ class ARROW_EXPORT Array {
   /// function
   int64_t null_count() const;
 
-  std::shared_ptr<DataType> type() const { return type_; }
-  Type::type type_id() const { return type_->id(); }
+  std::shared_ptr<DataType> type() const { return data_->type; }
+  Type::type type_id() const { return data_->type->id(); }
 
   /// Buffer for the null bitmap.
   ///
   /// Note that for `null_count == 0`, this can be a `nullptr`.
   /// This buffer does not account for any slice offset
-  std::shared_ptr<Buffer> null_bitmap() const { return null_bitmap_; }
+  std::shared_ptr<Buffer> null_bitmap() const { return data_->buffers[0]; }
 
   /// Raw pointer to the null bitmap.
   ///
@@ -124,49 +232,77 @@ class ARROW_EXPORT Array {
   /// Slice from offset until end of the array
   std::shared_ptr<Array> Slice(int64_t offset) const;
 
- protected:
-  std::shared_ptr<DataType> type_;
-  int64_t length_;
-  int64_t offset_;
+  std::shared_ptr<internal::ArrayData> data() const { return data_; }
+
+  int num_fields() const { return static_cast<int>(data_->child_data.size()); }
 
-  // This member is marked mutable so that it can be modified when null_count()
-  // is called from a const context and the null count has to be computed (if
-  // it is not already known)
-  mutable int64_t null_count_;
+ protected:
+  Array() {}
 
-  std::shared_ptr<Buffer> null_bitmap_;
+  std::shared_ptr<internal::ArrayData> data_;
   const uint8_t* null_bitmap_data_;
 
+  /// Protected method for constructors
+  inline void SetData(const std::shared_ptr<internal::ArrayData>& data) {
+    if (data->buffers.size() > 0 && data->buffers[0]) {
+      null_bitmap_data_ = data->buffers[0]->data();
+    } else {
+      null_bitmap_data_ = nullptr;
+    }
+    data_ = data;
+  }
+
  private:
-  Array() {}
   DISALLOW_COPY_AND_ASSIGN(Array);
 };
 
+ARROW_EXPORT std::ostream& operator<<(std::ostream& os, const Array& x);
+
+class ARROW_EXPORT FlatArray : public Array {
+ protected:
+  using Array::Array;
+};
+
 /// Degenerate null type Array
-class ARROW_EXPORT NullArray : public Array {
+class ARROW_EXPORT NullArray : public FlatArray {
  public:
   using TypeClass = NullType;
 
+  explicit NullArray(const std::shared_ptr<internal::ArrayData>& data) { SetData(data); }
+
   explicit NullArray(int64_t length);
 
   std::shared_ptr<Array> Slice(int64_t offset, int64_t length) const override;
 };
 
 /// Base class for fixed-size logical types
-class ARROW_EXPORT PrimitiveArray : public Array {
+class ARROW_EXPORT PrimitiveArray : public FlatArray {
  public:
   PrimitiveArray(const std::shared_ptr<DataType>& type, int64_t length,
       const std::shared_ptr<Buffer>& data,
       const std::shared_ptr<Buffer>& null_bitmap = nullptr, int64_t null_count = 0,
       int64_t offset = 0);
 
-  /// The memory containing this array's data
-  /// This buffer does not account for any slice offset
-  std::shared_ptr<Buffer> data() const { return data_; }
+  /// Does not account for any slice offset
+  std::shared_ptr<Buffer> values() const { return data_->buffers[1]; }
+
+  /// Does not account for any slice offset
+  const uint8_t* raw_values() const { return raw_values_; }
 
  protected:
-  std::shared_ptr<Buffer> data_;
-  const uint8_t* raw_data_;
+  PrimitiveArray() {}
+
+  inline void SetData(const std::shared_ptr<internal::ArrayData>& data) {
+    auto values = data->buffers[1];
+    this->Array::SetData(data);
+    raw_values_ = values == nullptr ? nullptr : values->data();
+  }
+
+  explicit inline PrimitiveArray(const std::shared_ptr<internal::ArrayData>& data) {
+    SetData(data);
+  }
+
+  const uint8_t* raw_values_;
 };
 
 template <typename TYPE>
@@ -175,7 +311,7 @@ class ARROW_EXPORT NumericArray : public PrimitiveArray {
   using TypeClass = TYPE;
   using value_type = typename TypeClass::c_type;
 
-  using PrimitiveArray::PrimitiveArray;
+  explicit NumericArray(const std::shared_ptr<internal::ArrayData>& data);
 
   // Only enable this constructor without a type argument for types without additional
   // metadata
@@ -188,20 +324,23 @@ class ARROW_EXPORT NumericArray : public PrimitiveArray {
       : PrimitiveArray(TypeTraits<T1>::type_singleton(), length, data, null_bitmap,
             null_count, offset) {}
 
-  const value_type* raw_data() const {
-    return reinterpret_cast<const value_type*>(raw_data_) + offset_;
+  const value_type* raw_values() const {
+    return reinterpret_cast<const value_type*>(raw_values_) + data_->offset;
   }
 
   std::shared_ptr<Array> Slice(int64_t offset, int64_t length) const override;
 
-  value_type Value(int64_t i) const { return raw_data()[i]; }
+  value_type Value(int64_t i) const { return raw_values()[i]; }
+
+ protected:
+  using PrimitiveArray::PrimitiveArray;
 };
 
 class ARROW_EXPORT BooleanArray : public PrimitiveArray {
  public:
   using TypeClass = BooleanType;
 
-  using PrimitiveArray::PrimitiveArray;
+  explicit BooleanArray(const std::shared_ptr<internal::ArrayData>& data);
 
   BooleanArray(int64_t length, const std::shared_ptr<Buffer>& data,
       const std::shared_ptr<Buffer>& null_bitmap = nullptr, int64_t null_count = 0,
@@ -210,8 +349,12 @@ class ARROW_EXPORT BooleanArray : public PrimitiveArray {
   std::shared_ptr<Array> Slice(int64_t offset, int64_t length) const override;
 
   bool Value(int64_t i) const {
-    return BitUtil::GetBit(reinterpret_cast<const uint8_t*>(raw_data_), i + offset_);
+    return BitUtil::GetBit(
+        reinterpret_cast<const uint8_t*>(raw_values_), i + data_->offset);
   }
+
+ protected:
+  using PrimitiveArray::PrimitiveArray;
 };
 
 // ----------------------------------------------------------------------
@@ -221,52 +364,50 @@ class ARROW_EXPORT ListArray : public Array {
  public:
   using TypeClass = ListType;
 
+  explicit ListArray(const std::shared_ptr<internal::ArrayData>& data);
+
   ListArray(const std::shared_ptr<DataType>& type, int64_t length,
       const std::shared_ptr<Buffer>& value_offsets, const std::shared_ptr<Array>& values,
       const std::shared_ptr<Buffer>& null_bitmap = nullptr, int64_t null_count = 0,
-      int64_t offset = 0)
-      : Array(type, length, null_bitmap, null_count, offset) {
-    value_offsets_ = value_offsets;
-    raw_value_offsets_ = value_offsets == nullptr
-                             ? nullptr
-                             : reinterpret_cast<const int32_t*>(value_offsets_->data());
-    values_ = values;
-  }
+      int64_t offset = 0);
 
-  // Return a shared pointer in case the requestor desires to share ownership
-  // with this array.
-  std::shared_ptr<Array> values() const { return values_; }
+  /// \brief Return array object containing the list's values
+  std::shared_ptr<Array> values() const;
 
   /// Note that this buffer does not account for any slice offset
-  std::shared_ptr<Buffer> value_offsets() const { return value_offsets_; }
+  std::shared_ptr<Buffer> value_offsets() const { return data_->buffers[1]; }
 
-  std::shared_ptr<DataType> value_type() const { return values_->type(); }
+  std::shared_ptr<DataType> value_type() const;
 
   /// Return pointer to raw value offsets accounting for any slice offset
-  const int32_t* raw_value_offsets() const { return raw_value_offsets_ + offset_; }
+  const int32_t* raw_value_offsets() const { return raw_value_offsets_ + data_->offset; }
 
   // Neither of these functions will perform boundschecking
-  int32_t value_offset(int64_t i) const { return raw_value_offsets_[i + offset_]; }
+  int32_t value_offset(int64_t i) const { return raw_value_offsets_[i + data_->offset]; }
   int32_t value_length(int64_t i) const {
-    i += offset_;
+    i += data_->offset;
     return raw_value_offsets_[i + 1] - raw_value_offsets_[i];
   }
 
   std::shared_ptr<Array> Slice(int64_t offset, int64_t length) const override;
 
  protected:
-  std::shared_ptr<Buffer> value_offsets_;
+  void SetData(const std::shared_ptr<internal::ArrayData>& data);
   const int32_t* raw_value_offsets_;
+
+ private:
   std::shared_ptr<Array> values_;
 };
 
 // ----------------------------------------------------------------------
 // Binary and String
 
-class ARROW_EXPORT BinaryArray : public Array {
+class ARROW_EXPORT BinaryArray : public FlatArray {
  public:
   using TypeClass = BinaryType;
 
+  explicit BinaryArray(const std::shared_ptr<internal::ArrayData>& data);
+
   BinaryArray(int64_t length, const std::shared_ptr<Buffer>& value_offsets,
       const std::shared_ptr<Buffer>& data,
       const std::shared_ptr<Buffer>& null_bitmap = nullptr, int64_t null_count = 0,
@@ -277,7 +418,7 @@ class ARROW_EXPORT BinaryArray : public Array {
   // pointer + offset
   const uint8_t* GetValue(int64_t i, int32_t* out_length) const {
     // Account for base offset
-    i += offset_;
+    i += data_->offset;
 
     const int32_t pos = raw_value_offsets_[i];
     *out_length = raw_value_offsets_[i + 1] - pos;
@@ -285,23 +426,29 @@ class ARROW_EXPORT BinaryArray : public Array {
   }
 
   /// Note that this buffer does not account for any slice offset
-  std::shared_ptr<Buffer> data() const { return data_; }
+  std::shared_ptr<Buffer> value_offsets() const { return data_->buffers[1]; }
 
   /// Note that this buffer does not account for any slice offset
-  std::shared_ptr<Buffer> value_offsets() const { return value_offsets_; }
+  std::shared_ptr<Buffer> value_data() const { return data_->buffers[2]; }
 
-  const int32_t* raw_value_offsets() const { return raw_value_offsets_ + offset_; }
+  const int32_t* raw_value_offsets() const { return raw_value_offsets_ + data_->offset; }
 
   // Neither of these functions will perform boundschecking
-  int32_t value_offset(int64_t i) const { return raw_value_offsets_[i + offset_]; }
+  int32_t value_offset(int64_t i) const { return raw_value_offsets_[i + data_->offset]; }
   int32_t value_length(int64_t i) const {
-    i += offset_;
+    i += data_->offset;
     return raw_value_offsets_[i + 1] - raw_value_offsets_[i];
   }
 
   std::shared_ptr<Array> Slice(int64_t offset, int64_t length) const override;
 
  protected:
+  // For subclasses
+  BinaryArray() {}
+
+  /// Protected method for constructors
+  void SetData(const std::shared_ptr<internal::ArrayData>& data);
+
   // Constructor that allows sub-classes/builders to propagate there logical type up the
   // class hierarchy.
   BinaryArray(const std::shared_ptr<DataType>& type, int64_t length,
@@ -309,10 +456,7 @@ class ARROW_EXPORT BinaryArray : public Array {
       const std::shared_ptr<Buffer>& null_bitmap = nullptr, int64_t null_count = 0,
       int64_t offset = 0);
 
-  std::shared_ptr<Buffer> value_offsets_;
   const int32_t* raw_value_offsets_;
-
-  std::shared_ptr<Buffer> data_;
   const uint8_t* raw_data_;
 };
 
@@ -320,6 +464,8 @@ class ARROW_EXPORT StringArray : public BinaryArray {
  public:
   using TypeClass = StringType;
 
+  explicit StringArray(const std::shared_ptr<internal::ArrayData>& data);
+
   StringArray(int64_t length, const std::shared_ptr<Buffer>& value_offsets,
       const std::shared_ptr<Buffer>& data,
       const std::shared_ptr<Buffer>& null_bitmap = nullptr, int64_t null_count = 0,
@@ -343,6 +489,8 @@ class ARROW_EXPORT FixedSizeBinaryArray : public PrimitiveArray {
  public:
   using TypeClass = FixedSizeBinaryType;
 
+  explicit FixedSizeBinaryArray(const std::shared_ptr<internal::ArrayData>& data);
+
   FixedSizeBinaryArray(const std::shared_ptr<DataType>& type, int64_t length,
       const std::shared_ptr<Buffer>& data,
       const std::shared_ptr<Buffer>& null_bitmap = nullptr, int64_t null_count = 0,
@@ -352,20 +500,28 @@ class ARROW_EXPORT FixedSizeBinaryArray : public PrimitiveArray {
 
   int32_t byte_width() const { return byte_width_; }
 
-  const uint8_t* raw_data() const { return raw_data_; }
+  const uint8_t* raw_values() const { return raw_values_; }
 
   std::shared_ptr<Array> Slice(int64_t offset, int64_t length) const override;
 
  protected:
+  inline void SetData(const std::shared_ptr<internal::ArrayData>& data) {
+    this->PrimitiveArray::SetData(data);
+    byte_width_ = static_cast<const FixedSizeBinaryType&>(*type()).byte_width();
+  }
+
   int32_t byte_width_;
 };
 
 // ----------------------------------------------------------------------
 // DecimalArray
-class ARROW_EXPORT DecimalArray : public FixedSizeBinaryArray {
+class ARROW_EXPORT DecimalArray : public FlatArray {
  public:
   using TypeClass = Type;
 
+  /// \brief Construct DecimalArray from internal::ArrayData instance
+  explicit DecimalArray(const std::shared_ptr<internal::ArrayData>& data);
+
   DecimalArray(const std::shared_ptr<DataType>& type, int64_t length,
       const std::shared_ptr<Buffer>& data,
       const std::shared_ptr<Buffer>& null_bitmap = nullptr, int64_t null_count = 0,
@@ -373,13 +529,27 @@ class ARROW_EXPORT DecimalArray : public FixedSizeBinaryArray {
 
   bool IsNegative(int64_t i) const;
 
+  const uint8_t* GetValue(int64_t i) const;
+
   std::string FormatValue(int64_t i) const;
 
   std::shared_ptr<Array> Slice(int64_t offset, int64_t length) const override;
 
- private:
+  /// \brief The main decimal data
+  /// For 32/64-bit decimal this is everything
+  std::shared_ptr<Buffer> values() const { return data_->buffers[1]; }
+
   /// Only needed for 128 bit Decimals
-  std::shared_ptr<Buffer> sign_bitmap_;
+  std::shared_ptr<Buffer> sign_bitmap() const { return data_->buffers[2]; }
+
+  int32_t byte_width() const {
+    return static_cast<const DecimalType&>(*type()).byte_width();
+  }
+  const uint8_t* raw_values() const { return raw_values_; }
+
+ private:
+  void SetData(const std::shared_ptr<internal::ArrayData>& data);
+  const uint8_t* raw_values_;
   const uint8_t* sign_bitmap_data_;
 };
 
@@ -390,6 +560,8 @@ class ARROW_EXPORT StructArray : public Array {
  public:
   using TypeClass = StructType;
 
+  explicit StructArray(const std::shared_ptr<internal::ArrayData>& data);
+
   StructArray(const std::shared_ptr<DataType>& type, int64_t length,
       const std::vector<std::shared_ptr<Array>>& children,
       std::shared_ptr<Buffer> null_bitmap = nullptr, int64_t null_count = 0,
@@ -399,13 +571,7 @@ class ARROW_EXPORT StructArray : public Array {
   // with this array.
   std::shared_ptr<Array> field(int pos) const;
 
-  const std::vector<std::shared_ptr<Array>>& fields() const { return children_; }
-
   std::shared_ptr<Array> Slice(int64_t offset, int64_t length) const override;
-
- protected:
-  // The child arrays corresponding to each field of the struct data type.
-  std::vector<std::shared_ptr<Array>> children_;
 };
 
 // ----------------------------------------------------------------------
@@ -416,6 +582,8 @@ class ARROW_EXPORT UnionArray : public Array {
   using TypeClass = UnionType;
   using type_id_t = uint8_t;
 
+  explicit UnionArray(const std::shared_ptr<internal::ArrayData>& data);
+
   UnionArray(const std::shared_ptr<DataType>& type, int64_t length,
       const std::vector<std::shared_ptr<Array>>& children,
       const std::shared_ptr<Buffer>& type_ids,
@@ -424,29 +592,24 @@ class ARROW_EXPORT UnionArray : public Array {
       int64_t offset = 0);
 
   /// Note that this buffer does not account for any slice offset
-  std::shared_ptr<Buffer> type_ids() const { return type_ids_; }
+  std::shared_ptr<Buffer> type_ids() const { return data_->buffers[1]; }
 
   /// Note that this buffer does not account for any slice offset
-  std::shared_ptr<Buffer> value_offsets() const { return value_offsets_; }
+  std::shared_ptr<Buffer> value_offsets() const { return data_->buffers[2]; }
 
-  const type_id_t* raw_type_ids() const { return raw_type_ids_ + offset_; }
-  const int32_t* raw_value_offsets() const { return raw_value_offsets_ + offset_; }
+  const type_id_t* raw_type_ids() const { return raw_type_ids_ + data_->offset; }
+  const int32_t* raw_value_offsets() const { return raw_value_offsets_ + data_->offset; }
 
-  UnionMode mode() const { return static_cast<const UnionType&>(*type_.get()).mode(); }
+  UnionMode mode() const { return static_cast<const UnionType&>(*type()).mode(); }
 
   std::shared_ptr<Array> child(int pos) const;
 
-  const std::vector<std::shared_ptr<Array>>& children() const { return children_; }
-
   std::shared_ptr<Array> Slice(int64_t offset, int64_t length) const override;
 
  protected:
-  std::vector<std::shared_ptr<Array>> children_;
+  void SetData(const std::shared_ptr<internal::ArrayData>& data);
 
-  std::shared_ptr<Buffer> type_ids_;
   const type_id_t* raw_type_ids_;
-
-  std::shared_ptr<Buffer> value_offsets_;
   const int32_t* raw_value_offsets_;
 };
 
@@ -472,17 +635,21 @@ class ARROW_EXPORT DictionaryArray : public Array {
  public:
   using TypeClass = DictionaryType;
 
+  explicit DictionaryArray(const std::shared_ptr<internal::ArrayData>& data);
+
   DictionaryArray(
       const std::shared_ptr<DataType>& type, const std::shared_ptr<Array>& indices);
 
-  std::shared_ptr<Array> indices() const { return indices_; }
+  std::shared_ptr<Array> indices() const;
   std::shared_ptr<Array> dictionary() const;
 
   const DictionaryType* dict_type() const { return dict_type_; }
 
   std::shared_ptr<Array> Slice(int64_t offset, int64_t length) const override;
 
- protected:
+ private:
+  void SetData(const std::shared_ptr<internal::ArrayData>& data);
+
   const DictionaryType* dict_type_;
   std::shared_ptr<Array> indices_;
 };
@@ -517,6 +684,16 @@ ARROW_EXTERN_TEMPLATE NumericArray<TimestampType>;
 /// \return Status
 Status ARROW_EXPORT ValidateArray(const Array& array);
 
+/// Create new arrays for logical types that are backed by primitive arrays.
+Status ARROW_EXPORT MakePrimitiveArray(const std::shared_ptr<DataType>& type,
+    int64_t length, const std::shared_ptr<Buffer>& data,
+    const std::shared_ptr<Buffer>& null_bitmap, int64_t null_count, int64_t offset,
+    std::shared_ptr<Array>* out);
+
+Status ARROW_EXPORT MakePrimitiveArray(const std::shared_ptr<DataType>& type,
+    const std::vector<std::shared_ptr<Buffer>>& buffers, int64_t length,
+    int64_t null_count, int64_t offset, std::shared_ptr<Array>* out);
+
 }  // namespace arrow
 
 #endif

http://git-wip-us.apache.org/repos/asf/arrow/blob/84520711/cpp/src/arrow/builder.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/builder.cc b/cpp/src/arrow/builder.cc
index a57f75a..c3bc745 100644
--- a/cpp/src/arrow/builder.cc
+++ b/cpp/src/arrow/builder.cc
@@ -150,15 +150,15 @@ void ArrayBuilder::UnsafeSetNotNull(int64_t length) {
   const int64_t new_length = length + length_;
 
   // Fill up the bytes until we have a byte alignment
-  int64_t pad_to_byte = 8 - (length_ % 8);
+  int64_t pad_to_byte = std::min<int64_t>(8 - (length_ % 8), length);
   if (pad_to_byte == 8) { pad_to_byte = 0; }
-  for (int64_t i = 0; i < pad_to_byte; ++i) {
+  for (int64_t i = length_; i < length_ + pad_to_byte; ++i) {
     BitUtil::SetBit(null_bitmap_data_, i);
   }
 
   // Fast bitsetting
   int64_t fast_length = (length - pad_to_byte) / 8;
-  memset(null_bitmap_data_ + ((length_ + pad_to_byte) / 8), 255,
+  memset(null_bitmap_data_ + ((length_ + pad_to_byte) / 8), 0xFF,
       static_cast<size_t>(fast_length));
 
   // Trailing bytes
@@ -700,11 +700,11 @@ template <typename T>
 Status DictionaryBuilder<T>::Finish(std::shared_ptr<Array>* out) {
   std::shared_ptr<Array> dictionary;
   RETURN_NOT_OK(dict_builder_.Finish(&dictionary));
-  auto type = std::make_shared<DictionaryType>(type_, dictionary);
 
   std::shared_ptr<Array> values;
   RETURN_NOT_OK(values_builder_.Finish(&values));
 
+  auto type = std::make_shared<DictionaryType>(values->type(), dictionary);
   *out = std::make_shared<DictionaryArray>(type, values);
   return Status::OK();
 }
@@ -1031,6 +1031,7 @@ Status ListBuilder::Finish(std::shared_ptr<Array>* out) {
 void ListBuilder::Reset() {
   capacity_ = length_ = null_count_ = 0;
   null_bitmap_ = nullptr;
+  values_ = nullptr;
 }
 
 ArrayBuilder* ListBuilder::value_builder() const {
@@ -1061,7 +1062,7 @@ Status BinaryBuilder::Finish(std::shared_ptr<Array>* out) {
   auto values = std::dynamic_pointer_cast<UInt8Array>(list->values());
 
   *out = std::make_shared<BinaryArray>(list->length(), list->value_offsets(),
-      values->data(), list->null_bitmap(), list->null_count());
+      values->values(), list->null_bitmap(), list->null_count());
   return Status::OK();
 }
 
@@ -1086,7 +1087,7 @@ Status StringBuilder::Finish(std::shared_ptr<Array>* out) {
   auto values = std::dynamic_pointer_cast<UInt8Array>(list->values());
 
   *out = std::make_shared<StringArray>(list->length(), list->value_offsets(),
-      values->data(), list->null_bitmap(), list->null_count());
+      values->values(), list->null_bitmap(), list->null_count());
   return Status::OK();
 }
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/84520711/cpp/src/arrow/compare.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/compare.cc b/cpp/src/arrow/compare.cc
index 390a406..23f5a19 100644
--- a/cpp/src/arrow/compare.cc
+++ b/cpp/src/arrow/compare.cc
@@ -83,8 +83,8 @@ class RangeEqualsVisitor {
       }
 
       if (end_offset - begin_offset > 0 &&
-          std::memcmp(left.data()->data() + begin_offset,
-              right.data()->data() + right_begin_offset,
+          std::memcmp(left.value_data()->data() + begin_offset,
+              right.value_data()->data() + right_begin_offset,
               static_cast<size_t>(end_offset - begin_offset))) {
         return false;
       }
@@ -126,7 +126,7 @@ class RangeEqualsVisitor {
          ++i, ++o_i) {
       if (left.IsNull(i) != right.IsNull(o_i)) { return false; }
       if (left.IsNull(i)) continue;
-      for (int j = 0; j < static_cast<int>(left.fields().size()); ++j) {
+      for (int j = 0; j < left.num_fields(); ++j) {
         // TODO: really we should be comparing stretches of non-null data rather
         // than looking at one value at a time.
         const int64_t left_abs_index = i + left.offset();
@@ -188,7 +188,7 @@ class RangeEqualsVisitor {
         }
       } else {
         const int32_t offset = left.raw_value_offsets()[i];
-        const int32_t o_offset = right.raw_value_offsets()[i];
+        const int32_t o_offset = right.raw_value_offsets()[o_i];
         if (!left.child(child_num)->RangeEquals(
                 offset, offset + 1, o_offset, right.child(child_num))) {
           return false;
@@ -211,9 +211,9 @@ class RangeEqualsVisitor {
     const uint8_t* left_data = nullptr;
     const uint8_t* right_data = nullptr;
 
-    if (left.data()) { left_data = left.raw_data() + left.offset() * width; }
+    if (left.values()) { left_data = left.raw_values() + left.offset() * width; }
 
-    if (right.data()) { right_data = right.raw_data() + right.offset() * width; }
+    if (right.values()) { right_data = right.raw_values() + right.offset() * width; }
 
     for (int64_t i = left_start_idx_, o_i = right_start_idx_; i < left_end_idx_;
          ++i, ++o_i) {
@@ -241,9 +241,9 @@ class RangeEqualsVisitor {
     const uint8_t* left_data = nullptr;
     const uint8_t* right_data = nullptr;
 
-    if (left.data()) { left_data = left.raw_data() + left.offset() * width; }
+    if (left.values()) { left_data = left.raw_values() + left.offset() * width; }
 
-    if (right.data()) { right_data = right.raw_data() + right.offset() * width; }
+    if (right.values()) { right_data = right.raw_values() + right.offset() * width; }
 
     for (int64_t i = left_start_idx_, o_i = right_start_idx_; i < left_end_idx_;
          ++i, ++o_i) {
@@ -317,6 +317,95 @@ class RangeEqualsVisitor {
   bool result_;
 };
 
+// Whole-array equality for fixed-width primitive arrays, honoring slice offsets.
+// Value bytes are only examined for slots that are non-null in `left`.
+static bool IsEqualPrimitive(const PrimitiveArray& left, const PrimitiveArray& right) {
+  const auto& fixed_type = dynamic_cast<const FixedWidthType&>(*left.type());
+  const int byte_width = fixed_type.bit_width() / 8;
+
+  // Pointer to the first value of an array, past its slice offset; stays null
+  // when the array has no values buffer
+  auto first_value = [byte_width](const PrimitiveArray& arr) -> const uint8_t* {
+    if (!arr.values()) { return nullptr; }
+    return arr.values()->data() + arr.offset() * byte_width;
+  };
+  const uint8_t* lhs = first_value(left);
+  const uint8_t* rhs = first_value(right);
+
+  if (left.null_count() <= 0) {
+    // No nulls on the left: both value ranges are compared in a single call
+    return memcmp(lhs, rhs, static_cast<size_t>(byte_width * left.length())) == 0;
+  }
+
+  // Compare slot by slot; slots that are null on the left are skipped
+  for (int64_t i = 0; i < left.length(); ++i, lhs += byte_width, rhs += byte_width) {
+    if (left.IsNull(i)) { continue; }
+    if (memcmp(lhs, rhs, byte_width) || right.IsNull(i)) { return false; }
+  }
+  return true;
+}
+
+// Element-wise equality of two value ranges, consulting each array's null bitmap
+template <typename T>
+static inline bool CompareBuiltIn(
+    const Array& left, const Array& right, const T* ldata, const T* rdata) {
+  if (left.null_count() <= 0) {
+    // Without nulls on the left the raw bytes are compared in one call
+    return memcmp(ldata, rdata, sizeof(T) * left.length()) == 0;
+  }
+  for (int64_t i = 0; i < left.length(); ++i) {
+    const bool left_null = left.IsNull(i);
+    // A slot must be null on both sides or on neither; values only matter if valid
+    if (left_null != right.IsNull(i)) { return false; }
+    if (!left_null && ldata[i] != rdata[i]) { return false; }
+  }
+  return true;
+}
+
+// Whole-array equality for decimal arrays; slice offsets apply to every byte width.
+static bool IsEqualDecimal(const DecimalArray& left, const DecimalArray& right) {
+  const int64_t loffset = left.offset();
+  const int64_t roffset = right.offset();
+
+  const uint8_t* left_data = left.values() ? left.values()->data() : nullptr;
+  const uint8_t* right_data = right.values() ? right.values()->data() : nullptr;
+
+  const int32_t byte_width = left.byte_width();
+  if (byte_width == 4) {
+    return CompareBuiltIn<int32_t>(left, right,
+        reinterpret_cast<const int32_t*>(left_data) + loffset,
+        reinterpret_cast<const int32_t*>(right_data) + roffset);
+  } else if (byte_width == 8) {
+    return CompareBuiltIn<int64_t>(left, right,
+        reinterpret_cast<const int64_t*>(left_data) + loffset,
+        reinterpret_cast<const int64_t*>(right_data) + roffset);
+  } else {
+    // 128-bit: the value bytes are compared together with a separate sign bitmap
+    const uint8_t* left_sign = nullptr;
+    const uint8_t* right_sign = nullptr;
+    if (left.sign_bitmap()) { left_sign = left.sign_bitmap()->data(); }
+    if (right.sign_bitmap()) { right_sign = right.sign_bitmap()->data(); }
+
+    // Skip to each array's first slot. The 4- and 8-byte branches above apply the
+    // slice offset; without this step sliced arrays compared the wrong values.
+    if (left_data) { left_data += loffset * byte_width; }
+    if (right_data) { right_data += roffset * byte_width; }
+    for (int64_t i = 0; i < left.length(); ++i) {
+      bool left_null = left.IsNull(i);
+      if (!left_null && (memcmp(left_data, right_data, byte_width) || right.IsNull(i))) {
+        return false;
+      }
+      if (BitUtil::GetBit(left_sign, i + loffset) !=
+          BitUtil::GetBit(right_sign, i + roffset)) {
+        return false;
+      }
+      left_data += byte_width;
+      right_data += byte_width;
+    }
+    return true;
+  }
+}
+
 class ArrayEqualsVisitor : public RangeEqualsVisitor {
  public:
   explicit ArrayEqualsVisitor(const Array& right)
@@ -331,8 +420,8 @@ class ArrayEqualsVisitor : public RangeEqualsVisitor {
     const auto& right = static_cast<const BooleanArray&>(right_);
 
     if (left.null_count() > 0) {
-      const uint8_t* left_data = left.data()->data();
-      const uint8_t* right_data = right.data()->data();
+      const uint8_t* left_data = left.values()->data();
+      const uint8_t* right_data = right.values()->data();
 
       for (int64_t i = 0; i < left.length(); ++i) {
         if (!left.IsNull(i) &&
@@ -344,45 +433,23 @@ class ArrayEqualsVisitor : public RangeEqualsVisitor {
       }
       result_ = true;
     } else {
-      result_ = BitmapEquals(left.data()->data(), left.offset(), right.data()->data(),
+      result_ = BitmapEquals(left.values()->data(), left.offset(), right.values()->data(),
           right.offset(), left.length());
     }
     return Status::OK();
   }
 
-  bool IsEqualPrimitive(const PrimitiveArray& left) {
-    const auto& right = static_cast<const PrimitiveArray&>(right_);
-    const auto& size_meta = dynamic_cast<const FixedWidthType&>(*left.type());
-    const int byte_width = size_meta.bit_width() / 8;
-
-    const uint8_t* left_data = nullptr;
-    const uint8_t* right_data = nullptr;
-
-    if (left.data()) { left_data = left.data()->data() + left.offset() * byte_width; }
-
-    if (right.data()) { right_data = right.data()->data() + right.offset() * byte_width; }
-
-    if (left.null_count() > 0) {
-      for (int64_t i = 0; i < left.length(); ++i) {
-        if (!left.IsNull(i) && memcmp(left_data, right_data, byte_width)) {
-          return false;
-        }
-        left_data += byte_width;
-        right_data += byte_width;
-      }
-      return true;
-    } else {
-      return memcmp(left_data, right_data,
-                 static_cast<size_t>(byte_width * left.length())) == 0;
-    }
-  }
-
   template <typename T>
   typename std::enable_if<std::is_base_of<PrimitiveArray, T>::value &&
                               !std::is_base_of<BooleanArray, T>::value,
       Status>::type
   Visit(const T& left) {
-    result_ = IsEqualPrimitive(left);
+    result_ = IsEqualPrimitive(left, static_cast<const PrimitiveArray&>(right_));
+    return Status::OK();
+  }
+
+  Status Visit(const DecimalArray& left) {
+    result_ = IsEqualDecimal(left, static_cast<const DecimalArray&>(right_));
     return Status::OK();
   }
 
@@ -417,11 +484,11 @@ class ArrayEqualsVisitor : public RangeEqualsVisitor {
     bool equal_offsets = ValueOffsetsEqual<BinaryArray>(left);
     if (!equal_offsets) { return false; }
 
-    if (!left.data() && !(right.data())) { return true; }
+    if (!left.value_data() && !(right.value_data())) { return true; }
     if (left.value_offset(left.length()) == 0) { return true; }
 
-    const uint8_t* left_data = left.data()->data();
-    const uint8_t* right_data = right.data()->data();
+    const uint8_t* left_data = left.value_data()->data();
+    const uint8_t* right_data = right.value_data()->data();
 
     if (left.null_count() == 0) {
       // Fast path for null count 0, single memcmp
@@ -491,8 +558,8 @@ inline bool FloatingApproxEquals(
     const NumericArray<TYPE>& left, const NumericArray<TYPE>& right) {
   using T = typename TYPE::c_type;
 
-  const T* left_data = left.raw_data();
-  const T* right_data = right.raw_data();
+  const T* left_data = left.raw_values();
+  const T* right_data = right.raw_values();
 
   static constexpr T EPSILON = static_cast<T>(1E-5);
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/84520711/cpp/src/arrow/ipc/feather-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/feather-test.cc b/cpp/src/arrow/ipc/feather-test.cc
index 807ea4e..a7793f2 100644
--- a/cpp/src/arrow/ipc/feather-test.cc
+++ b/cpp/src/arrow/ipc/feather-test.cc
@@ -28,7 +28,6 @@
 #include "arrow/ipc/feather-internal.h"
 #include "arrow/ipc/feather.h"
 #include "arrow/ipc/test-common.h"
-#include "arrow/loader.h"
 #include "arrow/pretty_print.h"
 #include "arrow/test-util.h"
 
@@ -365,25 +364,19 @@ TEST_F(TestTableWriter, TimeTypes) {
   std::shared_ptr<Array> date_array;
   ArrayFromVector<Date32Type, int32_t>(is_valid, date_values_vec, &date_array);
 
-  std::vector<FieldMetadata> fields(1);
-  fields[0].length = values->length();
-  fields[0].null_count = values->null_count();
-  fields[0].offset = 0;
-
   const auto& prim_values = static_cast<const PrimitiveArray&>(*values);
   std::vector<std::shared_ptr<Buffer>> buffers = {
-      prim_values.null_bitmap(), prim_values.data()};
+      prim_values.null_bitmap(), prim_values.values()};
 
-  std::vector<std::shared_ptr<Array>> arrays;
-  arrays.push_back(date_array);
+  std::vector<std::shared_ptr<internal::ArrayData>> arrays;
+  arrays.push_back(date_array->data());
 
   for (int i = 1; i < schema->num_fields(); ++i) {
-    std::shared_ptr<Array> arr;
-    ASSERT_OK(LoadArray(schema->field(i)->type(), fields, buffers, &arr));
-    arrays.push_back(arr);
+    arrays.emplace_back(std::make_shared<internal::ArrayData>(
+        schema->field(i)->type(), values->length(), buffers, values->null_count(), 0));
   }
 
-  RecordBatch batch(schema, values->length(), arrays);
+  RecordBatch batch(schema, values->length(), std::move(arrays));
   CheckBatch(batch);
 }
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/84520711/cpp/src/arrow/ipc/feather.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/feather.cc b/cpp/src/arrow/ipc/feather.cc
index bc7c431..37b01c5 100644
--- a/cpp/src/arrow/ipc/feather.cc
+++ b/cpp/src/arrow/ipc/feather.cc
@@ -33,7 +33,6 @@
 #include "arrow/io/file.h"
 #include "arrow/ipc/feather-internal.h"
 #include "arrow/ipc/feather_generated.h"
-#include "arrow/loader.h"
 #include "arrow/status.h"
 #include "arrow/table.h"
 #include "arrow/util/bit-util.h"
@@ -565,7 +564,7 @@ class TableWriter::TableWriterImpl : public ArrayVisitor {
           &bytes_written));
       meta->total_bytes += bytes_written;
 
-      if (bin_values.data()) { values_buffer = bin_values.data()->data(); }
+      if (bin_values.value_data()) { values_buffer = bin_values.value_data()->data(); }
     } else {
       const auto& prim_values = static_cast<const PrimitiveArray&>(values);
       const auto& fw_type = static_cast<const FixedWidthType&>(*values.type());
@@ -577,7 +576,7 @@ class TableWriter::TableWriterImpl : public ArrayVisitor {
         values_bytes = values.length() * fw_type.bit_width() / 8;
       }
 
-      if (prim_values.data()) { values_buffer = prim_values.data()->data(); }
+      if (prim_values.values()) { values_buffer = prim_values.values()->data(); }
     }
     RETURN_NOT_OK(
         WritePadded(stream_.get(), values_buffer, values_bytes, &bytes_written));

http://git-wip-us.apache.org/repos/asf/arrow/blob/84520711/cpp/src/arrow/ipc/json-internal.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/json-internal.cc b/cpp/src/arrow/ipc/json-internal.cc
index beebb4f..69e4ae8 100644
--- a/cpp/src/arrow/ipc/json-internal.cc
+++ b/cpp/src/arrow/ipc/json-internal.cc
@@ -414,7 +414,7 @@ class ArrayWriter {
   template <typename T>
   typename std::enable_if<IsSignedInt<T>::value, void>::type WriteDataValues(
       const T& arr) {
-    const auto data = arr.raw_data();
+    const auto data = arr.raw_values();
     for (int i = 0; i < arr.length(); ++i) {
       writer_->Int64(data[i]);
     }
@@ -423,7 +423,7 @@ class ArrayWriter {
   template <typename T>
   typename std::enable_if<IsUnsignedInt<T>::value, void>::type WriteDataValues(
       const T& arr) {
-    const auto data = arr.raw_data();
+    const auto data = arr.raw_values();
     for (int i = 0; i < arr.length(); ++i) {
       writer_->Uint64(data[i]);
     }
@@ -432,7 +432,7 @@ class ArrayWriter {
   template <typename T>
   typename std::enable_if<IsFloatingPoint<T>::value, void>::type WriteDataValues(
       const T& arr) {
-    const auto data = arr.raw_data();
+    const auto data = arr.raw_values();
     for (int i = 0; i < arr.length(); ++i) {
       writer_->Double(data[i]);
     }
@@ -558,7 +558,12 @@ class ArrayWriter {
   Status Visit(const StructArray& array) {
     WriteValidityField(array);
     const auto& type = static_cast<const StructType&>(*array.type());
-    return WriteChildren(type.children(), array.fields());
+    std::vector<std::shared_ptr<Array>> children;
+    children.reserve(array.num_fields());
+    for (int i = 0; i < array.num_fields(); ++i) {
+      children.emplace_back(array.field(i));
+    }
+    return WriteChildren(type.children(), children);
   }
 
   Status Visit(const UnionArray& array) {
@@ -569,7 +574,12 @@ class ArrayWriter {
     if (type.mode() == UnionMode::DENSE) {
       WriteIntegerField("OFFSET", array.raw_value_offsets(), array.length());
     }
-    return WriteChildren(type.children(), array.children());
+    std::vector<std::shared_ptr<Array>> children;
+    children.reserve(array.num_fields());
+    for (int i = 0; i < array.num_fields(); ++i) {
+      children.emplace_back(array.child(i));
+    }
+    return WriteChildren(type.children(), children);
   }
 
  private:

http://git-wip-us.apache.org/repos/asf/arrow/blob/84520711/cpp/src/arrow/ipc/metadata.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/metadata.h b/cpp/src/arrow/ipc/metadata.h
index ec7bc39..257bbd8 100644
--- a/cpp/src/arrow/ipc/metadata.h
+++ b/cpp/src/arrow/ipc/metadata.h
@@ -26,7 +26,6 @@
 #include <unordered_map>
 #include <vector>
 
-#include "arrow/loader.h"
 #include "arrow/util/macros.h"
 #include "arrow/util/visibility.h"
 
@@ -54,6 +53,42 @@ enum class MetadataVersion : char { V1, V2, V3 };
 
 static constexpr const char* kArrowMagicBytes = "ARROW1";
 
+// ARROW-109: We set this number arbitrarily to help catch user mistakes. For
+// deeply nested schemas, the user is expected to indicate explicitly the
+// maximum allowed recursion depth
+constexpr int kMaxNestingDepth = 64;
+
+/// Length, null count and offset describing one field.
+/// Every member starts at zero, so a default-constructed object never exposes
+/// indeterminate values.
+struct ARROW_EXPORT FieldMetadata {
+  FieldMetadata() = default;
+  FieldMetadata(int64_t length, int64_t null_count, int64_t offset)
+      : length(length), null_count(null_count), offset(offset) {}
+
+  // Copies are member-wise, so the implicitly generated copy and move
+  // operations are used; no hand-written copy constructor is needed.
+
+  int64_t length = 0;
+  int64_t null_count = 0;
+  int64_t offset = 0;
+};
+
+/// Where one buffer lives: a page id plus a byte offset and length within it.
+struct ARROW_EXPORT BufferMetadata {
+  BufferMetadata() = default;
+  BufferMetadata(int32_t page, int64_t offset, int64_t length)
+      : page(page), offset(offset), length(length) {}
+
+  /// The shared memory page id where to find this. Set to -1 if unused, which
+  /// is also the value a default-constructed object starts with
+  int32_t page = -1;
+  /// The relative offset into the memory page to the starting byte of the buffer
+  int64_t offset = 0;
+  /// Absolute length in bytes of the buffer
+  int64_t length = 0;
+};
+
 struct FileBlock {
   FileBlock() {}
   FileBlock(int64_t offset, int32_t metadata_length, int64_t body_length)