You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2017/07/11 05:39:27 UTC

[2/2] arrow git commit: ARROW-1199: [C++] Implement mutable POD struct for Array data

ARROW-1199: [C++] Implement mutable POD struct for Array data

This patch provides a new internal data structure, `internal::ArrayData`, that is a self-contained representation of the memory and metadata inside an Arrow array.

This class is designed for easy internal data manipulation, analytical data processing, and data transport to and from IPC messages. For example, we could cast from int64 to float64 like so:

```c++
Int64Array arr = GetMyData();
std::shared_ptr<internal::ArrayData> new_data = arr->data()->ShallowCopy();
new_data->type = arrow::float64();
Float64Array double_arr(new_data);
```

This object is also useful in an analytics setting where memory may be reused. For example, if we had a group of operations all returning doubles, say:

```
Log(Sqrt(Expr(arr)))
```

Then the low-level implementation of each of these functions could have a signature of the form:

void Log(const ArrayData& values, ArrayData* out);

As another example, a function may consume one or more memory buffers in an input array and replace them with newly-allocated data, changing the output data type as well.

I did quite a bit of refactoring and code simplification that was enabled by this patch. I note that performance in IPC loading of very wide record batches is about 15% slower, but in smaller record batches it is about the same in microbenchmarks. This code path could possibly be made faster with some performance analysis work.

Author: Wes McKinney <we...@twosigma.com>

Closes #824 from wesm/array-data-internals and squashes the following commits:

f1acbae1 [Wes McKinney] MSVC fixes
dcdf2b29 [Wes McKinney] Fix glib per C++ API changes
d0a8ee2b [Wes McKinney] Fix logic error in UnsafeSetNotNull
d17f886c [Wes McKinney] Construct dictionary indices in ctor
bba42530 [Wes McKinney] Set correct type when creating BinaryArray
ba3b2992 [Wes McKinney] Various fixes, Python fixes, add Array operator<< to std::ostream for debugging
0b8af24a [Wes McKinney] Write field metadata directly into output object
05058638 [Wes McKinney] Fix up cmake
75bc6b4f [Wes McKinney] Delete cruft from array/loader.h and consolidate in arrow/ipc
24df1b97 [Wes McKinney] Review comments, add some doxygen comments
6e2e5720 [Wes McKinney] Preallocate vector of shared_ptr
05b806b2 [Wes McKinney] Tests passing again
5bdd6a99 [Wes McKinney] bug fixes
7894496e [Wes McKinney] Some fixes
bf91a75a [Wes McKinney] Refactor to use shared_ptr, not yet working
130f0c1a [Wes McKinney] Use std::move instead of std::forward
a9b4031b [Wes McKinney] Add move constructors to reduce unnecessary copying
475a3db6 [Wes McKinney] Bug fixes, test suite passing again
16918279 [Wes McKinney] Array internals refactoring to use POD struct for all buffers, auxiliary metadata


Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/84520711
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/84520711
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/84520711

Branch: refs/heads/master
Commit: 8452071180c075d7d829d9c0a49376adb45971e0
Parents: ad57ea8
Author: Wes McKinney <we...@twosigma.com>
Authored: Tue Jul 11 01:39:20 2017 -0400
Committer: Wes McKinney <we...@twosigma.com>
Committed: Tue Jul 11 01:39:20 2017 -0400

----------------------------------------------------------------------
 c_glib/arrow-glib/array.cpp                 |   9 +-
 c_glib/arrow-glib/record-batch.cpp          |   3 +-
 cpp/CMakeLists.txt                          |   1 -
 cpp/cmake_modules/FindLz4.cmake             |  35 +--
 cpp/cmake_modules/FindZSTD.cmake            |  35 +--
 cpp/src/arrow/CMakeLists.txt                |   1 -
 cpp/src/arrow/api.h                         |   1 -
 cpp/src/arrow/array-test.cc                 |  71 +++--
 cpp/src/arrow/array.cc                      | 375 ++++++++++++++++-------
 cpp/src/arrow/array.h                       | 345 ++++++++++++++++-----
 cpp/src/arrow/builder.cc                    |  13 +-
 cpp/src/arrow/compare.cc                    | 155 +++++++---
 cpp/src/arrow/ipc/feather-test.cc           |  19 +-
 cpp/src/arrow/ipc/feather.cc                |   5 +-
 cpp/src/arrow/ipc/json-internal.cc          |  20 +-
 cpp/src/arrow/ipc/metadata.h                |  37 ++-
 cpp/src/arrow/ipc/reader.cc                 | 204 +++++++++++-
 cpp/src/arrow/ipc/test-common.h             |   8 +-
 cpp/src/arrow/ipc/writer.cc                 |  14 +-
 cpp/src/arrow/loader.cc                     | 297 ------------------
 cpp/src/arrow/loader.h                      | 124 --------
 cpp/src/arrow/pretty_print.cc               |  23 +-
 cpp/src/arrow/pretty_print.h                |   3 +
 cpp/src/arrow/python/pandas_convert.cc      |  41 ++-
 cpp/src/arrow/table.cc                      |  46 ++-
 cpp/src/arrow/table.h                       |  32 +-
 python/doc/source/development.rst           |   2 +-
 python/pyarrow/array.pxi                    |   7 +-
 python/pyarrow/includes/libarrow.pxd        |   4 +
 python/pyarrow/tests/test_convert_pandas.py |  17 +
 30 files changed, 1101 insertions(+), 846 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/arrow/blob/84520711/c_glib/arrow-glib/array.cpp
----------------------------------------------------------------------
diff --git a/c_glib/arrow-glib/array.cpp b/c_glib/arrow-glib/array.cpp
index 92a748d..ab62bba 100644
--- a/c_glib/arrow-glib/array.cpp
+++ b/c_glib/arrow-glib/array.cpp
@@ -38,7 +38,7 @@ garrow_array_get_values_raw(std::shared_ptr<arrow::Array> arrow_array,
   auto arrow_specific_array =
     std::static_pointer_cast<typename arrow::TypeTraits<T>::ArrayType>(arrow_array);
   *length = arrow_specific_array->length();
-  return arrow_specific_array->raw_data();
+  return arrow_specific_array->raw_values();
 };
 
 G_BEGIN_DECLS
@@ -490,7 +490,7 @@ garrow_primitive_array_get_buffer(GArrowPrimitiveArray *array)
   auto arrow_array = garrow_array_get_raw(GARROW_ARRAY(array));
   auto arrow_primitive_array =
     static_cast<arrow::PrimitiveArray *>(arrow_array.get());
-  auto arrow_data = arrow_primitive_array->data();
+  auto arrow_data = arrow_primitive_array->values();
   return garrow_buffer_new_raw(&arrow_data);
 }
 
@@ -1425,7 +1425,7 @@ garrow_binary_array_get_buffer(GArrowBinaryArray *array)
   auto arrow_array = garrow_array_get_raw(GARROW_ARRAY(array));
   auto arrow_binary_array =
     static_cast<arrow::BinaryArray *>(arrow_array.get());
-  auto arrow_data = arrow_binary_array->data();
+  auto arrow_data = arrow_binary_array->value_data();
   return garrow_buffer_new_raw(&arrow_data);
 }
 
@@ -1681,7 +1681,8 @@ garrow_struct_array_get_fields(GArrowStructArray *array)
     static_cast<const arrow::StructArray *>(arrow_array.get());
 
   GList *fields = NULL;
-  for (auto arrow_field : arrow_struct_array->fields()) {
+  for (int i = 0; i < arrow_struct_array->num_fields(); ++i) {
+    auto arrow_field = arrow_struct_array->field(i);
     GArrowArray *field = garrow_array_new_raw(&arrow_field);
     fields = g_list_prepend(fields, field);
   }

http://git-wip-us.apache.org/repos/asf/arrow/blob/84520711/c_glib/arrow-glib/record-batch.cpp
----------------------------------------------------------------------
diff --git a/c_glib/arrow-glib/record-batch.cpp b/c_glib/arrow-glib/record-batch.cpp
index cd030de..f381af0 100644
--- a/c_glib/arrow-glib/record-batch.cpp
+++ b/c_glib/arrow-glib/record-batch.cpp
@@ -219,7 +219,8 @@ garrow_record_batch_get_columns(GArrowRecordBatch *record_batch)
   const auto arrow_record_batch = garrow_record_batch_get_raw(record_batch);
 
   GList *columns = NULL;
-  for (auto arrow_column : arrow_record_batch->columns()) {
+  for (int i = 0; i < arrow_record_batch->num_columns(); ++i) {
+    auto arrow_column = arrow_record_batch->column(i);
     GArrowArray *column = garrow_array_new_raw(&arrow_column);
     columns = g_list_prepend(columns, column);
   }

http://git-wip-us.apache.org/repos/asf/arrow/blob/84520711/cpp/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt
index 002a07e..e67c7f6 100644
--- a/cpp/CMakeLists.txt
+++ b/cpp/CMakeLists.txt
@@ -650,7 +650,6 @@ set(ARROW_SRCS
   src/arrow/buffer.cc
   src/arrow/builder.cc
   src/arrow/compare.cc
-  src/arrow/loader.cc
   src/arrow/memory_pool.cc
   src/arrow/pretty_print.cc
   src/arrow/status.cc

http://git-wip-us.apache.org/repos/asf/arrow/blob/84520711/cpp/cmake_modules/FindLz4.cmake
----------------------------------------------------------------------
diff --git a/cpp/cmake_modules/FindLz4.cmake b/cpp/cmake_modules/FindLz4.cmake
index e25b013..07707cf 100644
--- a/cpp/cmake_modules/FindLz4.cmake
+++ b/cpp/cmake_modules/FindLz4.cmake
@@ -39,32 +39,15 @@ set(LZ4_STATIC_LIB_SUFFIX
 set(LZ4_STATIC_LIB_NAME
   ${CMAKE_STATIC_LIBRARY_PREFIX}lz4${LZ4_STATIC_LIB_SUFFIX})
 
-if ( _lz4_roots )
-  find_path(LZ4_INCLUDE_DIR NAMES lz4.h
-    PATHS ${_lz4_roots}
-    NO_DEFAULT_PATH
-    PATH_SUFFIXES "include" )
-  find_library(LZ4_SHARED_LIB NAMES lz4
-    PATHS ${_lz4_roots}
-    NO_DEFAULT_PATH
-    PATH_SUFFIXES "lib" )
-  find_library(LZ4_STATIC_LIB NAMES ${LZ4_STATIC_LIB_NAME}
-    PATHS ${_lz4_roots}
-    NO_DEFAULT_PATH
-    PATH_SUFFIXES "lib" )
-else()
-  find_path(LZ4_INCLUDE_DIR lz4.h
-    # make sure we don't accidentally pick up a different version
-    NO_CMAKE_SYSTEM_PATH
-    NO_SYSTEM_ENVIRONMENT_PATH)
-  find_library(LZ4_SHARED_LIB lz4
-    NO_CMAKE_SYSTEM_PATH
-    NO_SYSTEM_ENVIRONMENT_PATH)
-  find_library(LZ4_STATIC_LIB ${LZ4_STATIC_LIB_NAME}
-    NO_CMAKE_SYSTEM_PATH
-    NO_SYSTEM_ENVIRONMENT_PATH)
-endif()
+find_path(LZ4_INCLUDE_DIR NAMES lz4.h
+  PATHS ${_lz4_roots}
+  NO_DEFAULT_PATH
+  PATH_SUFFIXES "include" )
+find_library(LZ4_STATIC_LIB NAMES ${LZ4_STATIC_LIB_NAME} lib${LZ4_STATIC_LIB_NAME}
+  PATHS ${_lz4_roots}
+  NO_DEFAULT_PATH
+  PATH_SUFFIXES "lib" )
 
 include(FindPackageHandleStandardArgs)
 find_package_handle_standard_args(LZ4 REQUIRED_VARS
-  LZ4_SHARED_LIB LZ4_STATIC_LIB LZ4_INCLUDE_DIR)
+  LZ4_STATIC_LIB LZ4_INCLUDE_DIR)

http://git-wip-us.apache.org/repos/asf/arrow/blob/84520711/cpp/cmake_modules/FindZSTD.cmake
----------------------------------------------------------------------
diff --git a/cpp/cmake_modules/FindZSTD.cmake b/cpp/cmake_modules/FindZSTD.cmake
index 1fda29e..02a0c39 100644
--- a/cpp/cmake_modules/FindZSTD.cmake
+++ b/cpp/cmake_modules/FindZSTD.cmake
@@ -39,32 +39,15 @@ set(ZSTD_STATIC_LIB_SUFFIX
 set(ZSTD_STATIC_LIB_NAME
   ${CMAKE_STATIC_LIBRARY_PREFIX}zstd${ZSTD_STATIC_LIB_SUFFIX})
 
-if ( _zstd_roots )
-  find_path(ZSTD_INCLUDE_DIR NAMES zstd.h
-    PATHS ${_zstd_roots}
-    NO_DEFAULT_PATH
-    PATH_SUFFIXES "include" )
-  find_library(ZSTD_SHARED_LIB NAMES zstd
-    PATHS ${_zstd_roots}
-    NO_DEFAULT_PATH
-    PATH_SUFFIXES "lib" )
-  find_library(ZSTD_STATIC_LIB NAMES ${ZSTD_STATIC_LIB_NAME}
-    PATHS ${_zstd_roots}
-    NO_DEFAULT_PATH
-    PATH_SUFFIXES "lib" )
-else()
-  find_path(ZSTD_INCLUDE_DIR zstd.h
-    # make sure we don't accidentally pick up a different version
-    NO_CMAKE_SYSTEM_PATH
-    NO_SYSTEM_ENVIRONMENT_PATH)
-  find_library(ZSTD_SHARED_LIB zstd
-    NO_CMAKE_SYSTEM_PATH
-    NO_SYSTEM_ENVIRONMENT_PATH)
-  find_library(ZSTD_STATIC_LIB ${ZSTD_STATIC_LIB_NAME}
-    NO_CMAKE_SYSTEM_PATH
-    NO_SYSTEM_ENVIRONMENT_PATH)
-endif()
+find_path(ZSTD_INCLUDE_DIR NAMES zstd.h
+  PATHS ${_zstd_roots}
+  NO_DEFAULT_PATH
+  PATH_SUFFIXES "include" )
+find_library(ZSTD_STATIC_LIB NAMES ${ZSTD_STATIC_LIB_NAME} lib${ZSTD_STATIC_LIB_NAME}
+  PATHS ${_zstd_roots}
+  NO_DEFAULT_PATH
+  PATH_SUFFIXES "lib" )
 
 include(FindPackageHandleStandardArgs)
 find_package_handle_standard_args(ZSTD REQUIRED_VARS
-  ZSTD_SHARED_LIB ZSTD_STATIC_LIB ZSTD_INCLUDE_DIR)
+  ZSTD_STATIC_LIB ZSTD_INCLUDE_DIR)

http://git-wip-us.apache.org/repos/asf/arrow/blob/84520711/cpp/src/arrow/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt
index cb5282c..55fab2d 100644
--- a/cpp/src/arrow/CMakeLists.txt
+++ b/cpp/src/arrow/CMakeLists.txt
@@ -23,7 +23,6 @@ install(FILES
   buffer.h
   builder.h
   compare.h
-  loader.h
   memory_pool.h
   pretty_print.h
   status.h

http://git-wip-us.apache.org/repos/asf/arrow/blob/84520711/cpp/src/arrow/api.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/api.h b/cpp/src/arrow/api.h
index aa0da75..731f239 100644
--- a/cpp/src/arrow/api.h
+++ b/cpp/src/arrow/api.h
@@ -24,7 +24,6 @@
 #include "arrow/buffer.h"
 #include "arrow/builder.h"
 #include "arrow/compare.h"
-#include "arrow/loader.h"
 #include "arrow/memory_pool.h"
 #include "arrow/pretty_print.h"
 #include "arrow/status.h"

http://git-wip-us.apache.org/repos/asf/arrow/blob/84520711/cpp/src/arrow/array-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/array-test.cc b/cpp/src/arrow/array-test.cc
index 7ae03cf..bfdb923 100644
--- a/cpp/src/arrow/array-test.cc
+++ b/cpp/src/arrow/array-test.cc
@@ -351,7 +351,7 @@ void TestPrimitiveBuilder<PBoolean>::Check(
 
   for (int64_t i = 0; i < result->length(); ++i) {
     if (nullable) { ASSERT_EQ(valid_bytes_[i] == 0, result->IsNull(i)) << i; }
-    bool actual = BitUtil::GetBit(result->data()->data(), i);
+    bool actual = BitUtil::GetBit(result->values()->data(), i);
     ASSERT_EQ(draws_[i] != 0, actual) << i;
   }
   ASSERT_TRUE(result->Equals(*expected));
@@ -778,8 +778,8 @@ TEST_F(TestStringArray, CompareNullByteSlots) {
 
   // The validity bitmaps are the same, the data is different, but the unequal
   // portion is masked out
-  StringArray equal_array(3, a1.value_offsets(), a1.data(), a2.null_bitmap(), 1);
-  StringArray equal_array2(3, a3.value_offsets(), a3.data(), a2.null_bitmap(), 1);
+  StringArray equal_array(3, a1.value_offsets(), a1.value_data(), a2.null_bitmap(), 1);
+  StringArray equal_array2(3, a3.value_offsets(), a3.value_data(), a2.null_bitmap(), 1);
 
   ASSERT_TRUE(equal_array.Equals(equal_array2));
   ASSERT_TRUE(a2.RangeEquals(equal_array2, 0, 3, 0));
@@ -846,7 +846,7 @@ TEST_F(TestStringBuilder, TestScalarAppend) {
 
   ASSERT_EQ(reps * N, result_->length());
   ASSERT_EQ(reps, result_->null_count());
-  ASSERT_EQ(reps * 6, result_->data()->size());
+  ASSERT_EQ(reps * 6, result_->value_data()->size());
 
   int32_t length;
   int32_t pos = 0;
@@ -1011,7 +1011,7 @@ TEST_F(TestBinaryBuilder, TestScalarAppend) {
   ASSERT_OK(ValidateArray(*result_));
   ASSERT_EQ(reps * N, result_->length());
   ASSERT_EQ(reps, result_->null_count());
-  ASSERT_EQ(reps * 6, result_->data()->size());
+  ASSERT_EQ(reps * 6, result_->value_data()->size());
 
   int32_t length;
   for (int i = 0; i < N * reps; ++i) {
@@ -1200,8 +1200,8 @@ TEST_F(TestFWBinaryArray, EqualsRangeEquals) {
   const auto& a1 = static_cast<const FixedSizeBinaryArray&>(*array1);
   const auto& a2 = static_cast<const FixedSizeBinaryArray&>(*array2);
 
-  FixedSizeBinaryArray equal1(type, 2, a1.data(), a1.null_bitmap(), 1);
-  FixedSizeBinaryArray equal2(type, 2, a2.data(), a1.null_bitmap(), 1);
+  FixedSizeBinaryArray equal1(type, 2, a1.values(), a1.null_bitmap(), 1);
+  FixedSizeBinaryArray equal2(type, 2, a2.values(), a1.null_bitmap(), 1);
 
   ASSERT_TRUE(equal1.Equals(equal2));
   ASSERT_TRUE(equal1.RangeEquals(equal2, 0, 2, 0));
@@ -1224,7 +1224,7 @@ TEST_F(TestFWBinaryArray, ZeroSize) {
   const auto& fw_array = static_cast<const FixedSizeBinaryArray&>(*array);
 
   // data is never allocated
-  ASSERT_TRUE(fw_array.data() == nullptr);
+  ASSERT_TRUE(fw_array.values() == nullptr);
   ASSERT_EQ(0, fw_array.byte_width());
 
   ASSERT_EQ(6, array->length());
@@ -1524,8 +1524,7 @@ TYPED_TEST(TestDictionaryBuilder, Basic) {
   ASSERT_OK(dict_builder.Append(static_cast<typename TypeParam::c_type>(2)));
   std::shared_ptr<Array> dict_array;
   ASSERT_OK(dict_builder.Finish(&dict_array));
-  auto dtype =
-      std::make_shared<DictionaryType>(std::make_shared<TypeParam>(), dict_array);
+  auto dtype = std::make_shared<DictionaryType>(uint8(), dict_array);
 
   UInt8Builder int_builder(default_memory_pool());
   ASSERT_OK(int_builder.Append(0));
@@ -1558,8 +1557,7 @@ TYPED_TEST(TestDictionaryBuilder, ArrayConversion) {
   ASSERT_OK(dict_builder.Append(static_cast<typename TypeParam::c_type>(2)));
   std::shared_ptr<Array> dict_array;
   ASSERT_OK(dict_builder.Finish(&dict_array));
-  auto dtype =
-      std::make_shared<DictionaryType>(std::make_shared<TypeParam>(), dict_array);
+  auto dtype = std::make_shared<DictionaryType>(uint8(), dict_array);
 
   UInt8Builder int_builder(default_memory_pool());
   ASSERT_OK(int_builder.Append(0));
@@ -1601,8 +1599,7 @@ TYPED_TEST(TestDictionaryBuilder, DoubleTableSize) {
     // Finalize expected data
     std::shared_ptr<Array> dict_array;
     ASSERT_OK(dict_builder.Finish(&dict_array));
-    auto dtype =
-        std::make_shared<DictionaryType>(std::make_shared<TypeParam>(), dict_array);
+    auto dtype = std::make_shared<DictionaryType>(uint16(), dict_array);
     std::shared_ptr<Array> int_array;
     ASSERT_OK(int_builder.Finish(&int_array));
 
@@ -1627,7 +1624,7 @@ TEST(TestStringDictionaryBuilder, Basic) {
   ASSERT_OK(str_builder.Append("test2"));
   std::shared_ptr<Array> str_array;
   ASSERT_OK(str_builder.Finish(&str_array));
-  auto dtype = std::make_shared<DictionaryType>(utf8(), str_array);
+  auto dtype = std::make_shared<DictionaryType>(uint8(), str_array);
 
   UInt8Builder int_builder(default_memory_pool());
   ASSERT_OK(int_builder.Append(0));
@@ -1668,7 +1665,7 @@ TEST(TestStringDictionaryBuilder, DoubleTableSize) {
   // Finalize expected data
   std::shared_ptr<Array> str_array;
   ASSERT_OK(str_builder.Finish(&str_array));
-  auto dtype = std::make_shared<DictionaryType>(utf8(), str_array);
+  auto dtype = std::make_shared<DictionaryType>(uint16(), str_array);
   std::shared_ptr<Array> int_array;
   ASSERT_OK(int_builder.Finish(&int_array));
 
@@ -1781,7 +1778,7 @@ TEST_F(TestListBuilder, TestAppendNull) {
   ASSERT_EQ(0, result_->value_offset(1));
   ASSERT_EQ(0, result_->value_offset(2));
 
-  Int32Array* values = static_cast<Int32Array*>(result_->values().get());
+  auto values = result_->values();
   ASSERT_EQ(0, values->length());
 }
 
@@ -1802,7 +1799,7 @@ void ValidateBasicListArray(const ListArray* result, const vector<int32_t>& valu
   }
 
   ASSERT_EQ(7, result->values()->length());
-  Int32Array* varr = static_cast<Int32Array*>(result->values().get());
+  auto varr = std::dynamic_pointer_cast<Int32Array>(result->values());
 
   for (size_t i = 0; i < values.size(); ++i) {
     ASSERT_EQ(values[i], varr->Value(i));
@@ -1972,25 +1969,27 @@ TEST(TestDictionary, Validate) {
   std::shared_ptr<DataType> dict_type = dictionary(int16(), dict);
 
   std::shared_ptr<Array> indices;
-  vector<uint8_t> indices_values = {1, 2, 0, 0, 2, 0};
-  ArrayFromVector<UInt8Type, uint8_t>(is_valid, indices_values, &indices);
-
-  std::shared_ptr<Array> indices2;
-  vector<float> indices2_values = {1., 2., 0., 0., 2., 0.};
-  ArrayFromVector<FloatType, float>(is_valid, indices2_values, &indices2);
-
-  std::shared_ptr<Array> indices3;
-  vector<int64_t> indices3_values = {1, 2, 0, 0, 2, 0};
-  ArrayFromVector<Int64Type, int64_t>(is_valid, indices3_values, &indices3);
+  vector<int16_t> indices_values = {1, 2, 0, 0, 2, 0};
+  ArrayFromVector<Int16Type, int16_t>(is_valid, indices_values, &indices);
 
   std::shared_ptr<Array> arr = std::make_shared<DictionaryArray>(dict_type, indices);
-  std::shared_ptr<Array> arr2 = std::make_shared<DictionaryArray>(dict_type, indices2);
-  std::shared_ptr<Array> arr3 = std::make_shared<DictionaryArray>(dict_type, indices3);
 
   // Only checking index type for now
   ASSERT_OK(ValidateArray(*arr));
-  ASSERT_RAISES(Invalid, ValidateArray(*arr2));
-  ASSERT_OK(ValidateArray(*arr3));
+
+  // TODO(wesm) In ARROW-1199, there is now a DCHECK to compare the indices
+  // type with the dict_type. How can we test for this?
+
+  // std::shared_ptr<Array> indices2;
+  // vector<float> indices2_values = {1., 2., 0., 0., 2., 0.};
+  // ArrayFromVector<FloatType, float>(is_valid, indices2_values, &indices2);
+
+  // std::shared_ptr<Array> indices3;
+  // vector<int64_t> indices3_values = {1, 2, 0, 0, 2, 0};
+  // ArrayFromVector<Int64Type, int64_t>(is_valid, indices3_values, &indices3);
+  // std::shared_ptr<Array> arr2 = std::make_shared<DictionaryArray>(dict_type, indices2);
+  // std::shared_ptr<Array> arr3 = std::make_shared<DictionaryArray>(dict_type, indices3);
+  // ASSERT_OK(ValidateArray(*arr3));
 }
 
 // ----------------------------------------------------------------------
@@ -2003,9 +2002,9 @@ void ValidateBasicStructArray(const StructArray* result,
   ASSERT_EQ(4, result->length());
   ASSERT_OK(ValidateArray(*result));
 
-  auto list_char_arr = static_cast<ListArray*>(result->field(0).get());
-  auto char_arr = static_cast<Int8Array*>(list_char_arr->values().get());
-  auto int32_arr = static_cast<Int32Array*>(result->field(1).get());
+  auto list_char_arr = std::dynamic_pointer_cast<ListArray>(result->field(0));
+  auto char_arr = std::dynamic_pointer_cast<Int8Array>(list_char_arr->values());
+  auto int32_arr = std::dynamic_pointer_cast<Int32Array>(result->field(1));
 
   ASSERT_EQ(0, result->null_count());
   ASSERT_EQ(1, list_char_arr->null_count());
@@ -2086,7 +2085,7 @@ TEST_F(TestStructBuilder, TestAppendNull) {
 
   ASSERT_OK(ValidateArray(*result_));
 
-  ASSERT_EQ(2, static_cast<int>(result_->fields().size()));
+  ASSERT_EQ(2, static_cast<int>(result_->num_fields()));
   ASSERT_EQ(2, result_->length());
   ASSERT_EQ(2, result_->field(0)->length());
   ASSERT_EQ(2, result_->field(1)->length());

http://git-wip-us.apache.org/repos/asf/arrow/blob/84520711/cpp/src/arrow/array.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/array.cc b/cpp/src/arrow/array.cc
index c5acf3e..48a3bd5 100644
--- a/cpp/src/arrow/array.cc
+++ b/cpp/src/arrow/array.cc
@@ -24,46 +24,34 @@
 
 #include "arrow/buffer.h"
 #include "arrow/compare.h"
+#include "arrow/pretty_print.h"
 #include "arrow/status.h"
 #include "arrow/type_traits.h"
 #include "arrow/util/bit-util.h"
 #include "arrow/util/decimal.h"
 #include "arrow/util/logging.h"
+#include "arrow/util/macros.h"
 #include "arrow/visitor.h"
 #include "arrow/visitor_inline.h"
 
 namespace arrow {
 
-// When slicing, we do not know the null count of the sliced range without
-// doing some computation. To avoid doing this eagerly, we set the null count
-// to -1 (any negative number will do). When Array::null_count is called the
-// first time, the null count will be computed. See ARROW-33
-constexpr int64_t kUnknownNullCount = -1;
+using internal::ArrayData;
 
 // ----------------------------------------------------------------------
 // Base array class
 
-Array::Array(const std::shared_ptr<DataType>& type, int64_t length,
-    const std::shared_ptr<Buffer>& null_bitmap, int64_t null_count, int64_t offset)
-    : type_(type),
-      length_(length),
-      offset_(offset),
-      null_count_(null_count),
-      null_bitmap_(null_bitmap),
-      null_bitmap_data_(nullptr) {
-  if (null_count_ == 0) { null_bitmap_ = nullptr; }
-  if (null_bitmap_) { null_bitmap_data_ = null_bitmap_->data(); }
-}
-
 int64_t Array::null_count() const {
-  if (null_count_ < 0) {
-    if (null_bitmap_) {
-      null_count_ = length_ - CountSetBits(null_bitmap_data_, offset_, length_);
+  if (ARROW_PREDICT_FALSE(data_->null_count < 0)) {
+    if (data_->buffers[0]) {
+      data_->null_count =
+          data_->length - CountSetBits(null_bitmap_data_, data_->offset, data_->length);
+
     } else {
-      null_count_ = 0;
+      data_->null_count = 0;
     }
   }
-  return null_count_;
+  return data_->null_count;
 }
 
 bool Array::Equals(const Array& arr) const {
@@ -115,15 +103,34 @@ static inline void ConformSliceParams(
 }
 
 std::shared_ptr<Array> Array::Slice(int64_t offset) const {
-  int64_t slice_length = length_ - offset;
+  int64_t slice_length = data_->length - offset;
   return Slice(offset, slice_length);
 }
 
-NullArray::NullArray(int64_t length) : Array(null(), length, nullptr, length) {}
+std::ostream& operator<<(std::ostream& os, const Array& x) {
+  DCHECK(PrettyPrint(x, 0, &os).ok());
+  return os;
+}
+
+static inline std::shared_ptr<ArrayData> SliceData(
+    const ArrayData& data, int64_t offset, int64_t length) {
+  ConformSliceParams(data.offset, data.length, &offset, &length);
+
+  auto new_data = data.ShallowCopy();
+  new_data->length = length;
+  new_data->offset = offset;
+  new_data->null_count = kUnknownNullCount;
+  return new_data;
+}
+
+NullArray::NullArray(int64_t length) {
+  BufferVector buffers = {nullptr};
+  SetData(std::make_shared<ArrayData>(null(), length, std::move(buffers), length));
+}
 
 std::shared_ptr<Array> NullArray::Slice(int64_t offset, int64_t length) const {
-  DCHECK_LE(offset, length_);
-  length = std::min(length_ - offset, length);
+  DCHECK_LE(offset, data_->length);
+  length = std::min(data_->length - offset, length);
   return std::make_shared<NullArray>(length);
 }
 
@@ -132,40 +139,78 @@ std::shared_ptr<Array> NullArray::Slice(int64_t offset, int64_t length) const {
 
 PrimitiveArray::PrimitiveArray(const std::shared_ptr<DataType>& type, int64_t length,
     const std::shared_ptr<Buffer>& data, const std::shared_ptr<Buffer>& null_bitmap,
-    int64_t null_count, int64_t offset)
-    : Array(type, length, null_bitmap, null_count, offset) {
-  data_ = data;
-  raw_data_ = data == nullptr ? nullptr : data_->data();
+    int64_t null_count, int64_t offset) {
+  BufferVector buffers = {null_bitmap, data};
+  SetData(
+      std::make_shared<ArrayData>(type, length, std::move(buffers), null_count, offset));
+}
+
+template <typename T>
+NumericArray<T>::NumericArray(const std::shared_ptr<internal::ArrayData>& data)
+    : PrimitiveArray(data) {
+  DCHECK_EQ(data->type->id(), T::type_id);
 }
 
 template <typename T>
 std::shared_ptr<Array> NumericArray<T>::Slice(int64_t offset, int64_t length) const {
-  ConformSliceParams(offset_, length_, &offset, &length);
-  return std::make_shared<NumericArray<T>>(
-      type_, length, data_, null_bitmap_, kUnknownNullCount, offset);
+  return std::make_shared<NumericArray<T>>(SliceData(*data_, offset, length));
 }
 
 // ----------------------------------------------------------------------
 // BooleanArray
 
+BooleanArray::BooleanArray(const std::shared_ptr<internal::ArrayData>& data)
+    : PrimitiveArray(data) {
+  DCHECK_EQ(data->type->id(), Type::BOOL);
+}
+
 BooleanArray::BooleanArray(int64_t length, const std::shared_ptr<Buffer>& data,
     const std::shared_ptr<Buffer>& null_bitmap, int64_t null_count, int64_t offset)
-    : PrimitiveArray(std::make_shared<BooleanType>(), length, data, null_bitmap,
-          null_count, offset) {}
+    : PrimitiveArray(boolean(), length, data, null_bitmap, null_count, offset) {}
 
 std::shared_ptr<Array> BooleanArray::Slice(int64_t offset, int64_t length) const {
-  ConformSliceParams(offset_, length_, &offset, &length);
-  return std::make_shared<BooleanArray>(
-      length, data_, null_bitmap_, kUnknownNullCount, offset);
+  return std::make_shared<BooleanArray>(SliceData(*data_, offset, length));
 }
 
 // ----------------------------------------------------------------------
 // ListArray
 
+ListArray::ListArray(const std::shared_ptr<ArrayData>& data) {
+  DCHECK_EQ(data->type->id(), Type::LIST);
+  SetData(data);
+}
+
+ListArray::ListArray(const std::shared_ptr<DataType>& type, int64_t length,
+    const std::shared_ptr<Buffer>& value_offsets, const std::shared_ptr<Array>& values,
+    const std::shared_ptr<Buffer>& null_bitmap, int64_t null_count, int64_t offset) {
+  BufferVector buffers = {null_bitmap, value_offsets};
+  auto internal_data =
+      std::make_shared<ArrayData>(type, length, std::move(buffers), null_count, offset);
+  internal_data->child_data.emplace_back(values->data());
+  SetData(internal_data);
+}
+
+void ListArray::SetData(const std::shared_ptr<ArrayData>& data) {
+  this->Array::SetData(data);
+  auto value_offsets = data->buffers[1];
+  raw_value_offsets_ = value_offsets == nullptr
+                           ? nullptr
+                           : reinterpret_cast<const int32_t*>(value_offsets->data());
+  DCHECK(internal::MakeArray(data_->child_data[0], &values_).ok());
+}
+
+std::shared_ptr<DataType> ListArray::value_type() const {
+  return static_cast<const ListType&>(*type()).value_type();
+}
+
+std::shared_ptr<Array> ListArray::values() const {
+  return values_;
+}
+
 std::shared_ptr<Array> ListArray::Slice(int64_t offset, int64_t length) const {
-  ConformSliceParams(offset_, length_, &offset, &length);
-  return std::make_shared<ListArray>(
-      type_, length, value_offsets_, values_, null_bitmap_, kUnknownNullCount, offset);
+  ConformSliceParams(data_->offset, data_->length, &offset, &length);
+  return std::make_shared<ListArray>(type(), length, value_offsets(), values(),
+      null_bitmap(), kUnknownNullCount, offset);
 }
 
 // ----------------------------------------------------------------------
@@ -174,6 +219,21 @@ std::shared_ptr<Array> ListArray::Slice(int64_t offset, int64_t length) const {
 static std::shared_ptr<DataType> kBinary = std::make_shared<BinaryType>();
 static std::shared_ptr<DataType> kString = std::make_shared<StringType>();
 
+BinaryArray::BinaryArray(const std::shared_ptr<internal::ArrayData>& data) {
+  DCHECK_EQ(data->type->id(), Type::BINARY);
+  SetData(data);
+}
+
+void BinaryArray::SetData(const std::shared_ptr<ArrayData>& data) {
+  auto value_offsets = data->buffers[1];
+  auto value_data = data->buffers[2];
+  this->Array::SetData(data);
+  raw_data_ = value_data == nullptr ? nullptr : value_data->data();
+  raw_value_offsets_ = value_offsets == nullptr
+                           ? nullptr
+                           : reinterpret_cast<const int32_t*>(value_offsets->data());
+}
+
 BinaryArray::BinaryArray(int64_t length, const std::shared_ptr<Buffer>& value_offsets,
     const std::shared_ptr<Buffer>& data, const std::shared_ptr<Buffer>& null_bitmap,
     int64_t null_count, int64_t offset)
@@ -182,22 +242,19 @@ BinaryArray::BinaryArray(int64_t length, const std::shared_ptr<Buffer>& value_of
 
 BinaryArray::BinaryArray(const std::shared_ptr<DataType>& type, int64_t length,
     const std::shared_ptr<Buffer>& value_offsets, const std::shared_ptr<Buffer>& data,
-    const std::shared_ptr<Buffer>& null_bitmap, int64_t null_count, int64_t offset)
-    : Array(type, length, null_bitmap, null_count, offset),
-      value_offsets_(value_offsets),
-      raw_value_offsets_(nullptr),
-      data_(data),
-      raw_data_(nullptr) {
-  if (value_offsets_ != nullptr) {
-    raw_value_offsets_ = reinterpret_cast<const int32_t*>(value_offsets_->data());
-  }
-  if (data_ != nullptr) { raw_data_ = data_->data(); }
+    const std::shared_ptr<Buffer>& null_bitmap, int64_t null_count, int64_t offset) {
+  BufferVector buffers = {null_bitmap, value_offsets, data};
+  SetData(
+      std::make_shared<ArrayData>(type, length, std::move(buffers), null_count, offset));
 }
 
 std::shared_ptr<Array> BinaryArray::Slice(int64_t offset, int64_t length) const {
-  ConformSliceParams(offset_, length_, &offset, &length);
-  return std::make_shared<BinaryArray>(
-      length, value_offsets_, data_, null_bitmap_, kUnknownNullCount, offset);
+  return std::make_shared<BinaryArray>(SliceData(*data_, offset, length));
+}
+
+StringArray::StringArray(const std::shared_ptr<internal::ArrayData>& data) {
+  DCHECK_EQ(data->type->id(), Type::STRING);
+  SetData(data);
 }
 
 StringArray::StringArray(int64_t length, const std::shared_ptr<Buffer>& value_offsets,
@@ -207,14 +264,18 @@ StringArray::StringArray(int64_t length, const std::shared_ptr<Buffer>& value_of
 }
 
 std::shared_ptr<Array> StringArray::Slice(int64_t offset, int64_t length) const {
-  ConformSliceParams(offset_, length_, &offset, &length);
-  return std::make_shared<StringArray>(
-      length, value_offsets_, data_, null_bitmap_, kUnknownNullCount, offset);
+  return std::make_shared<StringArray>(SliceData(*data_, offset, length));
 }
 
 // ----------------------------------------------------------------------
 // Fixed width binary
 
+FixedSizeBinaryArray::FixedSizeBinaryArray(
+    const std::shared_ptr<internal::ArrayData>& data) {
+  DCHECK_EQ(data->type->id(), Type::FIXED_SIZE_BINARY);
+  SetData(data);
+}
+
 FixedSizeBinaryArray::FixedSizeBinaryArray(const std::shared_ptr<DataType>& type,
     int64_t length, const std::shared_ptr<Buffer>& data,
     const std::shared_ptr<Buffer>& null_bitmap, int64_t null_count, int64_t offset)
@@ -222,34 +283,52 @@ FixedSizeBinaryArray::FixedSizeBinaryArray(const std::shared_ptr<DataType>& type
       byte_width_(static_cast<const FixedSizeBinaryType&>(*type).byte_width()) {}
 
 std::shared_ptr<Array> FixedSizeBinaryArray::Slice(int64_t offset, int64_t length) const {
-  ConformSliceParams(offset_, length_, &offset, &length);
-  return std::make_shared<FixedSizeBinaryArray>(
-      type_, length, data_, null_bitmap_, kUnknownNullCount, offset);
+  return std::make_shared<FixedSizeBinaryArray>(SliceData(*data_, offset, length));
 }
 
 const uint8_t* FixedSizeBinaryArray::GetValue(int64_t i) const {
-  return raw_data_ + (i + offset_) * byte_width_;
+  return raw_values_ + (i + data_->offset) * byte_width_;
 }
 
 // ----------------------------------------------------------------------
 // Decimal
+
+DecimalArray::DecimalArray(const std::shared_ptr<internal::ArrayData>& data) {
+  DCHECK_EQ(data->type->id(), Type::DECIMAL);
+  SetData(data);
+}
+
+void DecimalArray::SetData(const std::shared_ptr<ArrayData>& data) {
+  // Slot 1 = values, slot 2 = sign bitmap (128-bit only); tolerate a missing slot
+  auto fixed_size_data = data->buffers.size() > 1 ? data->buffers[1] : nullptr;
+  auto sign_bitmap = data->buffers.size() > 2 ? data->buffers[2] : nullptr;
+  this->Array::SetData(data);
+  raw_values_ = fixed_size_data != nullptr ? fixed_size_data->data() : nullptr;
+  sign_bitmap_data_ = sign_bitmap != nullptr ? sign_bitmap->data() : nullptr;
+}
+
 DecimalArray::DecimalArray(const std::shared_ptr<DataType>& type, int64_t length,
     const std::shared_ptr<Buffer>& data, const std::shared_ptr<Buffer>& null_bitmap,
-    int64_t null_count, int64_t offset, const std::shared_ptr<Buffer>& sign_bitmap)
-    : FixedSizeBinaryArray(type, length, data, null_bitmap, null_count, offset),
-      sign_bitmap_(sign_bitmap),
-      sign_bitmap_data_(sign_bitmap != nullptr ? sign_bitmap->data() : nullptr) {}
+    int64_t null_count, int64_t offset, const std::shared_ptr<Buffer>& sign_bitmap) {
+  BufferVector buffers = {null_bitmap, data, sign_bitmap};
+  SetData(
+      std::make_shared<ArrayData>(type, length, std::move(buffers), null_count, offset));
+}
 
 bool DecimalArray::IsNegative(int64_t i) const {
   return sign_bitmap_data_ != nullptr ? BitUtil::GetBit(sign_bitmap_data_, i) : false;
 }
 
+const uint8_t* DecimalArray::GetValue(int64_t i) const {
+  return raw_values_ + (i + data_->offset) * byte_width();
+}
+
 std::string DecimalArray::FormatValue(int64_t i) const {
-  const auto type_ = std::dynamic_pointer_cast<DecimalType>(type());
-  const int precision = type_->precision();
-  const int scale = type_->scale();
-  const int byte_width = byte_width_;
-  const uint8_t* bytes = GetValue(i);
+  const auto& type_ = static_cast<const DecimalType&>(*type());
+  const int precision = type_.precision();
+  const int scale = type_.scale();
+  const int byte_width = type_.byte_width();
+  const uint8_t* bytes = raw_values_ + (i + data_->offset) * byte_width;
   switch (byte_width) {
     case 4: {
       decimal::Decimal32 value;
@@ -274,73 +353,110 @@ std::string DecimalArray::FormatValue(int64_t i) const {
 }
 
 std::shared_ptr<Array> DecimalArray::Slice(int64_t offset, int64_t length) const {
-  ConformSliceParams(offset_, length_, &offset, &length);
-  return std::make_shared<DecimalArray>(
-      type_, length, data_, null_bitmap_, kUnknownNullCount, offset, sign_bitmap_);
+  return std::make_shared<DecimalArray>(SliceData(*data_, offset, length));
 }
 
 // ----------------------------------------------------------------------
 // Struct
 
+StructArray::StructArray(const std::shared_ptr<ArrayData>& data) {
+  DCHECK_EQ(data->type->id(), Type::STRUCT);
+  SetData(data);
+}
+
 StructArray::StructArray(const std::shared_ptr<DataType>& type, int64_t length,
     const std::vector<std::shared_ptr<Array>>& children,
-    std::shared_ptr<Buffer> null_bitmap, int64_t null_count, int64_t offset)
-    : Array(type, length, null_bitmap, null_count, offset) {
-  type_ = type;
-  children_ = children;
+    std::shared_ptr<Buffer> null_bitmap, int64_t null_count, int64_t offset) {
+  BufferVector buffers = {null_bitmap};
+  SetData(
+      std::make_shared<ArrayData>(type, length, std::move(buffers), null_count, offset));
+  for (const auto& child : children) {
+    data_->child_data.push_back(child->data());
+  }
 }
 
 std::shared_ptr<Array> StructArray::field(int pos) const {
-  DCHECK_GT(children_.size(), 0);
-  return children_[pos];
+  std::shared_ptr<Array> result;
+  DCHECK(internal::MakeArray(data_->child_data[pos], &result).ok());
+  return result;
 }
 
 std::shared_ptr<Array> StructArray::Slice(int64_t offset, int64_t length) const {
-  ConformSliceParams(offset_, length_, &offset, &length);
-  return std::make_shared<StructArray>(
-      type_, length, children_, null_bitmap_, kUnknownNullCount, offset);
+  return std::make_shared<StructArray>(SliceData(*data_, offset, length));
 }
 
 // ----------------------------------------------------------------------
 // UnionArray
 
+void UnionArray::SetData(const std::shared_ptr<ArrayData>& data) {
+  this->Array::SetData(data);
+
+  // Cache raw pointers into slot 1 (type ids) and slot 2 (value offsets)
+  const std::shared_ptr<Buffer>& ids = data_->buffers[1];
+  const std::shared_ptr<Buffer>& offs = data_->buffers[2];
+  raw_type_ids_ = nullptr;
+  raw_value_offsets_ = nullptr;
+  if (ids) { raw_type_ids_ = reinterpret_cast<const uint8_t*>(ids->data()); }
+  if (offs) { raw_value_offsets_ = reinterpret_cast<const int32_t*>(offs->data()); }
+}
+
+UnionArray::UnionArray(const std::shared_ptr<ArrayData>& data) {
+  DCHECK_EQ(data->type->id(), Type::UNION);
+  SetData(data);
+}
+
 UnionArray::UnionArray(const std::shared_ptr<DataType>& type, int64_t length,
     const std::vector<std::shared_ptr<Array>>& children,
     const std::shared_ptr<Buffer>& type_ids, const std::shared_ptr<Buffer>& value_offsets,
-    const std::shared_ptr<Buffer>& null_bitmap, int64_t null_count, int64_t offset)
-    : Array(type, length, null_bitmap, null_count, offset),
-      children_(children),
-      type_ids_(type_ids),
-      raw_type_ids_(nullptr),
-      value_offsets_(value_offsets),
-      raw_value_offsets_(nullptr) {
-  if (type_ids) { raw_type_ids_ = reinterpret_cast<const uint8_t*>(type_ids->data()); }
-  if (value_offsets) {
-    raw_value_offsets_ = reinterpret_cast<const int32_t*>(value_offsets->data());
+    const std::shared_ptr<Buffer>& null_bitmap, int64_t null_count, int64_t offset) {
+  BufferVector buffers = {null_bitmap, type_ids, value_offsets};
+  auto internal_data =
+      std::make_shared<ArrayData>(type, length, std::move(buffers), null_count, offset);
+  for (const auto& child : children) {
+    internal_data->child_data.push_back(child->data());
   }
+  SetData(internal_data);
 }
 
 std::shared_ptr<Array> UnionArray::child(int pos) const {
-  DCHECK_GT(children_.size(), 0);
-  return children_[pos];
+  std::shared_ptr<Array> result;
+  DCHECK(internal::MakeArray(data_->child_data[pos], &result).ok());
+  return result;
 }
 
 std::shared_ptr<Array> UnionArray::Slice(int64_t offset, int64_t length) const {
-  ConformSliceParams(offset_, length_, &offset, &length);
-  return std::make_shared<UnionArray>(type_, length, children_, type_ids_, value_offsets_,
-      null_bitmap_, kUnknownNullCount, offset);
+  return std::make_shared<UnionArray>(SliceData(*data_, offset, length));
 }
 
 // ----------------------------------------------------------------------
 // DictionaryArray
 
+DictionaryArray::DictionaryArray(const std::shared_ptr<ArrayData>& data)
+    : dict_type_(static_cast<const DictionaryType*>(data->type.get())) {
+  DCHECK_EQ(data->type->id(), Type::DICTIONARY);
+  SetData(data);
+}
+
 DictionaryArray::DictionaryArray(
     const std::shared_ptr<DataType>& type, const std::shared_ptr<Array>& indices)
-    : Array(type, indices->length(), indices->null_bitmap(), indices->null_count(),
-          indices->offset()),
-      dict_type_(static_cast<const DictionaryType*>(type.get())),
-      indices_(indices) {
+    : dict_type_(static_cast<const DictionaryType*>(type.get())) {
   DCHECK_EQ(type->id(), Type::DICTIONARY);
+  DCHECK_EQ(indices->type_id(), dict_type_->index_type()->id());
+  auto data = indices->data()->ShallowCopy();
+  data->type = type;
+  SetData(data);
+}
+
+void DictionaryArray::SetData(const std::shared_ptr<ArrayData>& data) {
+  this->Array::SetData(data);
+  auto indices_data = data_->ShallowCopy();  // same buffers, retyped below
+  indices_data->type = dict_type_->index_type();
+  Status s = internal::MakeArray(indices_data, &indices_);  // runs in all builds
+  DCHECK(s.ok());
+}
+
+std::shared_ptr<Array> DictionaryArray::indices() const {
+  return indices_;
 }
 
 std::shared_ptr<Array> DictionaryArray::dictionary() const {
@@ -348,8 +464,7 @@ std::shared_ptr<Array> DictionaryArray::dictionary() const {
 }
 
 std::shared_ptr<Array> DictionaryArray::Slice(int64_t offset, int64_t length) const {
-  std::shared_ptr<Array> sliced_indices = indices_->Slice(offset, length);
-  return std::make_shared<DictionaryArray>(type_, sliced_indices);
+  return std::make_shared<DictionaryArray>(SliceData(*data_, offset, length));
 }
 
 // ----------------------------------------------------------------------
@@ -367,6 +482,8 @@ struct ValidateVisitor {
 
   Status Visit(const PrimitiveArray& array) { return Status::OK(); }
 
+  Status Visit(const DecimalArray& array) { return Status::OK(); }
+
   Status Visit(const BinaryArray& array) {
     // TODO(wesm): what to do here?
     return Status::OK();
@@ -435,11 +552,12 @@ struct ValidateVisitor {
       return Status::Invalid("Null count exceeds the length of this struct");
     }
 
-    if (array.fields().size() > 0) {
+    if (array.num_fields() > 0) {
       // Validate fields
-      int64_t array_length = array.fields()[0]->length();
+      int64_t array_length = array.field(0)->length();
       size_t idx = 0;
-      for (auto it : array.fields()) {
+      for (int i = 0; i < array.num_fields(); ++i) {
+        auto it = array.field(i);
         if (it->length() != array_length) {
           std::stringstream ss;
           ss << "Length is not equal from field " << it->type()->ToString()
@@ -488,6 +606,51 @@ Status ValidateArray(const Array& array) {
 }
 
 // ----------------------------------------------------------------------
+// Loading from ArrayData
+
+namespace internal {
+
+class ArrayDataWrapper {  // Type visitor: boxes an ArrayData as its Array subclass
+ public:
+  ArrayDataWrapper(const std::shared_ptr<ArrayData>& data, std::shared_ptr<Array>* out)
+      : data_(data), out_(out) {}
+
+  template <typename T>
+  Status Visit(const T& type) {
+    using ArrayType = typename TypeTraits<T>::ArrayType;  // Array class for type T
+    *out_ = std::make_shared<ArrayType>(data_);
+    return Status::OK();
+  }
+
+  const std::shared_ptr<ArrayData>& data_;  // held by reference, not owned
+  std::shared_ptr<Array>* out_;  // receives the newly created array
+};
+
+Status MakeArray(const std::shared_ptr<ArrayData>& data, std::shared_ptr<Array>* out) {
+  ArrayDataWrapper wrapper_visitor(data, out);
+  return VisitTypeInline(*data->type, &wrapper_visitor);
+}
+
+}  // namespace internal
+
+Status MakePrimitiveArray(const std::shared_ptr<DataType>& type, int64_t length,
+    const std::shared_ptr<Buffer>& data, const std::shared_ptr<Buffer>& null_bitmap,
+    int64_t null_count, int64_t offset, std::shared_ptr<Array>* out) {
+  BufferVector buffers = {null_bitmap, data};
+  auto internal_data = std::make_shared<internal::ArrayData>(
+      type, length, std::move(buffers), null_count, offset);
+  return internal::MakeArray(internal_data, out);
+}
+
+Status MakePrimitiveArray(const std::shared_ptr<DataType>& type,
+    const std::vector<std::shared_ptr<Buffer>>& buffers, int64_t length,
+    int64_t null_count, int64_t offset, std::shared_ptr<Array>* out) {
+  auto internal_data =
+      std::make_shared<internal::ArrayData>(type, length, buffers, null_count, offset);
+  return internal::MakeArray(internal_data, out);
+}
+
+// ----------------------------------------------------------------------
 // Instantiate templates
 
 template class ARROW_TEMPLATE_EXPORT NumericArray<UInt8Type>;

http://git-wip-us.apache.org/repos/asf/arrow/blob/84520711/cpp/src/arrow/array.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/array.h b/cpp/src/arrow/array.h
index 59269ad..80284cd 100644
--- a/cpp/src/arrow/array.h
+++ b/cpp/src/arrow/array.h
@@ -35,13 +35,125 @@
 
 namespace arrow {
 
+using BufferVector = std::vector<std::shared_ptr<Buffer>>;
+
+// When slicing, we do not know the null count of the sliced range without
+// doing some computation. To avoid doing this eagerly, we set the null count
+// to -1 (any negative number will do). When Array::null_count is called the
+// first time, the null count will be computed. See ARROW-33
+constexpr int64_t kUnknownNullCount = -1;
+
 class MemoryPool;
-class MutableBuffer;
 class Status;
 
 template <typename T>
 struct Decimal;
 
+// ----------------------------------------------------------------------
+// Generic array data container
+
+namespace internal {
+
+/// \brief Mutable internal container for generic Arrow array data
+///
+/// This data structure is a self-contained representation of the memory and
+/// metadata inside an Arrow array data structure (called vectors in Java). The
+/// classes arrow::Array and its subclasses provide strongly-typed accessors
+/// with support for the visitor pattern and other affordances.
+///
+/// This class is designed for easy internal data manipulation, analytical data
+/// processing, and data transport to and from IPC messages. For example, we
+/// could cast from int64 to float64 like so:
+///
+/// std::shared_ptr<Int64Array> arr = GetMyData();
+/// auto new_data = arr->data()->ShallowCopy();
+/// new_data->type = arrow::float64();
+/// Float64Array double_arr(new_data);
+///
+/// This object is also useful in an analytics setting where memory may be
+/// reused. For example, if we had a group of operations all returning doubles,
+/// say:
+///
+/// Log(Sqrt(Expr(arr)))
+///
+/// Then the low-level implementations of each of these functions could have
+/// the signatures
+///
+/// void Log(const ArrayData& values, ArrayData* out);
+///
+/// As another example a function may consume one or more memory buffers in an
+/// input array and replace them with newly-allocated data, changing the output
+/// data type as well.
+struct ARROW_EXPORT ArrayData {
+  /// Construct an empty container: null type, no buffers or children, and
+  /// length, null_count, and offset all zero
+  ArrayData() = default;
+
+  /// \brief Construct from a type, a length, and a copy of the buffer vector
+  /// \param type logical type describing the data
+  /// \param length number of logical elements
+  /// \param buffers memory buffers; the vector of shared_ptr handles is copied
+  /// \param null_count number of nulls, or kUnknownNullCount if not yet computed
+  /// \param offset logical start position within the buffers
+  ArrayData(const std::shared_ptr<DataType>& type, int64_t length,
+      const std::vector<std::shared_ptr<Buffer>>& buffers,
+      int64_t null_count = kUnknownNullCount, int64_t offset = 0)
+      : type(type),
+        length(length),
+        buffers(buffers),
+        null_count(null_count),
+        offset(offset) {}
+
+  /// \brief Same as above, but moves from the passed buffer vector
+  ArrayData(const std::shared_ptr<DataType>& type, int64_t length,
+      std::vector<std::shared_ptr<Buffer>>&& buffers,
+      int64_t null_count = kUnknownNullCount, int64_t offset = 0)
+      : type(type),
+        length(length),
+        buffers(std::move(buffers)),
+        null_count(null_count),
+        offset(offset) {}
+
+  // Every member is a scalar, a shared_ptr, or a vector of shared_ptr, so the
+  // member-wise defaults are correct. All four are declared explicitly because
+  // declaring any move operation would otherwise delete copy assignment.
+  // The defaulted moves are noexcept whenever every member's move is noexcept.
+  ArrayData(const ArrayData& other) = default;
+  ArrayData(ArrayData&& other) = default;
+  ArrayData& operator=(const ArrayData& other) = default;
+  ArrayData& operator=(ArrayData&& other) = default;
+
+  /// \brief Return a new ArrayData sharing this object's buffers and child data
+  ///
+  /// Only the shared_ptr handles are copied, never the underlying memory, so
+  /// the copy's type, length, offset and slots can be reassigned independently.
+  std::shared_ptr<ArrayData> ShallowCopy() const {
+    return std::make_shared<ArrayData>(*this);
+  }
+
+  /// Logical type of the data
+  std::shared_ptr<DataType> type;
+  /// Number of logical elements
+  int64_t length = 0;
+  /// Memory buffers; Array reads slot 0 as the null bitmap
+  std::vector<std::shared_ptr<Buffer>> buffers;
+  /// Number of nulls, or kUnknownNullCount when not yet computed
+  int64_t null_count = 0;
+  /// Logical start position within the buffers, for zero-copy slices
+  int64_t offset = 0;
+  /// Data of the child arrays, for nested types
+  std::vector<std::shared_ptr<ArrayData>> child_data;
+};
+
+Status ARROW_EXPORT MakeArray(
+    const std::shared_ptr<ArrayData>& data, std::shared_ptr<Array>* out);
+
+}  // namespace internal
+
+// ----------------------------------------------------------------------
+// User array accessor types
+
+/// \brief Array base type
 /// Immutable data array with some logical type and some length.
 ///
 /// Any memory is owned by the respective Buffer instance (or its parents).
@@ -54,24 +166,20 @@ struct Decimal;
 /// be computed on the first call to null_count()
 class ARROW_EXPORT Array {
  public:
-  Array(const std::shared_ptr<DataType>& type, int64_t length,
-      const std::shared_ptr<Buffer>& null_bitmap = nullptr, int64_t null_count = 0,
-      int64_t offset = 0);
-
   virtual ~Array() = default;
 
   /// Determine if a slot is null. For inner loops. Does *not* boundscheck
   bool IsNull(int64_t i) const {
     return null_bitmap_data_ != nullptr &&
-           BitUtil::BitNotSet(null_bitmap_data_, i + offset_);
+           BitUtil::BitNotSet(null_bitmap_data_, i + data_->offset);
   }
 
   /// Size in the number of elements this array contains.
-  int64_t length() const { return length_; }
+  int64_t length() const { return data_->length; }
 
   /// A relative position into another array's data, to enable zero-copy
   /// slicing. This value defaults to zero
-  int64_t offset() const { return offset_; }
+  int64_t offset() const { return data_->offset; }
 
   /// The number of null entries in the array. If the null count was not known
   /// at time of construction (and set to a negative value), then the null
@@ -79,14 +187,14 @@ class ARROW_EXPORT Array {
   /// function
   int64_t null_count() const;
 
-  std::shared_ptr<DataType> type() const { return type_; }
-  Type::type type_id() const { return type_->id(); }
+  std::shared_ptr<DataType> type() const { return data_->type; }
+  Type::type type_id() const { return data_->type->id(); }
 
   /// Buffer for the null bitmap.
   ///
   /// Note that for `null_count == 0`, this can be a `nullptr`.
   /// This buffer does not account for any slice offset
-  std::shared_ptr<Buffer> null_bitmap() const { return null_bitmap_; }
+  std::shared_ptr<Buffer> null_bitmap() const { return data_->buffers[0]; }
 
   /// Raw pointer to the null bitmap.
   ///
@@ -124,49 +232,77 @@ class ARROW_EXPORT Array {
   /// Slice from offset until end of the array
   std::shared_ptr<Array> Slice(int64_t offset) const;
 
- protected:
-  std::shared_ptr<DataType> type_;
-  int64_t length_;
-  int64_t offset_;
+  std::shared_ptr<internal::ArrayData> data() const { return data_; }
+
+  int num_fields() const { return static_cast<int>(data_->child_data.size()); }
 
-  // This member is marked mutable so that it can be modified when null_count()
-  // is called from a const context and the null count has to be computed (if
-  // it is not already known)
-  mutable int64_t null_count_;
+ protected:
+  Array() {}
 
-  std::shared_ptr<Buffer> null_bitmap_;
+  std::shared_ptr<internal::ArrayData> data_;
   const uint8_t* null_bitmap_data_;
 
+  /// Protected method for constructors
+  inline void SetData(const std::shared_ptr<internal::ArrayData>& data) {
+    if (data->buffers.size() > 0 && data->buffers[0]) {
+      null_bitmap_data_ = data->buffers[0]->data();
+    } else {
+      null_bitmap_data_ = nullptr;
+    }
+    data_ = data;
+  }
+
  private:
-  Array() {}
   DISALLOW_COPY_AND_ASSIGN(Array);
 };
 
+ARROW_EXPORT std::ostream& operator<<(std::ostream& os, const Array& x);
+
+class ARROW_EXPORT FlatArray : public Array {
+ protected:
+  using Array::Array;
+};
+
 /// Degenerate null type Array
-class ARROW_EXPORT NullArray : public Array {
+class ARROW_EXPORT NullArray : public FlatArray {
  public:
   using TypeClass = NullType;
 
+  explicit NullArray(const std::shared_ptr<internal::ArrayData>& data) { SetData(data); }
+
   explicit NullArray(int64_t length);
 
   std::shared_ptr<Array> Slice(int64_t offset, int64_t length) const override;
 };
 
 /// Base class for fixed-size logical types
-class ARROW_EXPORT PrimitiveArray : public Array {
+class ARROW_EXPORT PrimitiveArray : public FlatArray {
  public:
   PrimitiveArray(const std::shared_ptr<DataType>& type, int64_t length,
       const std::shared_ptr<Buffer>& data,
       const std::shared_ptr<Buffer>& null_bitmap = nullptr, int64_t null_count = 0,
       int64_t offset = 0);
 
-  /// The memory containing this array's data
-  /// This buffer does not account for any slice offset
-  std::shared_ptr<Buffer> data() const { return data_; }
+  /// Does not account for any slice offset
+  std::shared_ptr<Buffer> values() const { return data_->buffers[1]; }
+
+  /// Does not account for any slice offset
+  const uint8_t* raw_values() const { return raw_values_; }
 
  protected:
-  std::shared_ptr<Buffer> data_;
-  const uint8_t* raw_data_;
+  PrimitiveArray() {}
+
+  inline void SetData(const std::shared_ptr<internal::ArrayData>& data) {
+    auto values = data->buffers[1];
+    this->Array::SetData(data);
+    raw_values_ = values == nullptr ? nullptr : values->data();
+  }
+
+  explicit inline PrimitiveArray(const std::shared_ptr<internal::ArrayData>& data) {
+    SetData(data);
+  }
+
+  const uint8_t* raw_values_;
 };
 
 template <typename TYPE>
@@ -175,7 +311,7 @@ class ARROW_EXPORT NumericArray : public PrimitiveArray {
   using TypeClass = TYPE;
   using value_type = typename TypeClass::c_type;
 
-  using PrimitiveArray::PrimitiveArray;
+  explicit NumericArray(const std::shared_ptr<internal::ArrayData>& data);
 
   // Only enable this constructor without a type argument for types without additional
   // metadata
@@ -188,20 +324,23 @@ class ARROW_EXPORT NumericArray : public PrimitiveArray {
       : PrimitiveArray(TypeTraits<T1>::type_singleton(), length, data, null_bitmap,
             null_count, offset) {}
 
-  const value_type* raw_data() const {
-    return reinterpret_cast<const value_type*>(raw_data_) + offset_;
+  const value_type* raw_values() const {
+    return reinterpret_cast<const value_type*>(raw_values_) + data_->offset;
   }
 
   std::shared_ptr<Array> Slice(int64_t offset, int64_t length) const override;
 
-  value_type Value(int64_t i) const { return raw_data()[i]; }
+  value_type Value(int64_t i) const { return raw_values()[i]; }
+
+ protected:
+  using PrimitiveArray::PrimitiveArray;
 };
 
 class ARROW_EXPORT BooleanArray : public PrimitiveArray {
  public:
   using TypeClass = BooleanType;
 
-  using PrimitiveArray::PrimitiveArray;
+  explicit BooleanArray(const std::shared_ptr<internal::ArrayData>& data);
 
   BooleanArray(int64_t length, const std::shared_ptr<Buffer>& data,
       const std::shared_ptr<Buffer>& null_bitmap = nullptr, int64_t null_count = 0,
@@ -210,8 +349,12 @@ class ARROW_EXPORT BooleanArray : public PrimitiveArray {
   std::shared_ptr<Array> Slice(int64_t offset, int64_t length) const override;
 
   bool Value(int64_t i) const {
-    return BitUtil::GetBit(reinterpret_cast<const uint8_t*>(raw_data_), i + offset_);
+    return BitUtil::GetBit(
+        reinterpret_cast<const uint8_t*>(raw_values_), i + data_->offset);
   }
+
+ protected:
+  using PrimitiveArray::PrimitiveArray;
 };
 
 // ----------------------------------------------------------------------
@@ -221,52 +364,50 @@ class ARROW_EXPORT ListArray : public Array {
  public:
   using TypeClass = ListType;
 
+  explicit ListArray(const std::shared_ptr<internal::ArrayData>& data);
+
   ListArray(const std::shared_ptr<DataType>& type, int64_t length,
       const std::shared_ptr<Buffer>& value_offsets, const std::shared_ptr<Array>& values,
       const std::shared_ptr<Buffer>& null_bitmap = nullptr, int64_t null_count = 0,
-      int64_t offset = 0)
-      : Array(type, length, null_bitmap, null_count, offset) {
-    value_offsets_ = value_offsets;
-    raw_value_offsets_ = value_offsets == nullptr
-                             ? nullptr
-                             : reinterpret_cast<const int32_t*>(value_offsets_->data());
-    values_ = values;
-  }
+      int64_t offset = 0);
 
-  // Return a shared pointer in case the requestor desires to share ownership
-  // with this array.
-  std::shared_ptr<Array> values() const { return values_; }
+  /// \brief Return array object containing the list's values
+  std::shared_ptr<Array> values() const;
 
   /// Note that this buffer does not account for any slice offset
-  std::shared_ptr<Buffer> value_offsets() const { return value_offsets_; }
+  std::shared_ptr<Buffer> value_offsets() const { return data_->buffers[1]; }
 
-  std::shared_ptr<DataType> value_type() const { return values_->type(); }
+  std::shared_ptr<DataType> value_type() const;
 
   /// Return pointer to raw value offsets accounting for any slice offset
-  const int32_t* raw_value_offsets() const { return raw_value_offsets_ + offset_; }
+  const int32_t* raw_value_offsets() const { return raw_value_offsets_ + data_->offset; }
 
   // Neither of these functions will perform boundschecking
-  int32_t value_offset(int64_t i) const { return raw_value_offsets_[i + offset_]; }
+  int32_t value_offset(int64_t i) const { return raw_value_offsets_[i + data_->offset]; }
   int32_t value_length(int64_t i) const {
-    i += offset_;
+    i += data_->offset;
     return raw_value_offsets_[i + 1] - raw_value_offsets_[i];
   }
 
   std::shared_ptr<Array> Slice(int64_t offset, int64_t length) const override;
 
  protected:
-  std::shared_ptr<Buffer> value_offsets_;
+  void SetData(const std::shared_ptr<internal::ArrayData>& data);
   const int32_t* raw_value_offsets_;
+
+ private:
   std::shared_ptr<Array> values_;
 };
 
 // ----------------------------------------------------------------------
 // Binary and String
 
-class ARROW_EXPORT BinaryArray : public Array {
+class ARROW_EXPORT BinaryArray : public FlatArray {
  public:
   using TypeClass = BinaryType;
 
+  explicit BinaryArray(const std::shared_ptr<internal::ArrayData>& data);
+
   BinaryArray(int64_t length, const std::shared_ptr<Buffer>& value_offsets,
       const std::shared_ptr<Buffer>& data,
       const std::shared_ptr<Buffer>& null_bitmap = nullptr, int64_t null_count = 0,
@@ -277,7 +418,7 @@ class ARROW_EXPORT BinaryArray : public Array {
   // pointer + offset
   const uint8_t* GetValue(int64_t i, int32_t* out_length) const {
     // Account for base offset
-    i += offset_;
+    i += data_->offset;
 
     const int32_t pos = raw_value_offsets_[i];
     *out_length = raw_value_offsets_[i + 1] - pos;
@@ -285,23 +426,29 @@ class ARROW_EXPORT BinaryArray : public Array {
   }
 
   /// Note that this buffer does not account for any slice offset
-  std::shared_ptr<Buffer> data() const { return data_; }
+  std::shared_ptr<Buffer> value_offsets() const { return data_->buffers[1]; }
 
   /// Note that this buffer does not account for any slice offset
-  std::shared_ptr<Buffer> value_offsets() const { return value_offsets_; }
+  std::shared_ptr<Buffer> value_data() const { return data_->buffers[2]; }
 
-  const int32_t* raw_value_offsets() const { return raw_value_offsets_ + offset_; }
+  const int32_t* raw_value_offsets() const { return raw_value_offsets_ + data_->offset; }
 
   // Neither of these functions will perform boundschecking
-  int32_t value_offset(int64_t i) const { return raw_value_offsets_[i + offset_]; }
+  int32_t value_offset(int64_t i) const { return raw_value_offsets_[i + data_->offset]; }
   int32_t value_length(int64_t i) const {
-    i += offset_;
+    i += data_->offset;
     return raw_value_offsets_[i + 1] - raw_value_offsets_[i];
   }
 
   std::shared_ptr<Array> Slice(int64_t offset, int64_t length) const override;
 
  protected:
+  // For subclasses
+  BinaryArray() {}
+
+  /// Protected method for constructors
+  void SetData(const std::shared_ptr<internal::ArrayData>& data);
+
   // Constructor that allows sub-classes/builders to propagate there logical type up the
   // class hierarchy.
   BinaryArray(const std::shared_ptr<DataType>& type, int64_t length,
@@ -309,10 +456,7 @@ class ARROW_EXPORT BinaryArray : public Array {
       const std::shared_ptr<Buffer>& null_bitmap = nullptr, int64_t null_count = 0,
       int64_t offset = 0);
 
-  std::shared_ptr<Buffer> value_offsets_;
   const int32_t* raw_value_offsets_;
-
-  std::shared_ptr<Buffer> data_;
   const uint8_t* raw_data_;
 };
 
@@ -320,6 +464,8 @@ class ARROW_EXPORT StringArray : public BinaryArray {
  public:
   using TypeClass = StringType;
 
+  explicit StringArray(const std::shared_ptr<internal::ArrayData>& data);
+
   StringArray(int64_t length, const std::shared_ptr<Buffer>& value_offsets,
       const std::shared_ptr<Buffer>& data,
       const std::shared_ptr<Buffer>& null_bitmap = nullptr, int64_t null_count = 0,
@@ -343,6 +489,8 @@ class ARROW_EXPORT FixedSizeBinaryArray : public PrimitiveArray {
  public:
   using TypeClass = FixedSizeBinaryType;
 
+  explicit FixedSizeBinaryArray(const std::shared_ptr<internal::ArrayData>& data);
+
   FixedSizeBinaryArray(const std::shared_ptr<DataType>& type, int64_t length,
       const std::shared_ptr<Buffer>& data,
       const std::shared_ptr<Buffer>& null_bitmap = nullptr, int64_t null_count = 0,
@@ -352,20 +500,28 @@ class ARROW_EXPORT FixedSizeBinaryArray : public PrimitiveArray {
 
   int32_t byte_width() const { return byte_width_; }
 
-  const uint8_t* raw_data() const { return raw_data_; }
+  const uint8_t* raw_values() const { return raw_values_; }
 
   std::shared_ptr<Array> Slice(int64_t offset, int64_t length) const override;
 
  protected:
+  inline void SetData(const std::shared_ptr<internal::ArrayData>& data) {
+    this->PrimitiveArray::SetData(data);
+    byte_width_ = static_cast<const FixedSizeBinaryType&>(*type()).byte_width();
+  }
+
   int32_t byte_width_;
 };
 
 // ----------------------------------------------------------------------
 // DecimalArray
-class ARROW_EXPORT DecimalArray : public FixedSizeBinaryArray {
+class ARROW_EXPORT DecimalArray : public FlatArray {
  public:
   using TypeClass = Type;
 
+  /// \brief Construct DecimalArray from internal::ArrayData instance
+  explicit DecimalArray(const std::shared_ptr<internal::ArrayData>& data);
+
   DecimalArray(const std::shared_ptr<DataType>& type, int64_t length,
       const std::shared_ptr<Buffer>& data,
       const std::shared_ptr<Buffer>& null_bitmap = nullptr, int64_t null_count = 0,
@@ -373,13 +529,27 @@ class ARROW_EXPORT DecimalArray : public FixedSizeBinaryArray {
 
   bool IsNegative(int64_t i) const;
 
+  const uint8_t* GetValue(int64_t i) const;
+
   std::string FormatValue(int64_t i) const;
 
   std::shared_ptr<Array> Slice(int64_t offset, int64_t length) const override;
 
- private:
+  /// \brief The main decimal data
+  /// For 32/64-bit decimal this is everything
+  std::shared_ptr<Buffer> values() const { return data_->buffers[1]; }
+
   /// Only needed for 128 bit Decimals
-  std::shared_ptr<Buffer> sign_bitmap_;
+  std::shared_ptr<Buffer> sign_bitmap() const { return data_->buffers[2]; }
+
+  int32_t byte_width() const {
+    return static_cast<const DecimalType&>(*type()).byte_width();
+  }
+  const uint8_t* raw_values() const { return raw_values_; }
+
+ private:
+  void SetData(const std::shared_ptr<internal::ArrayData>& data);
+  const uint8_t* raw_values_;
   const uint8_t* sign_bitmap_data_;
 };
 
@@ -390,6 +560,8 @@ class ARROW_EXPORT StructArray : public Array {
  public:
   using TypeClass = StructType;
 
+  explicit StructArray(const std::shared_ptr<internal::ArrayData>& data);
+
   StructArray(const std::shared_ptr<DataType>& type, int64_t length,
       const std::vector<std::shared_ptr<Array>>& children,
       std::shared_ptr<Buffer> null_bitmap = nullptr, int64_t null_count = 0,
@@ -399,13 +571,7 @@ class ARROW_EXPORT StructArray : public Array {
   // with this array.
   std::shared_ptr<Array> field(int pos) const;
 
-  const std::vector<std::shared_ptr<Array>>& fields() const { return children_; }
-
   std::shared_ptr<Array> Slice(int64_t offset, int64_t length) const override;
-
- protected:
-  // The child arrays corresponding to each field of the struct data type.
-  std::vector<std::shared_ptr<Array>> children_;
 };
 
 // ----------------------------------------------------------------------
@@ -416,6 +582,8 @@ class ARROW_EXPORT UnionArray : public Array {
   using TypeClass = UnionType;
   using type_id_t = uint8_t;
 
+  explicit UnionArray(const std::shared_ptr<internal::ArrayData>& data);
+
   UnionArray(const std::shared_ptr<DataType>& type, int64_t length,
       const std::vector<std::shared_ptr<Array>>& children,
       const std::shared_ptr<Buffer>& type_ids,
@@ -424,29 +592,24 @@ class ARROW_EXPORT UnionArray : public Array {
       int64_t offset = 0);
 
   /// Note that this buffer does not account for any slice offset
-  std::shared_ptr<Buffer> type_ids() const { return type_ids_; }
+  std::shared_ptr<Buffer> type_ids() const { return data_->buffers[1]; }
 
   /// Note that this buffer does not account for any slice offset
-  std::shared_ptr<Buffer> value_offsets() const { return value_offsets_; }
+  std::shared_ptr<Buffer> value_offsets() const { return data_->buffers[2]; }
 
-  const type_id_t* raw_type_ids() const { return raw_type_ids_ + offset_; }
-  const int32_t* raw_value_offsets() const { return raw_value_offsets_ + offset_; }
+  const type_id_t* raw_type_ids() const { return raw_type_ids_ + data_->offset; }
+  const int32_t* raw_value_offsets() const { return raw_value_offsets_ + data_->offset; }
 
-  UnionMode mode() const { return static_cast<const UnionType&>(*type_.get()).mode(); }
+  UnionMode mode() const { return static_cast<const UnionType&>(*type()).mode(); }
 
   std::shared_ptr<Array> child(int pos) const;
 
-  const std::vector<std::shared_ptr<Array>>& children() const { return children_; }
-
   std::shared_ptr<Array> Slice(int64_t offset, int64_t length) const override;
 
  protected:
-  std::vector<std::shared_ptr<Array>> children_;
+  void SetData(const std::shared_ptr<internal::ArrayData>& data);
 
-  std::shared_ptr<Buffer> type_ids_;
   const type_id_t* raw_type_ids_;
-
-  std::shared_ptr<Buffer> value_offsets_;
   const int32_t* raw_value_offsets_;
 };
 
@@ -472,17 +635,21 @@ class ARROW_EXPORT DictionaryArray : public Array {
  public:
   using TypeClass = DictionaryType;
 
+  explicit DictionaryArray(const std::shared_ptr<internal::ArrayData>& data);
+
   DictionaryArray(
       const std::shared_ptr<DataType>& type, const std::shared_ptr<Array>& indices);
 
-  std::shared_ptr<Array> indices() const { return indices_; }
+  std::shared_ptr<Array> indices() const;
   std::shared_ptr<Array> dictionary() const;
 
   const DictionaryType* dict_type() const { return dict_type_; }
 
   std::shared_ptr<Array> Slice(int64_t offset, int64_t length) const override;
 
- protected:
+ private:
+  void SetData(const std::shared_ptr<internal::ArrayData>& data);
+
   const DictionaryType* dict_type_;
   std::shared_ptr<Array> indices_;
 };
@@ -517,6 +684,16 @@ ARROW_EXTERN_TEMPLATE NumericArray<TimestampType>;
 /// \return Status
 Status ARROW_EXPORT ValidateArray(const Array& array);
 
+/// Create new arrays for logical types that are backed by primitive arrays.
+Status ARROW_EXPORT MakePrimitiveArray(const std::shared_ptr<DataType>& type,
+    int64_t length, const std::shared_ptr<Buffer>& data,
+    const std::shared_ptr<Buffer>& null_bitmap, int64_t null_count, int64_t offset,
+    std::shared_ptr<Array>* out);
+
+Status ARROW_EXPORT MakePrimitiveArray(const std::shared_ptr<DataType>& type,
+    const std::vector<std::shared_ptr<Buffer>>& buffers, int64_t length,
+    int64_t null_count, int64_t offset, std::shared_ptr<Array>* out);
+
 }  // namespace arrow
 
 #endif

http://git-wip-us.apache.org/repos/asf/arrow/blob/84520711/cpp/src/arrow/builder.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/builder.cc b/cpp/src/arrow/builder.cc
index a57f75a..c3bc745 100644
--- a/cpp/src/arrow/builder.cc
+++ b/cpp/src/arrow/builder.cc
@@ -150,15 +150,15 @@ void ArrayBuilder::UnsafeSetNotNull(int64_t length) {
   const int64_t new_length = length + length_;
 
   // Fill up the bytes until we have a byte alignment
-  int64_t pad_to_byte = 8 - (length_ % 8);
+  int64_t pad_to_byte = std::min<int64_t>(8 - (length_ % 8), length);
   if (pad_to_byte == 8) { pad_to_byte = 0; }
-  for (int64_t i = 0; i < pad_to_byte; ++i) {
+  for (int64_t i = length_; i < length_ + pad_to_byte; ++i) {
     BitUtil::SetBit(null_bitmap_data_, i);
   }
 
   // Fast bitsetting
   int64_t fast_length = (length - pad_to_byte) / 8;
-  memset(null_bitmap_data_ + ((length_ + pad_to_byte) / 8), 255,
+  memset(null_bitmap_data_ + ((length_ + pad_to_byte) / 8), 0xFF,
       static_cast<size_t>(fast_length));
 
   // Trailing bytes
@@ -700,11 +700,11 @@ template <typename T>
 Status DictionaryBuilder<T>::Finish(std::shared_ptr<Array>* out) {
   std::shared_ptr<Array> dictionary;
   RETURN_NOT_OK(dict_builder_.Finish(&dictionary));
-  auto type = std::make_shared<DictionaryType>(type_, dictionary);
 
   std::shared_ptr<Array> values;
   RETURN_NOT_OK(values_builder_.Finish(&values));
 
+  auto type = std::make_shared<DictionaryType>(values->type(), dictionary);
   *out = std::make_shared<DictionaryArray>(type, values);
   return Status::OK();
 }
@@ -1031,6 +1031,7 @@ Status ListBuilder::Finish(std::shared_ptr<Array>* out) {
 void ListBuilder::Reset() {
   capacity_ = length_ = null_count_ = 0;
   null_bitmap_ = nullptr;
+  values_ = nullptr;
 }
 
 ArrayBuilder* ListBuilder::value_builder() const {
@@ -1061,7 +1062,7 @@ Status BinaryBuilder::Finish(std::shared_ptr<Array>* out) {
   auto values = std::dynamic_pointer_cast<UInt8Array>(list->values());
 
   *out = std::make_shared<BinaryArray>(list->length(), list->value_offsets(),
-      values->data(), list->null_bitmap(), list->null_count());
+      values->values(), list->null_bitmap(), list->null_count());
   return Status::OK();
 }
 
@@ -1086,7 +1087,7 @@ Status StringBuilder::Finish(std::shared_ptr<Array>* out) {
   auto values = std::dynamic_pointer_cast<UInt8Array>(list->values());
 
   *out = std::make_shared<StringArray>(list->length(), list->value_offsets(),
-      values->data(), list->null_bitmap(), list->null_count());
+      values->values(), list->null_bitmap(), list->null_count());
   return Status::OK();
 }
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/84520711/cpp/src/arrow/compare.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/compare.cc b/cpp/src/arrow/compare.cc
index 390a406..23f5a19 100644
--- a/cpp/src/arrow/compare.cc
+++ b/cpp/src/arrow/compare.cc
@@ -83,8 +83,8 @@ class RangeEqualsVisitor {
       }
 
       if (end_offset - begin_offset > 0 &&
-          std::memcmp(left.data()->data() + begin_offset,
-              right.data()->data() + right_begin_offset,
+          std::memcmp(left.value_data()->data() + begin_offset,
+              right.value_data()->data() + right_begin_offset,
               static_cast<size_t>(end_offset - begin_offset))) {
         return false;
       }
@@ -126,7 +126,7 @@ class RangeEqualsVisitor {
          ++i, ++o_i) {
       if (left.IsNull(i) != right.IsNull(o_i)) { return false; }
       if (left.IsNull(i)) continue;
-      for (int j = 0; j < static_cast<int>(left.fields().size()); ++j) {
+      for (int j = 0; j < left.num_fields(); ++j) {
         // TODO: really we should be comparing stretches of non-null data rather
         // than looking at one value at a time.
         const int64_t left_abs_index = i + left.offset();
@@ -188,7 +188,7 @@ class RangeEqualsVisitor {
         }
       } else {
         const int32_t offset = left.raw_value_offsets()[i];
-        const int32_t o_offset = right.raw_value_offsets()[i];
+        const int32_t o_offset = right.raw_value_offsets()[o_i];
         if (!left.child(child_num)->RangeEquals(
                 offset, offset + 1, o_offset, right.child(child_num))) {
           return false;
@@ -211,9 +211,9 @@ class RangeEqualsVisitor {
     const uint8_t* left_data = nullptr;
     const uint8_t* right_data = nullptr;
 
-    if (left.data()) { left_data = left.raw_data() + left.offset() * width; }
+    if (left.values()) { left_data = left.raw_values() + left.offset() * width; }
 
-    if (right.data()) { right_data = right.raw_data() + right.offset() * width; }
+    if (right.values()) { right_data = right.raw_values() + right.offset() * width; }
 
     for (int64_t i = left_start_idx_, o_i = right_start_idx_; i < left_end_idx_;
          ++i, ++o_i) {
@@ -241,9 +241,9 @@ class RangeEqualsVisitor {
     const uint8_t* left_data = nullptr;
     const uint8_t* right_data = nullptr;
 
-    if (left.data()) { left_data = left.raw_data() + left.offset() * width; }
+    if (left.values()) { left_data = left.raw_values() + left.offset() * width; }
 
-    if (right.data()) { right_data = right.raw_data() + right.offset() * width; }
+    if (right.values()) { right_data = right.raw_values() + right.offset() * width; }
 
     for (int64_t i = left_start_idx_, o_i = right_start_idx_; i < left_end_idx_;
          ++i, ++o_i) {
@@ -317,6 +317,95 @@ class RangeEqualsVisitor {
   bool result_;
 };
 
+// Byte-wise equality of two fixed-width arrays; honors each side's slice offset.
+static bool IsEqualPrimitive(const PrimitiveArray& left, const PrimitiveArray& right) {
+  const auto& size_meta = dynamic_cast<const FixedWidthType&>(*left.type());
+  const int byte_width = size_meta.bit_width() / 8;
+
+  const uint8_t* left_data = nullptr;
+  const uint8_t* right_data = nullptr;
+  if (left.values()) { left_data = left.values()->data() + left.offset() * byte_width; }
+  if (right.values()) {
+    right_data = right.values()->data() + right.offset() * byte_width;
+  }
+
+  if (left.null_count() > 0) {
+    for (int64_t i = 0; i < left.length(); ++i) {
+      const bool left_null = left.IsNull(i);
+      // A null slot only matches a null slot (same rule as CompareBuiltIn)
+      if (left_null != right.IsNull(i)) { return false; }
+      if (!left_null && memcmp(left_data, right_data, byte_width)) { return false; }
+      left_data += byte_width;
+      right_data += byte_width;
+    }
+    return true;
+  } else {
+    return memcmp(left_data, right_data,
+               static_cast<size_t>(byte_width * left.length())) == 0;
+  }
+}
+
+// Null-aware, element-wise equality over two typed value arrays.
+template <typename T>
+static inline bool CompareBuiltIn(
+    const Array& left, const Array& right, const T* ldata, const T* rdata) {
+  if (left.null_count() <= 0) {
+    // No nulls on the left: the values can be compared in one shot
+    return memcmp(ldata, rdata, sizeof(T) * left.length()) == 0;
+  }
+  const int64_t num_slots = left.length();
+  for (int64_t slot = 0; slot < num_slots; ++slot) {
+    const bool lnull = left.IsNull(slot);
+    if (lnull != right.IsNull(slot)) { return false; }
+    if (!lnull && ldata[slot] != rdata[slot]) { return false; }
+  }
+  return true;
+}
+
+// Equality of two decimal arrays. 4- and 8-byte widths compare as integers; any
+// other width takes the 128-bit path, which also compares the sign bitmap.
+static bool IsEqualDecimal(const DecimalArray& left, const DecimalArray& right) {
+  const int64_t loffset = left.offset();
+  const int64_t roffset = right.offset();
+  const uint8_t* left_data = nullptr;
+  const uint8_t* right_data = nullptr;
+  if (left.values()) { left_data = left.values()->data(); }
+  if (right.values()) { right_data = right.values()->data(); }
+
+  const int32_t byte_width = left.byte_width();
+  if (byte_width == 4) {
+    return CompareBuiltIn<int32_t>(left, right,
+        reinterpret_cast<const int32_t*>(left_data) + loffset,
+        reinterpret_cast<const int32_t*>(right_data) + roffset);
+  } else if (byte_width == 8) {
+    return CompareBuiltIn<int64_t>(left, right,
+        reinterpret_cast<const int64_t*>(left_data) + loffset,
+        reinterpret_cast<const int64_t*>(right_data) + roffset);
+  } else {
+    // 128-bit: step over the slice offset (the integer paths add it themselves)
+    if (left_data != nullptr) { left_data += loffset * byte_width; }
+    if (right_data != nullptr) { right_data += roffset * byte_width; }
+    const uint8_t* left_sign = nullptr;
+    const uint8_t* right_sign = nullptr;
+    if (left.sign_bitmap()) { left_sign = left.sign_bitmap()->data(); }
+    if (right.sign_bitmap()) { right_sign = right.sign_bitmap()->data(); }
+
+    for (int64_t i = 0; i < left.length(); ++i) {
+      const bool left_null = left.IsNull(i);
+      if (left_null != right.IsNull(i)) { return false; }
+      if (!left_null && memcmp(left_data, right_data, byte_width)) { return false; }
+      // Sign bits live in a separate bitmap indexed by offset-adjusted slot
+      if (BitUtil::GetBit(left_sign, i + loffset) !=
+          BitUtil::GetBit(right_sign, i + roffset)) {
+        return false;
+      }
+      left_data += byte_width;
+      right_data += byte_width;
+    }
+    return true;
+  }
+}
+
 class ArrayEqualsVisitor : public RangeEqualsVisitor {
  public:
   explicit ArrayEqualsVisitor(const Array& right)
@@ -331,8 +420,8 @@ class ArrayEqualsVisitor : public RangeEqualsVisitor {
     const auto& right = static_cast<const BooleanArray&>(right_);
 
     if (left.null_count() > 0) {
-      const uint8_t* left_data = left.data()->data();
-      const uint8_t* right_data = right.data()->data();
+      const uint8_t* left_data = left.values()->data();
+      const uint8_t* right_data = right.values()->data();
 
       for (int64_t i = 0; i < left.length(); ++i) {
         if (!left.IsNull(i) &&
@@ -344,45 +433,23 @@ class ArrayEqualsVisitor : public RangeEqualsVisitor {
       }
       result_ = true;
     } else {
-      result_ = BitmapEquals(left.data()->data(), left.offset(), right.data()->data(),
+      result_ = BitmapEquals(left.values()->data(), left.offset(), right.values()->data(),
           right.offset(), left.length());
     }
     return Status::OK();
   }
 
-  bool IsEqualPrimitive(const PrimitiveArray& left) {
-    const auto& right = static_cast<const PrimitiveArray&>(right_);
-    const auto& size_meta = dynamic_cast<const FixedWidthType&>(*left.type());
-    const int byte_width = size_meta.bit_width() / 8;
-
-    const uint8_t* left_data = nullptr;
-    const uint8_t* right_data = nullptr;
-
-    if (left.data()) { left_data = left.data()->data() + left.offset() * byte_width; }
-
-    if (right.data()) { right_data = right.data()->data() + right.offset() * byte_width; }
-
-    if (left.null_count() > 0) {
-      for (int64_t i = 0; i < left.length(); ++i) {
-        if (!left.IsNull(i) && memcmp(left_data, right_data, byte_width)) {
-          return false;
-        }
-        left_data += byte_width;
-        right_data += byte_width;
-      }
-      return true;
-    } else {
-      return memcmp(left_data, right_data,
-                 static_cast<size_t>(byte_width * left.length())) == 0;
-    }
-  }
-
   template <typename T>
   typename std::enable_if<std::is_base_of<PrimitiveArray, T>::value &&
                               !std::is_base_of<BooleanArray, T>::value,
       Status>::type
   Visit(const T& left) {
-    result_ = IsEqualPrimitive(left);
+    result_ = IsEqualPrimitive(left, static_cast<const PrimitiveArray&>(right_));
+    return Status::OK();
+  }
+
+  Status Visit(const DecimalArray& left) {
+    result_ = IsEqualDecimal(left, static_cast<const DecimalArray&>(right_));
     return Status::OK();
   }
 
@@ -417,11 +484,11 @@ class ArrayEqualsVisitor : public RangeEqualsVisitor {
     bool equal_offsets = ValueOffsetsEqual<BinaryArray>(left);
     if (!equal_offsets) { return false; }
 
-    if (!left.data() && !(right.data())) { return true; }
+    if (!left.value_data() && !(right.value_data())) { return true; }
     if (left.value_offset(left.length()) == 0) { return true; }
 
-    const uint8_t* left_data = left.data()->data();
-    const uint8_t* right_data = right.data()->data();
+    const uint8_t* left_data = left.value_data()->data();
+    const uint8_t* right_data = right.value_data()->data();
 
     if (left.null_count() == 0) {
       // Fast path for null count 0, single memcmp
@@ -491,8 +558,8 @@ inline bool FloatingApproxEquals(
     const NumericArray<TYPE>& left, const NumericArray<TYPE>& right) {
   using T = typename TYPE::c_type;
 
-  const T* left_data = left.raw_data();
-  const T* right_data = right.raw_data();
+  const T* left_data = left.raw_values();
+  const T* right_data = right.raw_values();
 
   static constexpr T EPSILON = static_cast<T>(1E-5);
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/84520711/cpp/src/arrow/ipc/feather-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/feather-test.cc b/cpp/src/arrow/ipc/feather-test.cc
index 807ea4e..a7793f2 100644
--- a/cpp/src/arrow/ipc/feather-test.cc
+++ b/cpp/src/arrow/ipc/feather-test.cc
@@ -28,7 +28,6 @@
 #include "arrow/ipc/feather-internal.h"
 #include "arrow/ipc/feather.h"
 #include "arrow/ipc/test-common.h"
-#include "arrow/loader.h"
 #include "arrow/pretty_print.h"
 #include "arrow/test-util.h"
 
@@ -365,25 +364,19 @@ TEST_F(TestTableWriter, TimeTypes) {
   std::shared_ptr<Array> date_array;
   ArrayFromVector<Date32Type, int32_t>(is_valid, date_values_vec, &date_array);
 
-  std::vector<FieldMetadata> fields(1);
-  fields[0].length = values->length();
-  fields[0].null_count = values->null_count();
-  fields[0].offset = 0;
-
   const auto& prim_values = static_cast<const PrimitiveArray&>(*values);
   std::vector<std::shared_ptr<Buffer>> buffers = {
-      prim_values.null_bitmap(), prim_values.data()};
+      prim_values.null_bitmap(), prim_values.values()};
 
-  std::vector<std::shared_ptr<Array>> arrays;
-  arrays.push_back(date_array);
+  std::vector<std::shared_ptr<internal::ArrayData>> arrays;
+  arrays.push_back(date_array->data());
 
   for (int i = 1; i < schema->num_fields(); ++i) {
-    std::shared_ptr<Array> arr;
-    ASSERT_OK(LoadArray(schema->field(i)->type(), fields, buffers, &arr));
-    arrays.push_back(arr);
+    arrays.emplace_back(std::make_shared<internal::ArrayData>(
+        schema->field(i)->type(), values->length(), buffers, values->null_count(), 0));
   }
 
-  RecordBatch batch(schema, values->length(), arrays);
+  RecordBatch batch(schema, values->length(), std::move(arrays));
   CheckBatch(batch);
 }
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/84520711/cpp/src/arrow/ipc/feather.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/feather.cc b/cpp/src/arrow/ipc/feather.cc
index bc7c431..37b01c5 100644
--- a/cpp/src/arrow/ipc/feather.cc
+++ b/cpp/src/arrow/ipc/feather.cc
@@ -33,7 +33,6 @@
 #include "arrow/io/file.h"
 #include "arrow/ipc/feather-internal.h"
 #include "arrow/ipc/feather_generated.h"
-#include "arrow/loader.h"
 #include "arrow/status.h"
 #include "arrow/table.h"
 #include "arrow/util/bit-util.h"
@@ -565,7 +564,7 @@ class TableWriter::TableWriterImpl : public ArrayVisitor {
           &bytes_written));
       meta->total_bytes += bytes_written;
 
-      if (bin_values.data()) { values_buffer = bin_values.data()->data(); }
+      if (bin_values.value_data()) { values_buffer = bin_values.value_data()->data(); }
     } else {
       const auto& prim_values = static_cast<const PrimitiveArray&>(values);
       const auto& fw_type = static_cast<const FixedWidthType&>(*values.type());
@@ -577,7 +576,7 @@ class TableWriter::TableWriterImpl : public ArrayVisitor {
         values_bytes = values.length() * fw_type.bit_width() / 8;
       }
 
-      if (prim_values.data()) { values_buffer = prim_values.data()->data(); }
+      if (prim_values.values()) { values_buffer = prim_values.values()->data(); }
     }
     RETURN_NOT_OK(
         WritePadded(stream_.get(), values_buffer, values_bytes, &bytes_written));

http://git-wip-us.apache.org/repos/asf/arrow/blob/84520711/cpp/src/arrow/ipc/json-internal.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/json-internal.cc b/cpp/src/arrow/ipc/json-internal.cc
index beebb4f..69e4ae8 100644
--- a/cpp/src/arrow/ipc/json-internal.cc
+++ b/cpp/src/arrow/ipc/json-internal.cc
@@ -414,7 +414,7 @@ class ArrayWriter {
   template <typename T>
   typename std::enable_if<IsSignedInt<T>::value, void>::type WriteDataValues(
       const T& arr) {
-    const auto data = arr.raw_data();
+    const auto data = arr.raw_values();
     for (int i = 0; i < arr.length(); ++i) {
       writer_->Int64(data[i]);
     }
@@ -423,7 +423,7 @@ class ArrayWriter {
   template <typename T>
   typename std::enable_if<IsUnsignedInt<T>::value, void>::type WriteDataValues(
       const T& arr) {
-    const auto data = arr.raw_data();
+    const auto data = arr.raw_values();
     for (int i = 0; i < arr.length(); ++i) {
       writer_->Uint64(data[i]);
     }
@@ -432,7 +432,7 @@ class ArrayWriter {
   template <typename T>
   typename std::enable_if<IsFloatingPoint<T>::value, void>::type WriteDataValues(
       const T& arr) {
-    const auto data = arr.raw_data();
+    const auto data = arr.raw_values();
     for (int i = 0; i < arr.length(); ++i) {
       writer_->Double(data[i]);
     }
@@ -558,7 +558,12 @@ class ArrayWriter {
   Status Visit(const StructArray& array) {
     WriteValidityField(array);
     const auto& type = static_cast<const StructType&>(*array.type());
-    return WriteChildren(type.children(), array.fields());
+    std::vector<std::shared_ptr<Array>> children;
+    children.reserve(array.num_fields());
+    for (int i = 0; i < array.num_fields(); ++i) {
+      children.emplace_back(array.field(i));
+    }
+    return WriteChildren(type.children(), children);
   }
 
   Status Visit(const UnionArray& array) {
@@ -569,7 +574,12 @@ class ArrayWriter {
     if (type.mode() == UnionMode::DENSE) {
       WriteIntegerField("OFFSET", array.raw_value_offsets(), array.length());
     }
-    return WriteChildren(type.children(), array.children());
+    std::vector<std::shared_ptr<Array>> children;
+    children.reserve(array.num_fields());
+    for (int i = 0; i < array.num_fields(); ++i) {
+      children.emplace_back(array.child(i));
+    }
+    return WriteChildren(type.children(), children);
   }
 
  private:

http://git-wip-us.apache.org/repos/asf/arrow/blob/84520711/cpp/src/arrow/ipc/metadata.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/metadata.h b/cpp/src/arrow/ipc/metadata.h
index ec7bc39..257bbd8 100644
--- a/cpp/src/arrow/ipc/metadata.h
+++ b/cpp/src/arrow/ipc/metadata.h
@@ -26,7 +26,6 @@
 #include <unordered_map>
 #include <vector>
 
-#include "arrow/loader.h"
 #include "arrow/util/macros.h"
 #include "arrow/util/visibility.h"
 
@@ -54,6 +53,42 @@ enum class MetadataVersion : char { V1, V2, V3 };
 
 static constexpr const char* kArrowMagicBytes = "ARROW1";
 
+// ARROW-109: We set this number arbitrarily to help catch user mistakes. For
+// deeply nested schemas, it is expected the user will indicate explicitly the
+// maximum allowed recursion depth
+constexpr int kMaxNestingDepth = 64;
+
+/// \brief Length, null count and offset of a single field
+struct ARROW_EXPORT FieldMetadata {
+  FieldMetadata() : length(0), null_count(0), offset(0) {}
+  FieldMetadata(int64_t length, int64_t null_count, int64_t offset)
+      : length(length), null_count(null_count), offset(offset) {}
+
+  FieldMetadata(const FieldMetadata& other)
+      : length(other.length),
+        null_count(other.null_count),
+        offset(other.offset) {}
+
+  int64_t length;
+  int64_t null_count;
+  int64_t offset;
+};
+
+/// \brief Location (page, offset, length) of one buffer
+struct ARROW_EXPORT BufferMetadata {
+  BufferMetadata() : page(-1), offset(0), length(0) {}  // unused page, empty extent
+  BufferMetadata(int32_t page, int64_t offset, int64_t length)
+      : page(page), offset(offset), length(length) {}
+  /// The shared memory page id where to find this. Set to -1 if unused
+  int32_t page;
+
+  /// The relative offset into the memory page to the starting byte of the buffer
+  int64_t offset;
+
+  /// Absolute length in bytes of the buffer
+  int64_t length;
+};
+
 struct FileBlock {
   FileBlock() {}
   FileBlock(int64_t offset, int32_t metadata_length, int64_t body_length)