You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2017/02/06 16:25:34 UTC

[1/3] arrow git commit: ARROW-33: [C++] Implement zero-copy array slicing, integrate with IPC code paths

Repository: arrow
Updated Branches:
  refs/heads/master 74bc4dd48 -> 5439b7158


http://git-wip-us.apache.org/repos/asf/arrow/blob/5439b715/python/src/pyarrow/adapters/pandas.cc
----------------------------------------------------------------------
diff --git a/python/src/pyarrow/adapters/pandas.cc b/python/src/pyarrow/adapters/pandas.cc
index 8d05821..345dc90 100644
--- a/python/src/pyarrow/adapters/pandas.cc
+++ b/python/src/pyarrow/adapters/pandas.cc
@@ -1338,8 +1338,7 @@ class ArrowSerializer {
     PyAcquireGIL lock;
 
     PyObject** objects = reinterpret_cast<PyObject**>(PyArray_DATA(arr_));
-    arrow::TypePtr string_type(new arrow::DateType());
-    arrow::DateBuilder date_builder(pool_, string_type);
+    arrow::DateBuilder date_builder(pool_);
     RETURN_NOT_OK(date_builder.Resize(length_));
 
     Status s;
@@ -1363,8 +1362,7 @@ class ArrowSerializer {
     // and unicode mixed in the object array
 
     PyObject** objects = reinterpret_cast<PyObject**>(PyArray_DATA(arr_));
-    arrow::TypePtr string_type(new arrow::StringType());
-    arrow::StringBuilder string_builder(pool_, string_type);
+    arrow::StringBuilder string_builder(pool_);
     RETURN_NOT_OK(string_builder.Resize(length_));
 
     Status s;
@@ -1374,8 +1372,8 @@ class ArrowSerializer {
 
     if (have_bytes) {
       const auto& arr = static_cast<const arrow::StringArray&>(*out->get());
-      *out = std::make_shared<arrow::BinaryArray>(
-          arr.length(), arr.offsets(), arr.data(), arr.null_count(), arr.null_bitmap());
+      *out = std::make_shared<arrow::BinaryArray>(arr.length(), arr.value_offsets(),
+          arr.data(), arr.null_bitmap(), arr.null_count());
     }
     return Status::OK();
   }
@@ -1403,7 +1401,7 @@ class ArrowSerializer {
       }
     }
 
-    *out = std::make_shared<arrow::BooleanArray>(length_, data, null_count, null_bitmap_);
+    *out = std::make_shared<arrow::BooleanArray>(length_, data, null_bitmap_, null_count);
 
     return Status::OK();
   }
@@ -1515,10 +1513,14 @@ inline Status ArrowSerializer<TYPE>::Convert(std::shared_ptr<Array>* out) {
     null_count = ValuesToBitmap<TYPE>(PyArray_DATA(arr_), length_, null_bitmap_data_);
   }
 
+  // For readability
+  constexpr int32_t kOffset = 0;
+
   RETURN_NOT_OK(ConvertData());
   std::shared_ptr<DataType> type;
   RETURN_NOT_OK(MakeDataType(&type));
-  RETURN_NOT_OK(MakePrimitiveArray(type, length_, data_, null_count, null_bitmap_, out));
+  RETURN_NOT_OK(
+      MakePrimitiveArray(type, length_, data_, null_bitmap_, null_count, kOffset, out));
   return Status::OK();
 }
 
@@ -1657,7 +1659,7 @@ ArrowSerializer<NPY_OBJECT>::ConvertTypedLists<NPY_OBJECT, ::arrow::StringType>(
   // TODO: If there are bytes involed, convert to Binary representation
   bool have_bytes = false;
 
-  auto value_builder = std::make_shared<arrow::StringBuilder>(pool_, field->type);
+  auto value_builder = std::make_shared<arrow::StringBuilder>(pool_);
   ListBuilder list_builder(pool_, value_builder);
   PyObject** objects = reinterpret_cast<PyObject**>(PyArray_DATA(arr_));
   for (int64_t i = 0; i < length_; ++i) {

http://git-wip-us.apache.org/repos/asf/arrow/blob/5439b715/python/src/pyarrow/io.cc
----------------------------------------------------------------------
diff --git a/python/src/pyarrow/io.cc b/python/src/pyarrow/io.cc
index 9235260..aa4cb7b 100644
--- a/python/src/pyarrow/io.cc
+++ b/python/src/pyarrow/io.cc
@@ -56,9 +56,20 @@ static Status CheckPyError() {
   return Status::OK();
 }
 
+// This is annoying: because C++11 does not allow implicit conversion of string
+// literals to non-const char*, we need to go through some gymnastics to use
+// PyObject_CallMethod without a lot of pain (its arguments are non-const
+// char*)
+template <typename... ArgTypes>
+static inline PyObject* cpp_PyObject_CallMethod(
+    PyObject* obj, const char* method_name, const char* argspec, ArgTypes... args) {
+  return PyObject_CallMethod(
+      obj, const_cast<char*>(method_name), const_cast<char*>(argspec), args...);
+}
+
 Status PythonFile::Close() {
   // whence: 0 for relative to start of file, 2 for end of file
-  PyObject* result = PyObject_CallMethod(file_, "close", "()");
+  PyObject* result = cpp_PyObject_CallMethod(file_, "close", "()");
   Py_XDECREF(result);
   ARROW_RETURN_NOT_OK(CheckPyError());
   return Status::OK();
@@ -66,14 +77,19 @@ Status PythonFile::Close() {
 
 Status PythonFile::Seek(int64_t position, int whence) {
   // whence: 0 for relative to start of file, 2 for end of file
-  PyObject* result = PyObject_CallMethod(file_, "seek", "(ii)", position, whence);
+  // "L" consumes a C long long. The previous "i" unit reads a C int from the
+  // varargs, which does not match the 64-bit position that is passed.
+  PyObject* result = cpp_PyObject_CallMethod(
+      file_, "seek", "(Li)", static_cast<long long>(position), whence);
   Py_XDECREF(result);
   ARROW_RETURN_NOT_OK(CheckPyError());
   return Status::OK();
 }
 
 Status PythonFile::Read(int64_t nbytes, PyObject** out) {
-  PyObject* result = PyObject_CallMethod(file_, "read", "(i)", nbytes);
+  // "L" consumes a C long long, matching the width of the 64-bit byte count
+  PyObject* result = cpp_PyObject_CallMethod(
+      file_, "read", "(L)", static_cast<long long>(nbytes));
   ARROW_RETURN_NOT_OK(CheckPyError());
   *out = result;
   return Status::OK();
@@ -84,7 +95,7 @@ Status PythonFile::Write(const uint8_t* data, int64_t nbytes) {
       PyBytes_FromStringAndSize(reinterpret_cast<const char*>(data), nbytes);
   ARROW_RETURN_NOT_OK(CheckPyError());
 
-  PyObject* result = PyObject_CallMethod(file_, "write", "(O)", py_data);
+  PyObject* result = cpp_PyObject_CallMethod(file_, "write", "(O)", py_data);
   Py_XDECREF(py_data);
   Py_XDECREF(result);
   ARROW_RETURN_NOT_OK(CheckPyError());
@@ -92,7 +103,7 @@ Status PythonFile::Write(const uint8_t* data, int64_t nbytes) {
 }
 
 Status PythonFile::Tell(int64_t* position) {
-  PyObject* result = PyObject_CallMethod(file_, "tell", "()");
+  PyObject* result = cpp_PyObject_CallMethod(file_, "tell", "()");
   ARROW_RETURN_NOT_OK(CheckPyError());
 
   *position = PyLong_AsLongLong(result);


[3/3] arrow git commit: ARROW-33: [C++] Implement zero-copy array slicing, integrate with IPC code paths

Posted by we...@apache.org.
ARROW-33: [C++] Implement zero-copy array slicing, integrate with IPC code paths

This turned into a bit of a refactoring bloodbath. I have sorted through most of the issues that this turned up, so I should have this all completely working within a day or so. There will be some follow-up work to do to polish things up.

Closes #56.

Author: Wes McKinney <we...@twosigma.com>

Closes #322 from wesm/ARROW-33 and squashes the following commits:

61afe42 [Wes McKinney] Some API cleaning in builder.h
86511a3 [Wes McKinney] Python fixes, clang warning fixes
9a00870 [Wes McKinney] Make ApproxEquals for floating point arrays work on slices
2a13929 [Wes McKinney] Implement slicing IPC logic for dense array
4f08628 [Wes McKinney] Add missing include
1a6fcb4 [Wes McKinney] Make some more progress. dense union needs more work
c6d814d [Wes McKinney] Work on adding sliced array support to IPC code path, with pretty printer and comparison fixed for sliced bitmaps, etc. Not all working yet
b6c511e [Wes McKinney] Add RecordBatch::Slice convenience method
8900d58 [Wes McKinney] Add Slice tests for DictionaryArray. Test recomputing the null count
55454d7 [Wes McKinney] Add slice tests for struct, union, string, list
a72653d [Wes McKinney] Rename offsets to value_offsets in list/binary/string/union for better clarity. Test Slice for primitive arrays
0355f71 [Wes McKinney] Implement CopyBitmap function
a228b50 [Wes McKinney] Implement Slice methods on Array classes
e502901 [Wes McKinney] Move null_count and offset as last two parameters of all array ctors. Implement/test bitmap set bit count with offset
bae6922 [Wes McKinney] Temporary work on adding offset parameter to Array classes for slicing


Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/5439b715
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/5439b715
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/5439b715

Branch: refs/heads/master
Commit: 5439b71586f4b0f9a36544b9e2417ee6ad7b48e8
Parents: 74bc4dd
Author: Wes McKinney <we...@twosigma.com>
Authored: Mon Feb 6 11:25:18 2017 -0500
Committer: Wes McKinney <we...@twosigma.com>
Committed: Mon Feb 6 11:25:18 2017 -0500

----------------------------------------------------------------------
 cpp/src/arrow/CMakeLists.txt               |   1 +
 cpp/src/arrow/array-dictionary-test.cc     |  62 ++++--
 cpp/src/arrow/array-list-test.cc           |  36 +++-
 cpp/src/arrow/array-primitive-test.cc      |  78 ++++++-
 cpp/src/arrow/array-string-test.cc         |  90 ++++++--
 cpp/src/arrow/array-struct-test.cc         |  19 +-
 cpp/src/arrow/array-test.cc                |  32 ++-
 cpp/src/arrow/array-union-test.cc          |  67 ++++++
 cpp/src/arrow/array.cc                     | 233 ++++++++++++++-------
 cpp/src/arrow/array.h                      | 265 +++++++++++++++---------
 cpp/src/arrow/buffer.cc                    |  16 ++
 cpp/src/arrow/buffer.h                     |  21 +-
 cpp/src/arrow/builder.cc                   |  64 +++---
 cpp/src/arrow/builder.h                    |  21 +-
 cpp/src/arrow/column-test.cc               |  14 +-
 cpp/src/arrow/compare.cc                   | 122 ++++++++---
 cpp/src/arrow/io/file.cc                   |   4 +-
 cpp/src/arrow/io/hdfs.cc                   |   8 +-
 cpp/src/arrow/io/io-hdfs-test.cc           |  10 +-
 cpp/src/arrow/io/io-memory-test.cc         |   4 +-
 cpp/src/arrow/ipc/adapter.cc               | 260 +++++++++++++++++++----
 cpp/src/arrow/ipc/adapter.h                |   8 +-
 cpp/src/arrow/ipc/ipc-adapter-test.cc      |  52 ++++-
 cpp/src/arrow/ipc/ipc-json-test.cc         |  21 +-
 cpp/src/arrow/ipc/json-integration-test.cc |   6 +-
 cpp/src/arrow/ipc/json-internal.cc         |  37 ++--
 cpp/src/arrow/ipc/stream.cc                |  15 +-
 cpp/src/arrow/ipc/stream.h                 |   8 +
 cpp/src/arrow/ipc/test-common.h            |  79 ++++---
 cpp/src/arrow/pretty_print-test.cc         |   6 +-
 cpp/src/arrow/pretty_print.cc              |  53 +++--
 cpp/src/arrow/table-test.cc                |  26 +++
 cpp/src/arrow/table.cc                     |  19 +-
 cpp/src/arrow/table.h                      |   4 +
 cpp/src/arrow/test-util.h                  |  43 +---
 cpp/src/arrow/type.cc                      |   6 +-
 cpp/src/arrow/type.h                       |   8 +-
 cpp/src/arrow/type_traits.h                |   9 +
 cpp/src/arrow/util/bit-util-test.cc        |  62 +++++-
 cpp/src/arrow/util/bit-util.cc             |  83 +++++++-
 cpp/src/arrow/util/bit-util.h              |  45 ++++
 cpp/src/arrow/util/logging.h               |   4 +-
 cpp/src/arrow/util/macros.h                |   2 +-
 python/CMakeLists.txt                      |   2 +-
 python/pyarrow/includes/libarrow.pxd       |   4 +-
 python/pyarrow/scalar.pyx                  |   2 +-
 python/src/pyarrow/adapters/builtin.cc     |   2 +-
 python/src/pyarrow/adapters/pandas.cc      |  20 +-
 python/src/pyarrow/io.cc                   |  21 +-
 49 files changed, 1524 insertions(+), 550 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/arrow/blob/5439b715/cpp/src/arrow/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt
index b002bb7..824ced1 100644
--- a/cpp/src/arrow/CMakeLists.txt
+++ b/cpp/src/arrow/CMakeLists.txt
@@ -53,6 +53,7 @@ ADD_ARROW_TEST(array-list-test)
 ADD_ARROW_TEST(array-primitive-test)
 ADD_ARROW_TEST(array-string-test)
 ADD_ARROW_TEST(array-struct-test)
+ADD_ARROW_TEST(array-union-test)
 ADD_ARROW_TEST(buffer-test)
 ADD_ARROW_TEST(column-test)
 ADD_ARROW_TEST(memory_pool-test)

http://git-wip-us.apache.org/repos/asf/arrow/blob/5439b715/cpp/src/arrow/array-dictionary-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/array-dictionary-test.cc b/cpp/src/arrow/array-dictionary-test.cc
index 1a0d49a..61381b7 100644
--- a/cpp/src/arrow/array-dictionary-test.cc
+++ b/cpp/src/arrow/array-dictionary-test.cc
@@ -34,7 +34,7 @@ namespace arrow {
 TEST(TestDictionary, Basics) {
   std::vector<int32_t> values = {100, 1000, 10000, 100000};
   std::shared_ptr<Array> dict;
-  ArrayFromVector<Int32Type, int32_t>(int32(), values, &dict);
+  ArrayFromVector<Int32Type, int32_t>(values, &dict);
 
   std::shared_ptr<DictionaryType> type1 =
       std::dynamic_pointer_cast<DictionaryType>(dictionary(int16(), dict));
@@ -54,45 +54,67 @@ TEST(TestDictionary, Equals) {
 
   std::shared_ptr<Array> dict;
   std::vector<std::string> dict_values = {"foo", "bar", "baz"};
-  ArrayFromVector<StringType, std::string>(utf8(), dict_values, &dict);
+  ArrayFromVector<StringType, std::string>(dict_values, &dict);
   std::shared_ptr<DataType> dict_type = dictionary(int16(), dict);
 
   std::shared_ptr<Array> dict2;
   std::vector<std::string> dict2_values = {"foo", "bar", "baz", "qux"};
-  ArrayFromVector<StringType, std::string>(utf8(), dict2_values, &dict2);
+  ArrayFromVector<StringType, std::string>(dict2_values, &dict2);
   std::shared_ptr<DataType> dict2_type = dictionary(int16(), dict2);
 
   std::shared_ptr<Array> indices;
   std::vector<int16_t> indices_values = {1, 2, -1, 0, 2, 0};
-  ArrayFromVector<Int16Type, int16_t>(int16(), is_valid, indices_values, &indices);
+  ArrayFromVector<Int16Type, int16_t>(is_valid, indices_values, &indices);
 
   std::shared_ptr<Array> indices2;
   std::vector<int16_t> indices2_values = {1, 2, 0, 0, 2, 0};
-  ArrayFromVector<Int16Type, int16_t>(int16(), is_valid, indices2_values, &indices2);
+  ArrayFromVector<Int16Type, int16_t>(is_valid, indices2_values, &indices2);
 
   std::shared_ptr<Array> indices3;
   std::vector<int16_t> indices3_values = {1, 1, 0, 0, 2, 0};
-  ArrayFromVector<Int16Type, int16_t>(int16(), is_valid, indices3_values, &indices3);
+  ArrayFromVector<Int16Type, int16_t>(is_valid, indices3_values, &indices3);
 
-  auto arr = std::make_shared<DictionaryArray>(dict_type, indices);
-  auto arr2 = std::make_shared<DictionaryArray>(dict_type, indices2);
-  auto arr3 = std::make_shared<DictionaryArray>(dict2_type, indices);
-  auto arr4 = std::make_shared<DictionaryArray>(dict_type, indices3);
+  auto array = std::make_shared<DictionaryArray>(dict_type, indices);
+  auto array2 = std::make_shared<DictionaryArray>(dict_type, indices2);
+  auto array3 = std::make_shared<DictionaryArray>(dict2_type, indices);
+  auto array4 = std::make_shared<DictionaryArray>(dict_type, indices3);
 
-  ASSERT_TRUE(arr->Equals(arr));
+  ASSERT_TRUE(array->Equals(array));
 
   // Equal, because the unequal index is masked by null
-  ASSERT_TRUE(arr->Equals(arr2));
+  ASSERT_TRUE(array->Equals(array2));
 
   // Unequal dictionaries
-  ASSERT_FALSE(arr->Equals(arr3));
+  ASSERT_FALSE(array->Equals(array3));
 
   // Unequal indices
-  ASSERT_FALSE(arr->Equals(arr4));
+  ASSERT_FALSE(array->Equals(array4));
 
   // RangeEquals
-  ASSERT_TRUE(arr->RangeEquals(3, 6, 3, arr4));
-  ASSERT_FALSE(arr->RangeEquals(1, 3, 1, arr4));
+  ASSERT_TRUE(array->RangeEquals(3, 6, 3, array4));
+  ASSERT_FALSE(array->RangeEquals(1, 3, 1, array4));
+
+  // ARROW-33 Test slices
+  const int size = array->length();
+
+  std::shared_ptr<Array> slice, slice2;
+  slice = array->Array::Slice(2);
+  slice2 = array->Array::Slice(2);
+  ASSERT_EQ(size - 2, slice->length());
+
+  ASSERT_TRUE(slice->Equals(slice2));
+  ASSERT_TRUE(array->RangeEquals(2, array->length(), 0, slice));
+
+  // Chained slices
+  slice2 = array->Array::Slice(1)->Array::Slice(1);
+  ASSERT_TRUE(slice->Equals(slice2));
+
+  slice = array->Slice(1, 3);
+  slice2 = array->Slice(1, 3);
+  ASSERT_EQ(3, slice->length());
+
+  ASSERT_TRUE(slice->Equals(slice2));
+  ASSERT_TRUE(array->RangeEquals(1, 4, 0, slice));
 }
 
 TEST(TestDictionary, Validate) {
@@ -100,20 +122,20 @@ TEST(TestDictionary, Validate) {
 
   std::shared_ptr<Array> dict;
   std::vector<std::string> dict_values = {"foo", "bar", "baz"};
-  ArrayFromVector<StringType, std::string>(utf8(), dict_values, &dict);
+  ArrayFromVector<StringType, std::string>(dict_values, &dict);
   std::shared_ptr<DataType> dict_type = dictionary(int16(), dict);
 
   std::shared_ptr<Array> indices;
   std::vector<uint8_t> indices_values = {1, 2, 0, 0, 2, 0};
-  ArrayFromVector<UInt8Type, uint8_t>(uint8(), is_valid, indices_values, &indices);
+  ArrayFromVector<UInt8Type, uint8_t>(is_valid, indices_values, &indices);
 
   std::shared_ptr<Array> indices2;
   std::vector<float> indices2_values = {1., 2., 0., 0., 2., 0.};
-  ArrayFromVector<FloatType, float>(float32(), is_valid, indices2_values, &indices2);
+  ArrayFromVector<FloatType, float>(is_valid, indices2_values, &indices2);
 
   std::shared_ptr<Array> indices3;
   std::vector<int64_t> indices3_values = {1, 2, 0, 0, 2, 0};
-  ArrayFromVector<Int64Type, int64_t>(int64(), is_valid, indices3_values, &indices3);
+  ArrayFromVector<Int64Type, int64_t>(is_valid, indices3_values, &indices3);
 
   std::shared_ptr<Array> arr = std::make_shared<DictionaryArray>(dict_type, indices);
   std::shared_ptr<Array> arr2 = std::make_shared<DictionaryArray>(dict_type, indices2);

http://git-wip-us.apache.org/repos/asf/arrow/blob/5439b715/cpp/src/arrow/array-list-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/array-list-test.cc b/cpp/src/arrow/array-list-test.cc
index 8e4d319..a144fd9 100644
--- a/cpp/src/arrow/array-list-test.cc
+++ b/cpp/src/arrow/array-list-test.cc
@@ -90,9 +90,9 @@ TEST_F(TestListBuilder, Equality) {
   Int32Builder* vb = static_cast<Int32Builder*>(builder_->value_builder().get());
 
   std::shared_ptr<Array> array, equal_array, unequal_array;
-  vector<int32_t> equal_offsets = {0, 1, 2, 5};
-  vector<int32_t> equal_values = {1, 2, 3, 4, 5, 2, 2, 2};
-  vector<int32_t> unequal_offsets = {0, 1, 4};
+  vector<int32_t> equal_offsets = {0, 1, 2, 5, 6, 7, 8, 10};
+  vector<int32_t> equal_values = {1, 2, 3, 4, 5, 2, 2, 2, 5, 6};
+  vector<int32_t> unequal_offsets = {0, 1, 4, 7};
   vector<int32_t> unequal_values = {1, 2, 2, 2, 3, 4, 5};
 
   // setup two equal arrays
@@ -122,7 +122,27 @@ TEST_F(TestListBuilder, Equality) {
   EXPECT_FALSE(array->RangeEquals(0, 2, 0, unequal_array));
   EXPECT_FALSE(array->RangeEquals(1, 2, 1, unequal_array));
   EXPECT_TRUE(array->RangeEquals(2, 3, 2, unequal_array));
-  EXPECT_TRUE(array->RangeEquals(3, 4, 1, unequal_array));
+
+  // ARROW-33: a slice must equal an identical slice and the parent range it covers
+  std::shared_ptr<Array> slice, slice2;
+
+  slice = array->Slice(2);
+  slice2 = array->Slice(2);
+  ASSERT_EQ(array->length() - 2, slice->length());
+
+  ASSERT_TRUE(slice->Equals(slice2));
+  ASSERT_TRUE(array->RangeEquals(2, array->length(), 0, slice));
+
+  // Chained slices
+  slice2 = array->Slice(1)->Slice(1);
+  ASSERT_TRUE(slice->Equals(slice2));
+
+  slice = array->Slice(1, 4);
+  slice2 = array->Slice(1, 4);
+  ASSERT_EQ(4, slice->length());
+
+  ASSERT_TRUE(slice->Equals(slice2));
+  ASSERT_TRUE(array->RangeEquals(1, 5, 0, slice));
 }
 
 TEST_F(TestListBuilder, TestResize) {}
@@ -137,9 +157,9 @@ TEST_F(TestListBuilder, TestAppendNull) {
   ASSERT_TRUE(result_->IsNull(0));
   ASSERT_TRUE(result_->IsNull(1));
 
-  ASSERT_EQ(0, result_->raw_offsets()[0]);
-  ASSERT_EQ(0, result_->offset(1));
-  ASSERT_EQ(0, result_->offset(2));
+  ASSERT_EQ(0, result_->raw_value_offsets()[0]);
+  ASSERT_EQ(0, result_->value_offset(1));
+  ASSERT_EQ(0, result_->value_offset(2));
 
   Int32Array* values = static_cast<Int32Array*>(result_->values().get());
   ASSERT_EQ(0, values->length());
@@ -154,7 +174,7 @@ void ValidateBasicListArray(const ListArray* result, const vector<int32_t>& valu
   ASSERT_EQ(3, result->length());
   vector<int32_t> ex_offsets = {0, 3, 3, 7};
   for (size_t i = 0; i < ex_offsets.size(); ++i) {
-    ASSERT_EQ(ex_offsets[i], result->offset(i));
+    ASSERT_EQ(ex_offsets[i], result->value_offset(i));
   }
 
   for (int i = 0; i < result->length(); ++i) {

http://git-wip-us.apache.org/repos/asf/arrow/blob/5439b715/cpp/src/arrow/array-primitive-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/array-primitive-test.cc b/cpp/src/arrow/array-primitive-test.cc
index c839fb9..a20fdbf 100644
--- a/cpp/src/arrow/array-primitive-test.cc
+++ b/cpp/src/arrow/array-primitive-test.cc
@@ -121,7 +121,7 @@ class TestPrimitiveBuilder : public TestBuilder {
     }
 
     auto expected =
-        std::make_shared<ArrayType>(size, ex_data, ex_null_count, ex_null_bitmap);
+        std::make_shared<ArrayType>(size, ex_data, ex_null_bitmap, ex_null_count);
 
     std::shared_ptr<Array> out;
     ASSERT_OK(builder->Finish(&out));
@@ -217,7 +217,7 @@ void TestPrimitiveBuilder<PBoolean>::Check(
   }
 
   auto expected =
-      std::make_shared<BooleanArray>(size, ex_data, ex_null_count, ex_null_bitmap);
+      std::make_shared<BooleanArray>(size, ex_data, ex_null_bitmap, ex_null_count);
 
   std::shared_ptr<Array> out;
   ASSERT_OK(builder->Finish(&out));
@@ -235,15 +235,14 @@ void TestPrimitiveBuilder<PBoolean>::Check(
 
   for (int i = 0; i < result->length(); ++i) {
     if (nullable) { ASSERT_EQ(valid_bytes_[i] == 0, result->IsNull(i)) << i; }
-    bool actual = BitUtil::GetBit(result->raw_data(), i);
+    bool actual = BitUtil::GetBit(result->data()->data(), i);
     ASSERT_EQ(static_cast<bool>(draws_[i]), actual) << i;
   }
   ASSERT_TRUE(result->Equals(*expected));
 }
 
 typedef ::testing::Types<PBoolean, PUInt8, PUInt16, PUInt32, PUInt64, PInt8, PInt16,
-    PInt32, PInt64, PFloat, PDouble>
-    Primitives;
+    PInt32, PInt64, PFloat, PDouble> Primitives;
 
 TYPED_TEST_CASE(TestPrimitiveBuilder, Primitives);
 
@@ -347,6 +346,39 @@ TYPED_TEST(TestPrimitiveBuilder, Equality) {
       array->RangeEquals(first_valid_idx + 1, size, first_valid_idx + 1, unequal_array));
 }
 
+TYPED_TEST(TestPrimitiveBuilder, SliceEquality) {
+  DECL_T();
+
+  const int size = 1000;
+  this->RandomData(size);
+  vector<T>& draws = this->draws_;
+  vector<uint8_t>& valid_bytes = this->valid_bytes_;
+  auto builder = this->builder_.get();
+
+  std::shared_ptr<Array> array;
+  ASSERT_OK(MakeArray(valid_bytes, draws, size, builder, &array));
+
+  std::shared_ptr<Array> slice, slice2;
+
+  slice = array->Slice(5);
+  slice2 = array->Slice(5);
+  ASSERT_EQ(size - 5, slice->length());
+
+  ASSERT_TRUE(slice->Equals(slice2));
+  ASSERT_TRUE(array->RangeEquals(5, array->length(), 0, slice));
+
+  // Chained slices
+  slice2 = array->Slice(2)->Slice(3);
+  ASSERT_TRUE(slice->Equals(slice2));
+
+  slice = array->Slice(5, 10);
+  slice2 = array->Slice(5, 10);
+  ASSERT_EQ(10, slice->length());
+
+  ASSERT_TRUE(slice->Equals(slice2));
+  ASSERT_TRUE(array->RangeEquals(5, 15, 0, slice));
+}
+
 TYPED_TEST(TestPrimitiveBuilder, TestAppendScalar) {
   DECL_T();
 
@@ -473,4 +505,40 @@ TYPED_TEST(TestPrimitiveBuilder, TestReserve) {
   ASSERT_EQ(BitUtil::NextPower2(kMinBuilderCapacity + 100), this->builder_->capacity());
 }
 
+template <typename TYPE>
+void CheckSliceApproxEquals() {
+  using T = typename TYPE::c_type;
+
+  const int kSize = 50;
+  std::vector<T> draws1;
+  std::vector<T> draws2;
+
+  const uint32_t kSeed = 0;
+  test::random_real<T>(kSize, kSeed, 0, 100, &draws1);
+  test::random_real<T>(kSize, kSeed + 1, 0, 100, &draws2);
+
+  // Make the draws equal in the sliced segment, but unequal elsewhere (to
+  // catch not using the slice offset)
+  for (int i = 10; i < 30; ++i) {
+    draws2[i] = draws1[i];
+  }
+
+  std::vector<bool> is_valid;
+  test::random_is_valid(kSize, 0.1, &is_valid);
+
+  std::shared_ptr<Array> array1, array2;
+  ArrayFromVector<TYPE, T>(is_valid, draws1, &array1);
+  ArrayFromVector<TYPE, T>(is_valid, draws2, &array2);
+
+  std::shared_ptr<Array> slice1 = array1->Slice(10, 20);
+  std::shared_ptr<Array> slice2 = array2->Slice(10, 20);
+
+  ASSERT_TRUE(slice1->ApproxEquals(slice2));
+}
+
+TEST(TestPrimitiveAdHoc, FloatingSliceApproxEquals) {
+  CheckSliceApproxEquals<FloatType>();
+  CheckSliceApproxEquals<DoubleType>();
+}
+
 }  // namespace arrow

http://git-wip-us.apache.org/repos/asf/arrow/blob/5439b715/cpp/src/arrow/array-string-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/array-string-test.cc b/cpp/src/arrow/array-string-test.cc
index 5ea384a..8b7eb41 100644
--- a/cpp/src/arrow/array-string-test.cc
+++ b/cpp/src/arrow/array-string-test.cc
@@ -27,6 +27,7 @@
 #include "arrow/builder.h"
 #include "arrow/test-util.h"
 #include "arrow/type.h"
+#include "arrow/type_traits.h"
 
 namespace arrow {
 
@@ -70,7 +71,7 @@ class TestStringArray : public ::testing::Test {
     null_count_ = test::null_count(valid_bytes_);
 
     strings_ = std::make_shared<StringArray>(
-        length_, offsets_buf_, value_buf_, null_count_, null_bitmap_);
+        length_, offsets_buf_, value_buf_, null_bitmap_, null_count_);
   }
 
  protected:
@@ -114,7 +115,7 @@ TEST_F(TestStringArray, TestListFunctions) {
 
 TEST_F(TestStringArray, TestDestructor) {
   auto arr = std::make_shared<StringArray>(
-      length_, offsets_buf_, value_buf_, null_count_, null_bitmap_);
+      length_, offsets_buf_, value_buf_, null_bitmap_, null_count_);
 }
 
 TEST_F(TestStringArray, TestGetString) {
@@ -133,9 +134,9 @@ TEST_F(TestStringArray, TestEmptyStringComparison) {
   length_ = offsets_.size() - 1;
 
   auto strings_a = std::make_shared<StringArray>(
-      length_, offsets_buf_, nullptr, null_count_, null_bitmap_);
+      length_, offsets_buf_, nullptr, null_bitmap_, null_count_);
   auto strings_b = std::make_shared<StringArray>(
-      length_, offsets_buf_, nullptr, null_count_, null_bitmap_);
+      length_, offsets_buf_, nullptr, null_bitmap_, null_count_);
   ASSERT_TRUE(strings_a->Equals(strings_b));
 }
 
@@ -146,8 +147,7 @@ class TestStringBuilder : public TestBuilder {
  public:
   void SetUp() {
     TestBuilder::SetUp();
-    type_ = TypePtr(new StringType());
-    builder_.reset(new StringBuilder(pool_, type_));
+    builder_.reset(new StringBuilder(pool_));
   }
 
   void Done() {
@@ -159,8 +159,6 @@ class TestStringBuilder : public TestBuilder {
   }
 
  protected:
-  TypePtr type_;
-
   std::unique_ptr<StringBuilder> builder_;
   std::shared_ptr<StringArray> result_;
 };
@@ -195,7 +193,7 @@ TEST_F(TestStringBuilder, TestScalarAppend) {
     } else {
       ASSERT_FALSE(result_->IsNull(i));
       result_->GetValue(i, &length);
-      ASSERT_EQ(pos, result_->offset(i));
+      ASSERT_EQ(pos, result_->value_offset(i));
       ASSERT_EQ(static_cast<int>(strings[i % N].size()), length);
       ASSERT_EQ(strings[i % N], result_->GetString(i));
 
@@ -232,7 +230,7 @@ class TestBinaryArray : public ::testing::Test {
     null_count_ = test::null_count(valid_bytes_);
 
     strings_ = std::make_shared<BinaryArray>(
-        length_, offsets_buf_, value_buf_, null_count_, null_bitmap_);
+        length_, offsets_buf_, value_buf_, null_bitmap_, null_count_);
   }
 
  protected:
@@ -276,7 +274,7 @@ TEST_F(TestBinaryArray, TestListFunctions) {
 
 TEST_F(TestBinaryArray, TestDestructor) {
   auto arr = std::make_shared<BinaryArray>(
-      length_, offsets_buf_, value_buf_, null_count_, null_bitmap_);
+      length_, offsets_buf_, value_buf_, null_bitmap_, null_count_);
 }
 
 TEST_F(TestBinaryArray, TestGetValue) {
@@ -306,8 +304,8 @@ TEST_F(TestBinaryArray, TestEqualsEmptyStrings) {
   ASSERT_OK(builder.Finish(&left_arr));
 
   const BinaryArray& left = static_cast<const BinaryArray&>(*left_arr);
-  std::shared_ptr<Array> right = std::make_shared<BinaryArray>(
-      left.length(), left.offsets(), nullptr, left.null_count(), left.null_bitmap());
+  std::shared_ptr<Array> right = std::make_shared<BinaryArray>(left.length(),
+      left.value_offsets(), nullptr, left.null_bitmap(), left.null_count());
 
   ASSERT_TRUE(left.Equals(right));
   ASSERT_TRUE(left.RangeEquals(0, left.length(), 0, right));
@@ -317,8 +315,7 @@ class TestBinaryBuilder : public TestBuilder {
  public:
   void SetUp() {
     TestBuilder::SetUp();
-    type_ = TypePtr(new BinaryType());
-    builder_.reset(new BinaryBuilder(pool_, type_));
+    builder_.reset(new BinaryBuilder(pool_));
   }
 
   void Done() {
@@ -330,8 +327,6 @@ class TestBinaryBuilder : public TestBuilder {
   }
 
  protected:
-  TypePtr type_;
-
   std::unique_ptr<BinaryBuilder> builder_;
   std::shared_ptr<BinaryArray> result_;
 };
@@ -348,8 +343,7 @@ TEST_F(TestBinaryBuilder, TestScalarAppend) {
       if (is_null[i]) {
         builder_->AppendNull();
       } else {
-        builder_->Append(
-            reinterpret_cast<const uint8_t*>(strings[i].data()), strings[i].size());
+        builder_->Append(strings[i]);
       }
     }
   }
@@ -377,4 +371,66 @@ TEST_F(TestBinaryBuilder, TestZeroLength) {
   Done();
 }
 
+// ----------------------------------------------------------------------
+// Slice tests
+
+// Builds a 60-element array (6 values repeated 10 times, with nulls) using
+// TypeTraits<TYPE>::BuilderType and checks slice/parent equality
+template <typename TYPE>
+void CheckSliceEquality() {
+  using Traits = TypeTraits<TYPE>;
+  using BuilderType = typename Traits::BuilderType;
+
+  BuilderType builder(default_memory_pool());
+
+  std::vector<std::string> strings = {"foo", "", "bar", "baz", "qux", ""};
+  std::vector<uint8_t> is_null = {0, 1, 0, 1, 0, 0};
+
+  int N = strings.size();
+  int reps = 10;
+
+  for (int j = 0; j < reps; ++j) {
+    for (int i = 0; i < N; ++i) {
+      if (is_null[i]) {
+        builder.AppendNull();
+      } else {
+        builder.Append(strings[i]);
+      }
+    }
+  }
+
+  std::shared_ptr<Array> array;
+  ASSERT_OK(builder.Finish(&array));
+
+  std::shared_ptr<Array> slice, slice2;
+
+  slice = array->Slice(5);
+  slice2 = array->Slice(5);
+  ASSERT_EQ(N * reps - 5, slice->length());
+
+  ASSERT_TRUE(slice->Equals(slice2));
+  // Slice(5) spans the parent's [5, length) range, so compare all of it
+  ASSERT_TRUE(array->RangeEquals(5, array->length(), 0, slice));
+
+  // Chained slices
+  slice2 = array->Slice(2)->Slice(3);
+  ASSERT_TRUE(slice->Equals(slice2));
+
+  slice = array->Slice(5, 20);
+  slice2 = array->Slice(5, 20);
+  ASSERT_EQ(20, slice->length());
+
+  ASSERT_TRUE(slice->Equals(slice2));
+  ASSERT_TRUE(array->RangeEquals(5, 25, 0, slice));
+}
+
+TEST_F(TestBinaryArray, TestSliceEquality) {
+  CheckSliceEquality<BinaryType>();
+}
+
+TEST_F(TestStringArray, TestSliceEquality) {
+  // NOTE(review): assumes TypeTraits<StringType> defines BuilderType -- confirm
+  CheckSliceEquality<StringType>();
+}
+
 }  // namespace arrow

http://git-wip-us.apache.org/repos/asf/arrow/blob/5439b715/cpp/src/arrow/array-struct-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/array-struct-test.cc b/cpp/src/arrow/array-struct-test.cc
index 5827c39..f4e7409 100644
--- a/cpp/src/arrow/array-struct-test.cc
+++ b/cpp/src/arrow/array-struct-test.cc
@@ -75,7 +75,7 @@ void ValidateBasicStructArray(const StructArray* result,
   ASSERT_EQ(4, list_char_arr->length());
   ASSERT_EQ(10, list_char_arr->values()->length());
   for (size_t i = 0; i < list_offsets.size(); ++i) {
-    ASSERT_EQ(list_offsets[i], list_char_arr->raw_offsets()[i]);
+    ASSERT_EQ(list_offsets[i], list_char_arr->raw_value_offsets()[i]);
   }
   for (size_t i = 0; i < list_values.size(); ++i) {
     ASSERT_EQ(list_values[i], char_arr->Value(i));
@@ -381,6 +381,23 @@ TEST_F(TestStructBuilder, TestEquality) {
   EXPECT_FALSE(array->RangeEquals(0, 1, 0, unequal_values_array));
   EXPECT_TRUE(array->RangeEquals(1, 3, 1, unequal_values_array));
   EXPECT_FALSE(array->RangeEquals(3, 4, 3, unequal_values_array));
+
+  // ARROW-33 Slice / equality
+  std::shared_ptr<Array> slice, slice2;
+
+  slice = array->Slice(2);
+  slice2 = array->Slice(2);
+  ASSERT_EQ(array->length() - 2, slice->length());
+
+  ASSERT_TRUE(slice->Equals(slice2));
+  ASSERT_TRUE(array->RangeEquals(2, slice->length(), 0, slice));
+
+  slice = array->Slice(1, 2);
+  slice2 = array->Slice(1, 2);
+  ASSERT_EQ(2, slice->length());
+
+  ASSERT_TRUE(slice->Equals(slice2));
+  ASSERT_TRUE(array->RangeEquals(1, 3, 0, slice));
 }
 
 TEST_F(TestStructBuilder, TestZeroLength) {

http://git-wip-us.apache.org/repos/asf/arrow/blob/5439b715/cpp/src/arrow/array-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/array-test.cc b/cpp/src/arrow/array-test.cc
index a1d8fdf..45130d8 100644
--- a/cpp/src/arrow/array-test.cc
+++ b/cpp/src/arrow/array-test.cc
@@ -43,7 +43,7 @@ TEST_F(TestArray, TestNullCount) {
   auto data = std::make_shared<PoolBuffer>(pool_);
   auto null_bitmap = std::make_shared<PoolBuffer>(pool_);
 
-  std::unique_ptr<Int32Array> arr(new Int32Array(100, data, 10, null_bitmap));
+  std::unique_ptr<Int32Array> arr(new Int32Array(100, data, null_bitmap, 10));
   ASSERT_EQ(10, arr->null_count());
 
   std::unique_ptr<Int32Array> arr_no_nulls(new Int32Array(100, data));
@@ -67,7 +67,7 @@ std::shared_ptr<Array> MakeArrayFromValidBytes(
   }
 
   std::shared_ptr<Array> arr(
-      new Int32Array(v.size(), value_builder.Finish(), null_count, null_buf));
+      new Int32Array(v.size(), value_builder.Finish(), null_buf, null_count));
   return arr;
 }
 
@@ -87,6 +87,32 @@ TEST_F(TestArray, TestEquality) {
   EXPECT_FALSE(array->RangeEquals(1, 2, 1, unequal_array));
 }
 
+TEST_F(TestArray, SliceRecomputeNullCount) {
+  std::vector<uint8_t> valid_bytes = {1, 0, 1, 1, 0, 1, 0, 0};
+
+  auto array = MakeArrayFromValidBytes(valid_bytes, pool_);
+
+  ASSERT_EQ(4, array->null_count());
+
+  auto slice = array->Slice(1, 4);
+  ASSERT_EQ(2, slice->null_count());
+  // Elements [4, 8) have validity {0, 1, 0, 0}: three nulls and one valid value
+  slice = array->Slice(4);
+  ASSERT_EQ(3, slice->null_count());
+
+  slice = array->Slice(0);
+  ASSERT_EQ(4, slice->null_count());
+
+  // No bitmap, compute 0
+  std::shared_ptr<MutableBuffer> data;
+  const int kBufferSize = 64;
+  ASSERT_OK(AllocateBuffer(pool_, kBufferSize, &data));
+  memset(data->mutable_data(), 0, kBufferSize);
+
+  auto arr = std::make_shared<Int32Array>(16, data, nullptr, -1);
+  ASSERT_EQ(0, arr->null_count());
+}
+
 TEST_F(TestArray, TestIsNull) {
   // clang-format off
   std::vector<uint8_t> null_bitmap = {1, 0, 1, 1, 0, 1, 0, 0,
@@ -102,7 +128,7 @@ TEST_F(TestArray, TestIsNull) {
 
   std::shared_ptr<Buffer> null_buf = test::bytes_to_null_buffer(null_bitmap);
   std::unique_ptr<Array> arr;
-  arr.reset(new Int32Array(null_bitmap.size(), nullptr, null_count, null_buf));
+  arr.reset(new Int32Array(null_bitmap.size(), nullptr, null_buf, null_count));
 
   ASSERT_EQ(null_count, arr->null_count());
   ASSERT_EQ(5, null_buf->size());

http://git-wip-us.apache.org/repos/asf/arrow/blob/5439b715/cpp/src/arrow/array-union-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/array-union-test.cc b/cpp/src/arrow/array-union-test.cc
new file mode 100644
index 0000000..eb9bd7d
--- /dev/null
+++ b/cpp/src/arrow/array-union-test.cc
@@ -0,0 +1,67 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Tests for UnionArray
+
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "gtest/gtest.h"
+
+#include "arrow/array.h"
+#include "arrow/builder.h"
+#include "arrow/ipc/test-common.h"
+#include "arrow/status.h"
+#include "arrow/table.h"
+#include "arrow/test-util.h"
+#include "arrow/type.h"
+
+namespace arrow {
+
+TEST(TestUnionArrayAdHoc, TestSliceEquals) {
+  std::shared_ptr<RecordBatch> batch;
+  ASSERT_OK(ipc::MakeUnion(&batch));
+
+  const int size = batch->num_rows();
+
+  auto CheckUnion = [&size](std::shared_ptr<Array> array) {
+    std::shared_ptr<Array> slice, slice2;
+    slice = array->Slice(2);
+    slice2 = array->Slice(2);
+    ASSERT_EQ(size - 2, slice->length());
+
+    ASSERT_TRUE(slice->Equals(slice2));
+    ASSERT_TRUE(array->RangeEquals(2, array->length(), 0, slice));
+
+    // Chained slices
+    slice2 = array->Slice(1)->Slice(1);
+    ASSERT_TRUE(slice->Equals(slice2));
+
+    slice = array->Slice(1, 5);
+    slice2 = array->Slice(1, 5);
+    ASSERT_EQ(5, slice->length());
+
+    ASSERT_TRUE(slice->Equals(slice2));
+    ASSERT_TRUE(array->RangeEquals(1, 6, 0, slice));
+  };
+
+  CheckUnion(batch->column(1));
+  CheckUnion(batch->column(2));
+}
+
+}  // namespace arrow

http://git-wip-us.apache.org/repos/asf/arrow/blob/5439b715/cpp/src/arrow/array.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/array.cc b/cpp/src/arrow/array.cc
index 6fc7fb6..f84023e 100644
--- a/cpp/src/arrow/array.cc
+++ b/cpp/src/arrow/array.cc
@@ -17,6 +17,7 @@
 
 #include "arrow/array.h"
 
+#include <algorithm>
 #include <cstdint>
 #include <cstring>
 #include <sstream>
@@ -30,28 +31,37 @@
 
 namespace arrow {
 
-Status GetEmptyBitmap(
-    MemoryPool* pool, int32_t length, std::shared_ptr<MutableBuffer>* result) {
-  auto buffer = std::make_shared<PoolBuffer>(pool);
-  RETURN_NOT_OK(buffer->Resize(BitUtil::BytesForBits(length)));
-  memset(buffer->mutable_data(), 0, buffer->size());
-
-  *result = buffer;
-  return Status::OK();
-}
+// When slicing, we do not know the null count of the sliced range without
+// doing some computation. To avoid doing this eagerly, we set the null count
+// to -1 (any negative number will do). When Array::null_count is called the
+// first time, the null count will be computed. See ARROW-33
+constexpr int32_t kUnknownNullCount = -1;
 
 // ----------------------------------------------------------------------
 // Base array class
 
-Array::Array(const std::shared_ptr<DataType>& type, int32_t length, int32_t null_count,
-    const std::shared_ptr<Buffer>& null_bitmap) {
-  type_ = type;
-  length_ = length;
-  null_count_ = null_count;
-  null_bitmap_ = null_bitmap;
+Array::Array(const std::shared_ptr<DataType>& type, int32_t length,
+    const std::shared_ptr<Buffer>& null_bitmap, int32_t null_count, int32_t offset)
+    : type_(type),
+      length_(length),
+      offset_(offset),
+      null_count_(null_count),
+      null_bitmap_(null_bitmap),
+      null_bitmap_data_(nullptr) {
   if (null_bitmap_) { null_bitmap_data_ = null_bitmap_->data(); }
 }
 
+// Returns the null count, computing and caching it first if it is still unknown (< 0)
+int32_t Array::null_count() const {
+  if (null_count_ < 0) {
+    // A set bit in the validity bitmap marks a non-null slot (see IsNull), so
+    // the null count is the slice length minus the set bits; no bitmap means 0
+    null_count_ =
+        null_bitmap_ ? length_ - CountSetBits(null_bitmap_data_, offset_, length_) : 0;
+  }
+  return null_count_;
+}
+
 bool Array::Equals(const Array& arr) const {
   bool are_equal = false;
   Status error = ArrayEquals(*this, arr, &are_equal);
@@ -86,10 +96,32 @@ bool Array::RangeEquals(int32_t start_idx, int32_t end_idx, int32_t other_start_
   return are_equal;
 }
 
+// Clamps *length and rebases *offset onto array_offset (both are in-out parameters)
+static inline void ConformSliceParams(
+    int32_t array_offset, int32_t array_length, int32_t* offset, int32_t* length) {
+  DCHECK_LE(*offset, array_length);
+  DCHECK_GE(*offset, 0);
+  *length = std::min(array_length - *offset, *length);
+  *offset = array_offset + *offset;
+}
+
+std::shared_ptr<Array> Array::Slice(int32_t offset) const {
+  int32_t slice_length = length_ - offset;
+  return Slice(offset, slice_length);
+}
+
 Status Array::Validate() const {
   return Status::OK();
 }
 
+NullArray::NullArray(int32_t length) : Array(null(), length, nullptr, length) {}
+
+std::shared_ptr<Array> NullArray::Slice(int32_t offset, int32_t length) const {
+  DCHECK_LE(offset, length_);
+  length = std::min(length_ - offset, length);
+  return std::make_shared<NullArray>(length);
+}
+
 Status NullArray::Accept(ArrayVisitor* visitor) const {
   return visitor->Visit(*this);
 }
@@ -98,9 +130,9 @@ Status NullArray::Accept(ArrayVisitor* visitor) const {
 // Primitive array base
 
 PrimitiveArray::PrimitiveArray(const std::shared_ptr<DataType>& type, int32_t length,
-    const std::shared_ptr<Buffer>& data, int32_t null_count,
-    const std::shared_ptr<Buffer>& null_bitmap)
-    : Array(type, length, null_count, null_bitmap) {
+    const std::shared_ptr<Buffer>& data, const std::shared_ptr<Buffer>& null_bitmap,
+    int32_t null_count, int32_t offset)
+    : Array(type, length, null_bitmap, null_count, offset) {
   data_ = data;
   raw_data_ = data == nullptr ? nullptr : data_->data();
 }
@@ -110,6 +142,13 @@ Status NumericArray<T>::Accept(ArrayVisitor* visitor) const {
   return visitor->Visit(*this);
 }
 
+template <typename T>
+std::shared_ptr<Array> NumericArray<T>::Slice(int32_t offset, int32_t length) const {
+  ConformSliceParams(offset_, length_, &offset, &length);
+  return std::make_shared<NumericArray<T>>(
+      type_, length, data_, null_bitmap_, kUnknownNullCount, offset);
+}
+
 template class NumericArray<UInt8Type>;
 template class NumericArray<UInt16Type>;
 template class NumericArray<UInt32Type>;
@@ -129,32 +168,33 @@ template class NumericArray<DoubleType>;
 // BooleanArray
 
 BooleanArray::BooleanArray(int32_t length, const std::shared_ptr<Buffer>& data,
-    int32_t null_count, const std::shared_ptr<Buffer>& null_bitmap)
-    : PrimitiveArray(
-          std::make_shared<BooleanType>(), length, data, null_count, null_bitmap) {}
-
-BooleanArray::BooleanArray(const std::shared_ptr<DataType>& type, int32_t length,
-    const std::shared_ptr<Buffer>& data, int32_t null_count,
-    const std::shared_ptr<Buffer>& null_bitmap)
-    : PrimitiveArray(type, length, data, null_count, null_bitmap) {}
+    const std::shared_ptr<Buffer>& null_bitmap, int32_t null_count, int32_t offset)
+    : PrimitiveArray(std::make_shared<BooleanType>(), length, data, null_bitmap,
+          null_count, offset) {}
 
 Status BooleanArray::Accept(ArrayVisitor* visitor) const {
   return visitor->Visit(*this);
 }
 
+std::shared_ptr<Array> BooleanArray::Slice(int32_t offset, int32_t length) const {
+  ConformSliceParams(offset_, length_, &offset, &length);
+  return std::make_shared<BooleanArray>(
+      length, data_, null_bitmap_, kUnknownNullCount, offset);
+}
+
 // ----------------------------------------------------------------------
 // ListArray
 
 Status ListArray::Validate() const {
   if (length_ < 0) { return Status::Invalid("Length was negative"); }
-  if (!offsets_buffer_) { return Status::Invalid("offsets_buffer_ was null"); }
-  if (offsets_buffer_->size() / static_cast<int>(sizeof(int32_t)) < length_) {
+  if (!value_offsets_) { return Status::Invalid("value_offsets_ was null"); }
+  if (value_offsets_->size() / static_cast<int>(sizeof(int32_t)) < length_) {
     std::stringstream ss;
-    ss << "offset buffer size (bytes): " << offsets_buffer_->size()
+    ss << "offset buffer size (bytes): " << value_offsets_->size()
        << " isn't large enough for length: " << length_;
     return Status::Invalid(ss.str());
   }
-  const int32_t last_offset = offset(length_);
+  const int32_t last_offset = this->value_offset(length_);
   if (last_offset > 0) {
     if (!values_) {
       return Status::Invalid("last offset was non-zero and values was null");
@@ -174,14 +214,15 @@ Status ListArray::Validate() const {
     }
   }
 
-  int32_t prev_offset = offset(0);
+  int32_t prev_offset = this->value_offset(0);
   if (prev_offset != 0) { return Status::Invalid("The first offset wasn't zero"); }
   for (int32_t i = 1; i <= length_; ++i) {
-    int32_t current_offset = offset(i);
+    int32_t current_offset = this->value_offset(i);
     if (IsNull(i - 1) && current_offset != prev_offset) {
       std::stringstream ss;
-      ss << "Offset invariant failure at: " << i << " inconsistent offsets for null slot"
-         << current_offset << "!=" << prev_offset;
+      ss << "Offset invariant failure at: " << i
+         << " inconsistent value_offsets for null slot" << current_offset
+         << "!=" << prev_offset;
       return Status::Invalid(ss.str());
     }
     if (current_offset < prev_offset) {
@@ -200,26 +241,33 @@ Status ListArray::Accept(ArrayVisitor* visitor) const {
   return visitor->Visit(*this);
 }
 
+std::shared_ptr<Array> ListArray::Slice(int32_t offset, int32_t length) const {
+  ConformSliceParams(offset_, length_, &offset, &length);
+  return std::make_shared<ListArray>(
+      type_, length, value_offsets_, values_, null_bitmap_, kUnknownNullCount, offset);
+}
+
 // ----------------------------------------------------------------------
 // String and binary
 
 static std::shared_ptr<DataType> kBinary = std::make_shared<BinaryType>();
 static std::shared_ptr<DataType> kString = std::make_shared<StringType>();
 
-BinaryArray::BinaryArray(int32_t length, const std::shared_ptr<Buffer>& offsets,
-    const std::shared_ptr<Buffer>& data, int32_t null_count,
-    const std::shared_ptr<Buffer>& null_bitmap)
-    : BinaryArray(kBinary, length, offsets, data, null_count, null_bitmap) {}
+BinaryArray::BinaryArray(int32_t length, const std::shared_ptr<Buffer>& value_offsets,
+    const std::shared_ptr<Buffer>& data, const std::shared_ptr<Buffer>& null_bitmap,
+    int32_t null_count, int32_t offset)
+    : BinaryArray(kBinary, length, value_offsets, data, null_bitmap, null_count, offset) {
+}
 
 BinaryArray::BinaryArray(const std::shared_ptr<DataType>& type, int32_t length,
-    const std::shared_ptr<Buffer>& offsets, const std::shared_ptr<Buffer>& data,
-    int32_t null_count, const std::shared_ptr<Buffer>& null_bitmap)
-    : Array(type, length, null_count, null_bitmap),
-      offsets_buffer_(offsets),
-      offsets_(reinterpret_cast<const int32_t*>(offsets_buffer_->data())),
-      data_buffer_(data),
-      data_(nullptr) {
-  if (data_buffer_ != nullptr) { data_ = data_buffer_->data(); }
+    const std::shared_ptr<Buffer>& value_offsets, const std::shared_ptr<Buffer>& data,
+    const std::shared_ptr<Buffer>& null_bitmap, int32_t null_count, int32_t offset)
+    : Array(type, length, null_bitmap, null_count, offset),
+      value_offsets_(value_offsets),
+      raw_value_offsets_(reinterpret_cast<const int32_t*>(value_offsets_->data())),
+      data_(data),
+      raw_data_(nullptr) {
+  if (data_ != nullptr) { raw_data_ = data_->data(); }
 }
 
 Status BinaryArray::Validate() const {
@@ -231,10 +279,17 @@ Status BinaryArray::Accept(ArrayVisitor* visitor) const {
   return visitor->Visit(*this);
 }
 
-StringArray::StringArray(int32_t length, const std::shared_ptr<Buffer>& offsets,
-    const std::shared_ptr<Buffer>& data, int32_t null_count,
-    const std::shared_ptr<Buffer>& null_bitmap)
-    : BinaryArray(kString, length, offsets, data, null_count, null_bitmap) {}
+std::shared_ptr<Array> BinaryArray::Slice(int32_t offset, int32_t length) const {
+  ConformSliceParams(offset_, length_, &offset, &length);
+  return std::make_shared<BinaryArray>(
+      length, value_offsets_, data_, null_bitmap_, kUnknownNullCount, offset);
+}
+
+StringArray::StringArray(int32_t length, const std::shared_ptr<Buffer>& value_offsets,
+    const std::shared_ptr<Buffer>& data, const std::shared_ptr<Buffer>& null_bitmap,
+    int32_t null_count, int32_t offset)
+    : BinaryArray(kString, length, value_offsets, data, null_bitmap, null_count, offset) {
+}
 
 Status StringArray::Validate() const {
   // TODO(emkornfield) Validate proper UTF8 code points?
@@ -245,12 +300,26 @@ Status StringArray::Accept(ArrayVisitor* visitor) const {
   return visitor->Visit(*this);
 }
 
+std::shared_ptr<Array> StringArray::Slice(int32_t offset, int32_t length) const {
+  ConformSliceParams(offset_, length_, &offset, &length);
+  return std::make_shared<StringArray>(
+      length, value_offsets_, data_, null_bitmap_, kUnknownNullCount, offset);
+}
+
 // ----------------------------------------------------------------------
 // Struct
 
+StructArray::StructArray(const std::shared_ptr<DataType>& type, int32_t length,
+    const std::vector<std::shared_ptr<Array>>& children,
+    std::shared_ptr<Buffer> null_bitmap, int32_t null_count, int32_t offset)
+    : Array(type, length, null_bitmap, null_count, offset) {
+  type_ = type;
+  children_ = children;
+}
+
 std::shared_ptr<Array> StructArray::field(int32_t pos) const {
-  DCHECK_GT(field_arrays_.size(), 0);
-  return field_arrays_[pos];
+  DCHECK_GT(children_.size(), 0);
+  return children_[pos];
 }
 
 Status StructArray::Validate() const {
@@ -260,11 +329,11 @@ Status StructArray::Validate() const {
     return Status::Invalid("Null count exceeds the length of this struct");
   }
 
-  if (field_arrays_.size() > 0) {
+  if (children_.size() > 0) {
     // Validate fields
-    int32_t array_length = field_arrays_[0]->length();
+    int32_t array_length = children_[0]->length();
     size_t idx = 0;
-    for (auto it : field_arrays_) {
+    for (auto it : children_) {
       if (it->length() != array_length) {
         std::stringstream ss;
         ss << "Length is not equal from field " << it->type()->ToString()
@@ -293,19 +362,27 @@ Status StructArray::Accept(ArrayVisitor* visitor) const {
   return visitor->Visit(*this);
 }
 
+std::shared_ptr<Array> StructArray::Slice(int32_t offset, int32_t length) const {
+  ConformSliceParams(offset_, length_, &offset, &length);
+  return std::make_shared<StructArray>(
+      type_, length, children_, null_bitmap_, kUnknownNullCount, offset);
+}
+
 // ----------------------------------------------------------------------
 // UnionArray
 
 UnionArray::UnionArray(const std::shared_ptr<DataType>& type, int32_t length,
     const std::vector<std::shared_ptr<Array>>& children,
-    const std::shared_ptr<Buffer>& type_ids, const std::shared_ptr<Buffer>& offsets,
-    int32_t null_count, const std::shared_ptr<Buffer>& null_bitmap)
-    : Array(type, length, null_count, null_bitmap),
+    const std::shared_ptr<Buffer>& type_ids, const std::shared_ptr<Buffer>& value_offsets,
+    const std::shared_ptr<Buffer>& null_bitmap, int32_t null_count, int32_t offset)
+    : Array(type, length, null_bitmap, null_count, offset),
       children_(children),
-      type_ids_buffer_(type_ids),
-      offsets_buffer_(offsets) {
-  type_ids_ = reinterpret_cast<const uint8_t*>(type_ids->data());
-  if (offsets) { offsets_ = reinterpret_cast<const int32_t*>(offsets->data()); }
+      type_ids_(type_ids),
+      value_offsets_(value_offsets) {
+  raw_type_ids_ = reinterpret_cast<const uint8_t*>(type_ids->data());
+  if (value_offsets) {
+    raw_value_offsets_ = reinterpret_cast<const int32_t*>(value_offsets->data());
+  }
 }
 
 std::shared_ptr<Array> UnionArray::child(int32_t pos) const {
@@ -328,18 +405,24 @@ Status UnionArray::Accept(ArrayVisitor* visitor) const {
   return visitor->Visit(*this);
 }
 
+std::shared_ptr<Array> UnionArray::Slice(int32_t offset, int32_t length) const {
+  ConformSliceParams(offset_, length_, &offset, &length);
+  return std::make_shared<UnionArray>(type_, length, children_, type_ids_, value_offsets_,
+      null_bitmap_, kUnknownNullCount, offset);
+}
+
 // ----------------------------------------------------------------------
 // DictionaryArray
 
 Status DictionaryArray::FromBuffer(const std::shared_ptr<DataType>& type, int32_t length,
-    const std::shared_ptr<Buffer>& indices, int32_t null_count,
-    const std::shared_ptr<Buffer>& null_bitmap, std::shared_ptr<DictionaryArray>* out) {
+    const std::shared_ptr<Buffer>& indices, const std::shared_ptr<Buffer>& null_bitmap,
+    int32_t null_count, int32_t offset, std::shared_ptr<DictionaryArray>* out) {
   DCHECK_EQ(type->type, Type::DICTIONARY);
   const auto& dict_type = static_cast<const DictionaryType*>(type.get());
 
   std::shared_ptr<Array> boxed_indices;
-  RETURN_NOT_OK(MakePrimitiveArray(
-      dict_type->index_type(), length, indices, null_count, null_bitmap, &boxed_indices));
+  RETURN_NOT_OK(MakePrimitiveArray(dict_type->index_type(), length, indices, null_bitmap,
+      null_count, offset, &boxed_indices));
 
   *out = std::make_shared<DictionaryArray>(type, boxed_indices);
   return Status::OK();
@@ -347,7 +430,8 @@ Status DictionaryArray::FromBuffer(const std::shared_ptr<DataType>& type, int32_
 
 DictionaryArray::DictionaryArray(
     const std::shared_ptr<DataType>& type, const std::shared_ptr<Array>& indices)
-    : Array(type, indices->length(), indices->null_count(), indices->null_bitmap()),
+    : Array(type, indices->length(), indices->null_bitmap(), indices->null_count(),
+          indices->offset()),
       dict_type_(static_cast<const DictionaryType*>(type.get())),
       indices_(indices) {
   DCHECK_EQ(type->type, Type::DICTIONARY);
@@ -369,16 +453,21 @@ Status DictionaryArray::Accept(ArrayVisitor* visitor) const {
   return visitor->Visit(*this);
 }
 
+std::shared_ptr<Array> DictionaryArray::Slice(int32_t offset, int32_t length) const {
+  std::shared_ptr<Array> sliced_indices = indices_->Slice(offset, length);
+  return std::make_shared<DictionaryArray>(type_, sliced_indices);
+}
+
 // ----------------------------------------------------------------------
 
-#define MAKE_PRIMITIVE_ARRAY_CASE(ENUM, ArrayType)                          \
-  case Type::ENUM:                                                          \
-    out->reset(new ArrayType(type, length, data, null_count, null_bitmap)); \
+#define MAKE_PRIMITIVE_ARRAY_CASE(ENUM, ArrayType)                                  \
+  case Type::ENUM:                                                                  \
+    out->reset(new ArrayType(type, length, data, null_bitmap, null_count, offset)); \
     break;
 
 Status MakePrimitiveArray(const std::shared_ptr<DataType>& type, int32_t length,
-    const std::shared_ptr<Buffer>& data, int32_t null_count,
-    const std::shared_ptr<Buffer>& null_bitmap, std::shared_ptr<Array>* out) {
+    const std::shared_ptr<Buffer>& data, const std::shared_ptr<Buffer>& null_bitmap,
+    int32_t null_count, int32_t offset, std::shared_ptr<Array>* out) {
   switch (type->type) {
     MAKE_PRIMITIVE_ARRAY_CASE(BOOL, BooleanArray);
     MAKE_PRIMITIVE_ARRAY_CASE(UINT8, UInt8Array);

http://git-wip-us.apache.org/repos/asf/arrow/blob/5439b715/cpp/src/arrow/array.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/array.h b/cpp/src/arrow/array.h
index 3b6e93f..f3e8f9a 100644
--- a/cpp/src/arrow/array.h
+++ b/cpp/src/arrow/array.h
@@ -27,6 +27,7 @@
 #include "arrow/buffer.h"
 #include "arrow/type.h"
 #include "arrow/type_fwd.h"
+#include "arrow/type_traits.h"
 #include "arrow/util/bit-util.h"
 #include "arrow/util/macros.h"
 #include "arrow/util/visibility.h"
@@ -71,23 +72,36 @@ class ArrayVisitor {
 ///
 /// The base class is only required to have a null bitmap buffer if the null
 /// count is greater than 0
+///
+/// If known, the null count can be provided in the base Array constructor. If
+/// the null count is not known, pass -1 to indicate that the null count is to
+/// be computed on the first call to null_count()
 class ARROW_EXPORT Array {
  public:
-  Array(const std::shared_ptr<DataType>& type, int32_t length, int32_t null_count = 0,
-      const std::shared_ptr<Buffer>& null_bitmap = nullptr);
+  Array(const std::shared_ptr<DataType>& type, int32_t length,
+      const std::shared_ptr<Buffer>& null_bitmap = nullptr, int32_t null_count = 0,
+      int32_t offset = 0);
 
   virtual ~Array() = default;
 
   /// Determine if a slot is null. For inner loops. Does *not* boundscheck
   bool IsNull(int i) const {
-    return null_count_ > 0 && BitUtil::BitNotSet(null_bitmap_data_, i);
+    return null_bitmap_data_ != nullptr &&
+           BitUtil::BitNotSet(null_bitmap_data_, i + offset_);
   }
 
   /// Size in the number of elements this array contains.
   int32_t length() const { return length_; }
 
-  /// The number of null entries in the array.
-  int32_t null_count() const { return null_count_; }
+  /// A relative position into another array's data, to enable zero-copy
+  /// slicing. This value defaults to zero
+  int32_t offset() const { return offset_; }
+
+  /// The number of null entries in the array. If the null count was not known
+  /// at time of construction (and set to a negative value), then the null
+  /// count will be computed and cached on the first invocation of this
+  /// function
+  int32_t null_count() const;
 
   std::shared_ptr<DataType> type() const { return type_; }
   Type::type type_enum() const { return type_->type; }
@@ -95,11 +109,13 @@ class ARROW_EXPORT Array {
   /// Buffer for the null bitmap.
   ///
   /// Note that for `null_count == 0`, this can be a `nullptr`.
+  /// This buffer does not account for any slice offset
   std::shared_ptr<Buffer> null_bitmap() const { return null_bitmap_; }
 
   /// Raw pointer to the null bitmap.
   ///
   /// Note that for `null_count == 0`, this can be a `nullptr`.
+  /// This buffer does not account for any slice offset
   const uint8_t* null_bitmap_data() const { return null_bitmap_data_; }
 
   bool Equals(const Array& arr) const;
@@ -120,10 +136,29 @@ class ARROW_EXPORT Array {
 
   virtual Status Accept(ArrayVisitor* visitor) const = 0;
 
+  /// Construct a zero-copy slice of the array with the indicated offset and
+  /// length
+  ///
+  /// \param[in] offset the position of the first element in the constructed slice
+  /// \param[in] length the length of the slice. If there are not enough
+  ///     elements in the array after offset, the length will be reduced to
+  ///     the number of elements remaining
+  ///
+  /// \return a new object wrapped in std::shared_ptr<Array>
+  virtual std::shared_ptr<Array> Slice(int32_t offset, int32_t length) const = 0;
+
+  /// Slice from offset until end of the array
+  std::shared_ptr<Array> Slice(int32_t offset) const;
+
  protected:
   std::shared_ptr<DataType> type_;
-  int32_t null_count_;
   int32_t length_;
+  int32_t offset_;
+
+  // This member is marked mutable so that it can be modified when null_count()
+  // is called from a const context and the null count has to be computed (if
+  // it is not already known)
+  mutable int32_t null_count_;
 
   std::shared_ptr<Buffer> null_bitmap_;
   const uint8_t* null_bitmap_data_;
@@ -138,28 +173,26 @@ class ARROW_EXPORT NullArray : public Array {
  public:
   using TypeClass = NullType;
 
-  NullArray(const std::shared_ptr<DataType>& type, int32_t length)
-      : Array(type, length, length, nullptr) {}
-
-  explicit NullArray(int32_t length) : NullArray(std::make_shared<NullType>(), length) {}
+  explicit NullArray(int32_t length);
 
   Status Accept(ArrayVisitor* visitor) const override;
-};
 
-Status ARROW_EXPORT GetEmptyBitmap(
-    MemoryPool* pool, int32_t length, std::shared_ptr<MutableBuffer>* result);
+  std::shared_ptr<Array> Slice(int32_t offset, int32_t length) const override;
+};
 
 /// Base class for fixed-size logical types
 class ARROW_EXPORT PrimitiveArray : public Array {
  public:
-  virtual ~PrimitiveArray() {}
+  PrimitiveArray(const std::shared_ptr<DataType>& type, int32_t length,
+      const std::shared_ptr<Buffer>& data,
+      const std::shared_ptr<Buffer>& null_bitmap = nullptr, int32_t null_count = 0,
+      int32_t offset = 0);
 
+  /// The memory containing this array's data
+  /// This buffer does not account for any slice offset
   std::shared_ptr<Buffer> data() const { return data_; }
 
  protected:
-  PrimitiveArray(const std::shared_ptr<DataType>& type, int32_t length,
-      const std::shared_ptr<Buffer>& data, int32_t null_count = 0,
-      const std::shared_ptr<Buffer>& null_bitmap = nullptr);
   std::shared_ptr<Buffer> data_;
   const uint8_t* raw_data_;
 };
@@ -169,21 +202,28 @@ class ARROW_EXPORT NumericArray : public PrimitiveArray {
  public:
   using TypeClass = TYPE;
   using value_type = typename TypeClass::c_type;
-  NumericArray(int32_t length, const std::shared_ptr<Buffer>& data,
-      int32_t null_count = 0, const std::shared_ptr<Buffer>& null_bitmap = nullptr)
-      : PrimitiveArray(
-            std::make_shared<TypeClass>(), length, data, null_count, null_bitmap) {}
-  NumericArray(const std::shared_ptr<DataType>& type, int32_t length,
-      const std::shared_ptr<Buffer>& data, int32_t null_count = 0,
-      const std::shared_ptr<Buffer>& null_bitmap = nullptr)
-      : PrimitiveArray(type, length, data, null_count, null_bitmap) {}
+
+  using PrimitiveArray::PrimitiveArray;
+
+  // Only enable this constructor without a type argument for types without additional
+  // metadata
+  template <typename T1 = TYPE>
+  NumericArray(
+      typename std::enable_if<TypeTraits<T1>::is_parameter_free, int32_t>::type length,
+      const std::shared_ptr<Buffer>& data,
+      const std::shared_ptr<Buffer>& null_bitmap = nullptr, int32_t null_count = 0,
+      int32_t offset = 0)
+      : PrimitiveArray(TypeTraits<T1>::type_singleton(), length, data, null_bitmap,
+            null_count, offset) {}
 
   const value_type* raw_data() const {
-    return reinterpret_cast<const value_type*>(raw_data_);
+    return reinterpret_cast<const value_type*>(raw_data_) + offset_;
   }
 
   Status Accept(ArrayVisitor* visitor) const override;
 
+  std::shared_ptr<Array> Slice(int32_t offset, int32_t length) const override;
+
   value_type Value(int i) const { return raw_data()[i]; }
 };
 
@@ -191,17 +231,19 @@ class ARROW_EXPORT BooleanArray : public PrimitiveArray {
  public:
   using TypeClass = BooleanType;
 
+  using PrimitiveArray::PrimitiveArray;
+
   BooleanArray(int32_t length, const std::shared_ptr<Buffer>& data,
-      int32_t null_count = 0, const std::shared_ptr<Buffer>& null_bitmap = nullptr);
-  BooleanArray(const std::shared_ptr<DataType>& type, int32_t length,
-      const std::shared_ptr<Buffer>& data, int32_t null_count = 0,
-      const std::shared_ptr<Buffer>& null_bitmap = nullptr);
+      const std::shared_ptr<Buffer>& null_bitmap = nullptr, int32_t null_count = 0,
+      int32_t offset = 0);
 
   Status Accept(ArrayVisitor* visitor) const override;
 
-  const uint8_t* raw_data() const { return reinterpret_cast<const uint8_t*>(raw_data_); }
+  std::shared_ptr<Array> Slice(int32_t offset, int32_t length) const override;
 
-  bool Value(int i) const { return BitUtil::GetBit(raw_data(), i); }
+  bool Value(int i) const {
+    return BitUtil::GetBit(reinterpret_cast<const uint8_t*>(raw_data_), i + offset_);
+  }
 };
 
 // ----------------------------------------------------------------------
@@ -212,39 +254,45 @@ class ARROW_EXPORT ListArray : public Array {
   using TypeClass = ListType;
 
   ListArray(const std::shared_ptr<DataType>& type, int32_t length,
-      const std::shared_ptr<Buffer>& offsets, const std::shared_ptr<Array>& values,
-      int32_t null_count = 0, const std::shared_ptr<Buffer>& null_bitmap = nullptr)
-      : Array(type, length, null_count, null_bitmap) {
-    offsets_buffer_ = offsets;
-    offsets_ = offsets == nullptr ? nullptr : reinterpret_cast<const int32_t*>(
-                                                  offsets_buffer_->data());
+      const std::shared_ptr<Buffer>& value_offsets, const std::shared_ptr<Array>& values,
+      const std::shared_ptr<Buffer>& null_bitmap = nullptr, int32_t null_count = 0,
+      int32_t offset = 0)
+      : Array(type, length, null_bitmap, null_count, offset) {
+    value_offsets_ = value_offsets;
+    raw_value_offsets_ = value_offsets == nullptr
+                             ? nullptr
+                             : reinterpret_cast<const int32_t*>(value_offsets_->data());
     values_ = values;
   }
 
   Status Validate() const override;
 
-  virtual ~ListArray() = default;
-
   // Return a shared pointer in case the requestor desires to share ownership
   // with this array.
   std::shared_ptr<Array> values() const { return values_; }
-  std::shared_ptr<Buffer> offsets() const { return offsets_buffer_; }
 
-  std::shared_ptr<DataType> value_type() const { return values_->type(); }
+  /// Note that this buffer does not account for any slice offset
+  std::shared_ptr<Buffer> value_offsets() const { return value_offsets_; }
 
-  const int32_t* raw_offsets() const { return offsets_; }
+  std::shared_ptr<DataType> value_type() const { return values_->type(); }
 
-  int32_t offset(int i) const { return offsets_[i]; }
+  /// Return pointer to raw value offsets accounting for any slice offset
+  const int32_t* raw_value_offsets() const { return raw_value_offsets_ + offset_; }
 
   // Neither of these functions will perform boundschecking
-  int32_t value_offset(int i) const { return offsets_[i]; }
-  int32_t value_length(int i) const { return offsets_[i + 1] - offsets_[i]; }
+  int32_t value_offset(int i) const { return raw_value_offsets_[i + offset_]; }
+  int32_t value_length(int i) const {
+    i += offset_;
+    return raw_value_offsets_[i + 1] - raw_value_offsets_[i];
+  }
 
   Status Accept(ArrayVisitor* visitor) const override;
 
+  std::shared_ptr<Array> Slice(int32_t offset, int32_t length) const override;
+
  protected:
-  std::shared_ptr<Buffer> offsets_buffer_;
-  const int32_t* offsets_;
+  std::shared_ptr<Buffer> value_offsets_;
+  const int32_t* raw_value_offsets_;
   std::shared_ptr<Array> values_;
 };
 
@@ -255,55 +303,67 @@ class ARROW_EXPORT BinaryArray : public Array {
  public:
   using TypeClass = BinaryType;
 
-  BinaryArray(int32_t length, const std::shared_ptr<Buffer>& offsets,
-      const std::shared_ptr<Buffer>& data, int32_t null_count = 0,
-      const std::shared_ptr<Buffer>& null_bitmap = nullptr);
-
-  // Constructor that allows sub-classes/builders to propagate there logical type up the
-  // class hierarchy.
-  BinaryArray(const std::shared_ptr<DataType>& type, int32_t length,
-      const std::shared_ptr<Buffer>& offsets, const std::shared_ptr<Buffer>& data,
-      int32_t null_count = 0, const std::shared_ptr<Buffer>& null_bitmap = nullptr);
+  BinaryArray(int32_t length, const std::shared_ptr<Buffer>& value_offsets,
+      const std::shared_ptr<Buffer>& data,
+      const std::shared_ptr<Buffer>& null_bitmap = nullptr, int32_t null_count = 0,
+      int32_t offset = 0);
 
   // Return the pointer to the given elements bytes
   // TODO(emkornfield) introduce a StringPiece or something similar to capture zero-copy
   // pointer + offset
   const uint8_t* GetValue(int i, int32_t* out_length) const {
-    const int32_t pos = offsets_[i];
-    *out_length = offsets_[i + 1] - pos;
-    return data_ + pos;
+    // Account for base offset
+    i += offset_;
+
+    const int32_t pos = raw_value_offsets_[i];
+    *out_length = raw_value_offsets_[i + 1] - pos;
+    return raw_data_ + pos;
   }
 
-  std::shared_ptr<Buffer> data() const { return data_buffer_; }
-  std::shared_ptr<Buffer> offsets() const { return offsets_buffer_; }
+  /// Note that this buffer does not account for any slice offset
+  std::shared_ptr<Buffer> data() const { return data_; }
 
-  const int32_t* raw_offsets() const { return offsets_; }
+  /// Note that this buffer does not account for any slice offset
+  std::shared_ptr<Buffer> value_offsets() const { return value_offsets_; }
 
-  int32_t offset(int i) const { return offsets_[i]; }
+  const int32_t* raw_value_offsets() const { return raw_value_offsets_ + offset_; }
 
   // Neither of these functions will perform boundschecking
-  int32_t value_offset(int i) const { return offsets_[i]; }
-  int32_t value_length(int i) const { return offsets_[i + 1] - offsets_[i]; }
+  int32_t value_offset(int i) const { return raw_value_offsets_[i + offset_]; }
+  int32_t value_length(int i) const {
+    i += offset_;
+    return raw_value_offsets_[i + 1] - raw_value_offsets_[i];
+  }
 
   Status Validate() const override;
 
   Status Accept(ArrayVisitor* visitor) const override;
 
- private:
-  std::shared_ptr<Buffer> offsets_buffer_;
-  const int32_t* offsets_;
+  std::shared_ptr<Array> Slice(int32_t offset, int32_t length) const override;
+
+ protected:
+  // Constructor that allows sub-classes/builders to propagate their logical type up the
+  // class hierarchy.
+  BinaryArray(const std::shared_ptr<DataType>& type, int32_t length,
+      const std::shared_ptr<Buffer>& value_offsets, const std::shared_ptr<Buffer>& data,
+      const std::shared_ptr<Buffer>& null_bitmap = nullptr, int32_t null_count = 0,
+      int32_t offset = 0);
 
-  std::shared_ptr<Buffer> data_buffer_;
-  const uint8_t* data_;
+  std::shared_ptr<Buffer> value_offsets_;
+  const int32_t* raw_value_offsets_;
+
+  std::shared_ptr<Buffer> data_;
+  const uint8_t* raw_data_;
 };
 
 class ARROW_EXPORT StringArray : public BinaryArray {
  public:
   using TypeClass = StringType;
 
-  StringArray(int32_t length, const std::shared_ptr<Buffer>& offsets,
-      const std::shared_ptr<Buffer>& data, int32_t null_count = 0,
-      const std::shared_ptr<Buffer>& null_bitmap = nullptr);
+  StringArray(int32_t length, const std::shared_ptr<Buffer>& value_offsets,
+      const std::shared_ptr<Buffer>& data,
+      const std::shared_ptr<Buffer>& null_bitmap = nullptr, int32_t null_count = 0,
+      int32_t offset = 0);
 
   // Construct a std::string
   // TODO: std::bad_alloc possibility
@@ -316,6 +376,8 @@ class ARROW_EXPORT StringArray : public BinaryArray {
   Status Validate() const override;
 
   Status Accept(ArrayVisitor* visitor) const override;
+
+  std::shared_ptr<Array> Slice(int32_t offset, int32_t length) const override;
 };
 
 // ----------------------------------------------------------------------
@@ -326,28 +388,25 @@ class ARROW_EXPORT StructArray : public Array {
   using TypeClass = StructType;
 
   StructArray(const std::shared_ptr<DataType>& type, int32_t length,
-      const std::vector<std::shared_ptr<Array>>& field_arrays, int32_t null_count = 0,
-      std::shared_ptr<Buffer> null_bitmap = nullptr)
-      : Array(type, length, null_count, null_bitmap) {
-    type_ = type;
-    field_arrays_ = field_arrays;
-  }
+      const std::vector<std::shared_ptr<Array>>& children,
+      std::shared_ptr<Buffer> null_bitmap = nullptr, int32_t null_count = 0,
+      int32_t offset = 0);
 
   Status Validate() const override;
 
-  virtual ~StructArray() {}
-
   // Return a shared pointer in case the requestor desires to share ownership
   // with this array.
   std::shared_ptr<Array> field(int32_t pos) const;
 
-  const std::vector<std::shared_ptr<Array>>& fields() const { return field_arrays_; }
+  const std::vector<std::shared_ptr<Array>>& fields() const { return children_; }
 
   Status Accept(ArrayVisitor* visitor) const override;
 
+  std::shared_ptr<Array> Slice(int32_t offset, int32_t length) const override;
+
  protected:
   // The child arrays corresponding to each field of the struct data type.
-  std::vector<std::shared_ptr<Array>> field_arrays_;
+  std::vector<std::shared_ptr<Array>> children_;
 };
 
 // ----------------------------------------------------------------------
@@ -356,22 +415,25 @@ class ARROW_EXPORT StructArray : public Array {
 class ARROW_EXPORT UnionArray : public Array {
  public:
   using TypeClass = UnionType;
+  using type_id_t = uint8_t;
 
   UnionArray(const std::shared_ptr<DataType>& type, int32_t length,
       const std::vector<std::shared_ptr<Array>>& children,
       const std::shared_ptr<Buffer>& type_ids,
-      const std::shared_ptr<Buffer>& offsets = nullptr, int32_t null_count = 0,
-      const std::shared_ptr<Buffer>& null_bitmap = nullptr);
+      const std::shared_ptr<Buffer>& value_offsets = nullptr,
+      const std::shared_ptr<Buffer>& null_bitmap = nullptr, int32_t null_count = 0,
+      int32_t offset = 0);
 
   Status Validate() const override;
 
-  virtual ~UnionArray() {}
+  /// Note that this buffer does not account for any slice offset
+  std::shared_ptr<Buffer> type_ids() const { return type_ids_; }
 
-  std::shared_ptr<Buffer> type_ids() const { return type_ids_buffer_; }
-  const uint8_t* raw_type_ids() const { return type_ids_; }
+  /// Note that this buffer does not account for any slice offset
+  std::shared_ptr<Buffer> value_offsets() const { return value_offsets_; }
 
-  std::shared_ptr<Buffer> offsets() const { return offsets_buffer_; }
-  const int32_t* raw_offsets() const { return offsets_; }
+  const type_id_t* raw_type_ids() const { return raw_type_ids_ + offset_; }
+  const int32_t* raw_value_offsets() const { return raw_value_offsets_ + offset_; }
 
   UnionMode mode() const { return static_cast<const UnionType&>(*type_.get()).mode; }
 
@@ -381,14 +443,16 @@ class ARROW_EXPORT UnionArray : public Array {
 
   Status Accept(ArrayVisitor* visitor) const override;
 
+  std::shared_ptr<Array> Slice(int32_t offset, int32_t length) const override;
+
  protected:
   std::vector<std::shared_ptr<Array>> children_;
 
-  std::shared_ptr<Buffer> type_ids_buffer_;
-  const uint8_t* type_ids_;
+  std::shared_ptr<Buffer> type_ids_;
+  const type_id_t* raw_type_ids_;
 
-  std::shared_ptr<Buffer> offsets_buffer_;
-  const int32_t* offsets_;
+  std::shared_ptr<Buffer> value_offsets_;
+  const int32_t* raw_value_offsets_;
 };
 
 // ----------------------------------------------------------------------
@@ -419,8 +483,8 @@ class ARROW_EXPORT DictionaryArray : public Array {
   // Alternate ctor; other attributes (like null count) are inherited from the
   // passed indices array
   static Status FromBuffer(const std::shared_ptr<DataType>& type, int32_t length,
-      const std::shared_ptr<Buffer>& indices, int32_t null_count,
-      const std::shared_ptr<Buffer>& null_bitmap, std::shared_ptr<DictionaryArray>* out);
+      const std::shared_ptr<Buffer>& indices, const std::shared_ptr<Buffer>& null_bitmap,
+      int32_t null_count, int32_t offset, std::shared_ptr<DictionaryArray>* out);
 
   Status Validate() const override;
 
@@ -431,6 +495,8 @@ class ARROW_EXPORT DictionaryArray : public Array {
 
   Status Accept(ArrayVisitor* visitor) const override;
 
+  std::shared_ptr<Array> Slice(int32_t offset, int32_t length) const override;
+
  protected:
   const DictionaryType* dict_type_;
   std::shared_ptr<Array> indices_;
@@ -471,8 +537,9 @@ extern template class ARROW_EXPORT NumericArray<TimeType>;
 
 // Create new arrays for logical types that are backed by primitive arrays.
 Status ARROW_EXPORT MakePrimitiveArray(const std::shared_ptr<DataType>& type,
-    int32_t length, const std::shared_ptr<Buffer>& data, int32_t null_count,
-    const std::shared_ptr<Buffer>& null_bitmap, std::shared_ptr<Array>* out);
+    int32_t length, const std::shared_ptr<Buffer>& data,
+    const std::shared_ptr<Buffer>& null_bitmap, int32_t null_count, int32_t offset,
+    std::shared_ptr<Array>* out);
 
 }  // namespace arrow
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/5439b715/cpp/src/arrow/buffer.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/buffer.cc b/cpp/src/arrow/buffer.cc
index 6cce0ef..fb5a010 100644
--- a/cpp/src/arrow/buffer.cc
+++ b/cpp/src/arrow/buffer.cc
@@ -116,4 +116,20 @@ Status PoolBuffer::Resize(int64_t new_size, bool shrink_to_fit) {
   return Status::OK();
 }
 
+Status AllocateBuffer(
+    MemoryPool* pool, int64_t size, std::shared_ptr<MutableBuffer>* out) {
+  auto buffer = std::make_shared<PoolBuffer>(pool);
+  RETURN_NOT_OK(buffer->Resize(size));
+  *out = buffer;
+  return Status::OK();
+}
+
+Status AllocateResizableBuffer(
+    MemoryPool* pool, int64_t size, std::shared_ptr<ResizableBuffer>* out) {
+  auto buffer = std::make_shared<PoolBuffer>(pool);
+  RETURN_NOT_OK(buffer->Resize(size));
+  *out = buffer;
+  return Status::OK();
+}
+
 }  // namespace arrow

http://git-wip-us.apache.org/repos/asf/arrow/blob/5439b715/cpp/src/arrow/buffer.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/buffer.h b/cpp/src/arrow/buffer.h
index d43ab03..9c400b1 100644
--- a/cpp/src/arrow/buffer.h
+++ b/cpp/src/arrow/buffer.h
@@ -15,8 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#ifndef ARROW_UTIL_BUFFER_H
-#define ARROW_UTIL_BUFFER_H
+#ifndef ARROW_BUFFER_H
+#define ARROW_BUFFER_H
 
 #include <algorithm>
 #include <cstdint>
@@ -105,7 +105,7 @@ class ARROW_EXPORT Buffer : public std::enable_shared_from_this<Buffer> {
 
 /// Construct a view on passed buffer at the indicated offset and length. This
 /// function cannot fail and does not error checking (except in debug builds)
-ARROW_EXPORT std::shared_ptr<Buffer> SliceBuffer(
+std::shared_ptr<Buffer> ARROW_EXPORT SliceBuffer(
     const std::shared_ptr<Buffer>& buffer, int64_t offset, int64_t length);
 
 /// A Buffer whose contents can be mutated. May or may not own its data.
@@ -232,6 +232,19 @@ class ARROW_EXPORT BufferBuilder {
   int64_t size_;
 };
 
+/// Allocate a new mutable buffer from a memory pool
+///
+/// \param[in] pool a memory pool
+/// \param[in] size size of buffer to allocate
+/// \param[out] out the allocated buffer with padding
+///
+/// \return Status message
+Status ARROW_EXPORT AllocateBuffer(
+    MemoryPool* pool, int64_t size, std::shared_ptr<MutableBuffer>* out);
+
+Status ARROW_EXPORT AllocateResizableBuffer(
+    MemoryPool* pool, int64_t size, std::shared_ptr<ResizableBuffer>* out);
+
 }  // namespace arrow
 
-#endif  // ARROW_UTIL_BUFFER_H
+#endif  // ARROW_BUFFER_H

http://git-wip-us.apache.org/repos/asf/arrow/blob/5439b715/cpp/src/arrow/builder.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/builder.cc b/cpp/src/arrow/builder.cc
index b0dc41b..dddadee 100644
--- a/cpp/src/arrow/builder.cc
+++ b/cpp/src/arrow/builder.cc
@@ -185,7 +185,7 @@ Status PrimitiveBuilder<T>::Finish(std::shared_ptr<Array>* out) {
     RETURN_NOT_OK(data_->Resize(bytes_required));
   }
   *out = std::make_shared<typename TypeTraits<T>::ArrayType>(
-      type_, length_, data_, null_count_, null_bitmap_);
+      type_, length_, data_, null_bitmap_, null_count_);
 
   data_ = null_bitmap_ = nullptr;
   capacity_ = length_ = null_count_ = 0;
@@ -202,10 +202,19 @@ template class PrimitiveBuilder<Int32Type>;
 template class PrimitiveBuilder<Int64Type>;
 template class PrimitiveBuilder<DateType>;
 template class PrimitiveBuilder<TimestampType>;
+template class PrimitiveBuilder<TimeType>;
 template class PrimitiveBuilder<HalfFloatType>;
 template class PrimitiveBuilder<FloatType>;
 template class PrimitiveBuilder<DoubleType>;
 
+BooleanBuilder::BooleanBuilder(MemoryPool* pool)
+    : ArrayBuilder(pool, boolean()), data_(nullptr), raw_data_(nullptr) {}
+
+BooleanBuilder::BooleanBuilder(MemoryPool* pool, const std::shared_ptr<DataType>& type)
+    : BooleanBuilder(pool) {
+  DCHECK_EQ(Type::BOOL, type->type);
+}
+
 Status BooleanBuilder::Init(int32_t capacity) {
   RETURN_NOT_OK(ArrayBuilder::Init(capacity));
   data_ = std::make_shared<PoolBuffer>(pool_);
@@ -244,7 +253,7 @@ Status BooleanBuilder::Finish(std::shared_ptr<Array>* out) {
     // Trim buffers
     RETURN_NOT_OK(data_->Resize(bytes_required));
   }
-  *out = std::make_shared<BooleanArray>(type_, length_, data_, null_count_, null_bitmap_);
+  *out = std::make_shared<BooleanArray>(type_, length_, data_, null_bitmap_, null_count_);
 
   data_ = null_bitmap_ = nullptr;
   capacity_ = length_ = null_count_ = 0;
@@ -313,7 +322,7 @@ Status ListBuilder::Finish(std::shared_ptr<Array>* out) {
   std::shared_ptr<Buffer> offsets = offset_builder_.Finish();
 
   *out = std::make_shared<ListArray>(
-      type_, length_, offsets, items, null_count_, null_bitmap_);
+      type_, length_, offsets, items, null_bitmap_, null_count_);
 
   Reset();
 
@@ -333,14 +342,13 @@ std::shared_ptr<ArrayBuilder> ListBuilder::value_builder() const {
 // ----------------------------------------------------------------------
 // String and binary
 
-// This used to be a static member variable of BinaryBuilder, but it can cause
-// valgrind to report a (spurious?) memory leak when needed in other shared
-// libraries. The problem came up while adding explicit visibility to libarrow
-// and libparquet_arrow
-static TypePtr kBinaryValueType = TypePtr(new UInt8Type());
+BinaryBuilder::BinaryBuilder(MemoryPool* pool)
+    : ListBuilder(pool, std::make_shared<UInt8Builder>(pool, uint8()), binary()) {
+  byte_builder_ = static_cast<UInt8Builder*>(value_builder_.get());
+}
 
 BinaryBuilder::BinaryBuilder(MemoryPool* pool, const TypePtr& type)
-    : ListBuilder(pool, std::make_shared<UInt8Builder>(pool, kBinaryValueType), type) {
+    : ListBuilder(pool, std::make_shared<UInt8Builder>(pool, uint8()), type) {
   byte_builder_ = static_cast<UInt8Builder*>(value_builder_.get());
 }
 
@@ -351,11 +359,13 @@ Status BinaryBuilder::Finish(std::shared_ptr<Array>* out) {
   const auto list = std::dynamic_pointer_cast<ListArray>(result);
   auto values = std::dynamic_pointer_cast<UInt8Array>(list->values());
 
-  *out = std::make_shared<BinaryArray>(list->length(), list->offsets(), values->data(),
-      list->null_count(), list->null_bitmap());
+  *out = std::make_shared<BinaryArray>(list->length(), list->value_offsets(),
+      values->data(), list->null_bitmap(), list->null_count());
   return Status::OK();
 }
 
+StringBuilder::StringBuilder(MemoryPool* pool) : BinaryBuilder(pool, utf8()) {}
+
 Status StringBuilder::Finish(std::shared_ptr<Array>* out) {
   std::shared_ptr<Array> result;
   RETURN_NOT_OK(ListBuilder::Finish(&result));
@@ -363,8 +373,8 @@ Status StringBuilder::Finish(std::shared_ptr<Array>* out) {
   const auto list = std::dynamic_pointer_cast<ListArray>(result);
   auto values = std::dynamic_pointer_cast<UInt8Array>(list->values());
 
-  *out = std::make_shared<StringArray>(list->length(), list->offsets(), values->data(),
-      list->null_count(), list->null_bitmap());
+  *out = std::make_shared<StringArray>(list->length(), list->value_offsets(),
+      values->data(), list->null_bitmap(), list->null_count());
   return Status::OK();
 }
 
@@ -377,7 +387,7 @@ Status StructBuilder::Finish(std::shared_ptr<Array>* out) {
     RETURN_NOT_OK(field_builders_[i]->Finish(&fields[i]));
   }
 
-  *out = std::make_shared<StructArray>(type_, length_, fields, null_count_, null_bitmap_);
+  *out = std::make_shared<StructArray>(type_, length_, fields, null_bitmap_, null_count_);
 
   null_bitmap_ = nullptr;
   capacity_ = length_ = null_count_ = 0;
@@ -393,9 +403,9 @@ std::shared_ptr<ArrayBuilder> StructBuilder::field_builder(int pos) const {
 // ----------------------------------------------------------------------
 // Helper functions
 
-#define BUILDER_CASE(ENUM, BuilderType)      \
-  case Type::ENUM:                           \
-    out->reset(new BuilderType(pool, type)); \
+#define BUILDER_CASE(ENUM, BuilderType) \
+  case Type::ENUM:                      \
+    out->reset(new BuilderType(pool));  \
     return Status::OK();
 
 // Initially looked at doing this with vtables, but shared pointers makes it
@@ -414,19 +424,17 @@ Status MakeBuilder(MemoryPool* pool, const std::shared_ptr<DataType>& type,
     BUILDER_CASE(UINT64, UInt64Builder);
     BUILDER_CASE(INT64, Int64Builder);
     BUILDER_CASE(DATE, DateBuilder);
-    BUILDER_CASE(TIMESTAMP, TimestampBuilder);
-
-    BUILDER_CASE(BOOL, BooleanBuilder);
-
-    BUILDER_CASE(FLOAT, FloatBuilder);
-    BUILDER_CASE(DOUBLE, DoubleBuilder);
-
-    case Type::STRING:
-      out->reset(new StringBuilder(pool));
+    case Type::TIMESTAMP:
+      out->reset(new TimestampBuilder(pool, type));
       return Status::OK();
-    case Type::BINARY:
-      out->reset(new BinaryBuilder(pool, type));
+    case Type::TIME:
+      out->reset(new TimeBuilder(pool, type));
       return Status::OK();
+      BUILDER_CASE(BOOL, BooleanBuilder);
+      BUILDER_CASE(FLOAT, FloatBuilder);
+      BUILDER_CASE(DOUBLE, DoubleBuilder);
+      BUILDER_CASE(STRING, StringBuilder);
+      BUILDER_CASE(BINARY, BinaryBuilder);
     case Type::LIST: {
       std::shared_ptr<ArrayBuilder> value_builder;
       std::shared_ptr<DataType> value_type =

http://git-wip-us.apache.org/repos/asf/arrow/blob/5439b715/cpp/src/arrow/builder.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/builder.h b/cpp/src/arrow/builder.h
index 672d2d8..0b83b9f 100644
--- a/cpp/src/arrow/builder.h
+++ b/cpp/src/arrow/builder.h
@@ -141,9 +141,7 @@ class ARROW_EXPORT PrimitiveBuilder : public ArrayBuilder {
   using value_type = typename Type::c_type;
 
   explicit PrimitiveBuilder(MemoryPool* pool, const TypePtr& type)
-      : ArrayBuilder(pool, type), data_(nullptr) {}
-
-  virtual ~PrimitiveBuilder() {}
+      : ArrayBuilder(pool, type), data_(nullptr), raw_data_(nullptr) {}
 
   using ArrayBuilder::Advance;
 
@@ -233,6 +231,7 @@ using Int16Builder = NumericBuilder<Int16Type>;
 using Int32Builder = NumericBuilder<Int32Type>;
 using Int64Builder = NumericBuilder<Int64Type>;
 using TimestampBuilder = NumericBuilder<TimestampType>;
+using TimeBuilder = NumericBuilder<TimeType>;
 using DateBuilder = NumericBuilder<DateType>;
 
 using HalfFloatBuilder = NumericBuilder<HalfFloatType>;
@@ -241,10 +240,8 @@ using DoubleBuilder = NumericBuilder<DoubleType>;
 
 class ARROW_EXPORT BooleanBuilder : public ArrayBuilder {
  public:
-  explicit BooleanBuilder(MemoryPool* pool, const TypePtr& type = boolean())
-      : ArrayBuilder(pool, type), data_(nullptr) {}
-
-  virtual ~BooleanBuilder() {}
+  explicit BooleanBuilder(MemoryPool* pool);
+  explicit BooleanBuilder(MemoryPool* pool, const std::shared_ptr<DataType>& type);
 
   using ArrayBuilder::Advance;
 
@@ -321,8 +318,6 @@ class ARROW_EXPORT ListBuilder : public ArrayBuilder {
   ListBuilder(
       MemoryPool* pool, std::shared_ptr<Array> values, const TypePtr& type = nullptr);
 
-  virtual ~ListBuilder() {}
-
   Status Init(int32_t elements) override;
   Status Resize(int32_t capacity) override;
   Status Finish(std::shared_ptr<Array>* out) override;
@@ -368,8 +363,8 @@ class ARROW_EXPORT ListBuilder : public ArrayBuilder {
 // BinaryBuilder : public ListBuilder
 class ARROW_EXPORT BinaryBuilder : public ListBuilder {
  public:
+  explicit BinaryBuilder(MemoryPool* pool);
   explicit BinaryBuilder(MemoryPool* pool, const TypePtr& type);
-  virtual ~BinaryBuilder() {}
 
   Status Append(const uint8_t* value, int32_t length) {
     RETURN_NOT_OK(ListBuilder::Append());
@@ -391,11 +386,7 @@ class ARROW_EXPORT BinaryBuilder : public ListBuilder {
 // String builder
 class ARROW_EXPORT StringBuilder : public BinaryBuilder {
  public:
-  explicit StringBuilder(MemoryPool* pool = default_memory_pool())
-      : BinaryBuilder(pool, utf8()) {}
-
-  explicit StringBuilder(MemoryPool* pool, const std::shared_ptr<DataType>& type)
-      : BinaryBuilder(pool, type) {}
+  explicit StringBuilder(MemoryPool* pool);
 
   using BinaryBuilder::Append;
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/5439b715/cpp/src/arrow/column-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/column-test.cc b/cpp/src/arrow/column-test.cc
index 1e722ed..0bbfc83 100644
--- a/cpp/src/arrow/column-test.cc
+++ b/cpp/src/arrow/column-test.cc
@@ -51,7 +51,7 @@ TEST_F(TestChunkedArray, BasicEquals) {
   std::vector<bool> null_bitmap(100, true);
   std::vector<int32_t> data(100, 1);
   std::shared_ptr<Array> array;
-  ArrayFromVector<Int32Type, int32_t>(int32(), null_bitmap, data, &array);
+  ArrayFromVector<Int32Type, int32_t>(null_bitmap, data, &array);
   arrays_one_.push_back(array);
   arrays_another_.push_back(array);
 
@@ -67,9 +67,9 @@ TEST_F(TestChunkedArray, EqualsDifferingTypes) {
   std::vector<int32_t> data32(100, 1);
   std::vector<int64_t> data64(100, 1);
   std::shared_ptr<Array> array;
-  ArrayFromVector<Int32Type, int32_t>(int32(), null_bitmap, data32, &array);
+  ArrayFromVector<Int32Type, int32_t>(null_bitmap, data32, &array);
   arrays_one_.push_back(array);
-  ArrayFromVector<Int64Type, int64_t>(int64(), null_bitmap, data64, &array);
+  ArrayFromVector<Int64Type, int64_t>(null_bitmap, data64, &array);
   arrays_another_.push_back(array);
 
   Construct();
@@ -83,9 +83,9 @@ TEST_F(TestChunkedArray, EqualsDifferingLengths) {
   std::vector<int32_t> data100(100, 1);
   std::vector<int32_t> data101(101, 1);
   std::shared_ptr<Array> array;
-  ArrayFromVector<Int32Type, int32_t>(int32(), null_bitmap100, data100, &array);
+  ArrayFromVector<Int32Type, int32_t>(null_bitmap100, data100, &array);
   arrays_one_.push_back(array);
-  ArrayFromVector<Int32Type, int32_t>(int32(), null_bitmap101, data101, &array);
+  ArrayFromVector<Int32Type, int32_t>(null_bitmap101, data101, &array);
   arrays_another_.push_back(array);
 
   Construct();
@@ -94,7 +94,7 @@ TEST_F(TestChunkedArray, EqualsDifferingLengths) {
 
   std::vector<bool> null_bitmap1(1, true);
   std::vector<int32_t> data1(1, 1);
-  ArrayFromVector<Int32Type, int32_t>(int32(), null_bitmap1, data1, &array);
+  ArrayFromVector<Int32Type, int32_t>(null_bitmap1, data1, &array);
   arrays_one_.push_back(array);
 
   Construct();
@@ -156,7 +156,7 @@ TEST_F(TestColumn, Equals) {
   std::vector<bool> null_bitmap(100, true);
   std::vector<int32_t> data(100, 1);
   std::shared_ptr<Array> array;
-  ArrayFromVector<Int32Type, int32_t>(int32(), null_bitmap, data, &array);
+  ArrayFromVector<Int32Type, int32_t>(null_bitmap, data, &array);
   arrays_one_.push_back(array);
   arrays_another_.push_back(array);
 


[2/3] arrow git commit: ARROW-33: [C++] Implement zero-copy array slicing, integrate with IPC code paths

Posted by we...@apache.org.
http://git-wip-us.apache.org/repos/asf/arrow/blob/5439b715/cpp/src/arrow/compare.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/compare.cc b/cpp/src/arrow/compare.cc
index d039bba..27fad71 100644
--- a/cpp/src/arrow/compare.cc
+++ b/cpp/src/arrow/compare.cc
@@ -76,10 +76,10 @@ class RangeEqualsVisitor : public ArrayVisitor {
       const bool is_null = left.IsNull(i);
       if (is_null != right.IsNull(o_i)) { return false; }
       if (is_null) continue;
-      const int32_t begin_offset = left.offset(i);
-      const int32_t end_offset = left.offset(i + 1);
-      const int32_t right_begin_offset = right.offset(o_i);
-      const int32_t right_end_offset = right.offset(o_i + 1);
+      const int32_t begin_offset = left.value_offset(i);
+      const int32_t end_offset = left.value_offset(i + 1);
+      const int32_t right_begin_offset = right.value_offset(o_i);
+      const int32_t right_end_offset = right.value_offset(o_i + 1);
       // Underlying can't be equal if the size isn't equal
       if (end_offset - begin_offset != right_end_offset - right_begin_offset) {
         return false;
@@ -169,10 +169,10 @@ class RangeEqualsVisitor : public ArrayVisitor {
       const bool is_null = left.IsNull(i);
       if (is_null != right.IsNull(o_i)) { return false; }
       if (is_null) continue;
-      const int32_t begin_offset = left.offset(i);
-      const int32_t end_offset = left.offset(i + 1);
-      const int32_t right_begin_offset = right.offset(o_i);
-      const int32_t right_end_offset = right.offset(o_i + 1);
+      const int32_t begin_offset = left.value_offset(i);
+      const int32_t end_offset = left.value_offset(i + 1);
+      const int32_t right_begin_offset = right.value_offset(o_i);
+      const int32_t right_end_offset = right.value_offset(o_i + 1);
       // Underlying can't be equal if the size isn't equal
       if (end_offset - begin_offset != right_end_offset - right_begin_offset) {
         return false;
@@ -200,7 +200,11 @@ class RangeEqualsVisitor : public ArrayVisitor {
       for (size_t j = 0; j < left.fields().size(); ++j) {
         // TODO: really we should be comparing stretches of non-null data rather
         // than looking at one value at a time.
-        equal_fields = left.field(j)->RangeEquals(i, i + 1, o_i, right.field(j));
+        const int left_abs_index = i + left.offset();
+        const int right_abs_index = o_i + right.offset();
+
+        equal_fields = left.field(j)->RangeEquals(
+            left_abs_index, left_abs_index + 1, right_abs_index, right.field(j));
         if (!equal_fields) { return false; }
       }
     }
@@ -223,7 +227,7 @@ class RangeEqualsVisitor : public ArrayVisitor {
     // Define a mapping from the type id to child number
     uint8_t max_code = 0;
 
-    const std::vector<uint8_t> type_codes = left_type.type_ids;
+    const std::vector<uint8_t> type_codes = left_type.type_codes;
     for (size_t i = 0; i < type_codes.size(); ++i) {
       const uint8_t code = type_codes[i];
       if (code > max_code) { max_code = code; }
@@ -248,15 +252,19 @@ class RangeEqualsVisitor : public ArrayVisitor {
       id = left_ids[i];
       child_num = type_id_to_child_num[id];
 
+      const int left_abs_index = i + left.offset();
+      const int right_abs_index = o_i + right.offset();
+
       // TODO(wesm): really we should be comparing stretches of non-null data
       // rather than looking at one value at a time.
       if (union_mode == UnionMode::SPARSE) {
-        if (!left.child(child_num)->RangeEquals(i, i + 1, o_i, right.child(child_num))) {
+        if (!left.child(child_num)->RangeEquals(left_abs_index, left_abs_index + 1,
+                right_abs_index, right.child(child_num))) {
           return false;
         }
       } else {
-        const int32_t offset = left.raw_offsets()[i];
-        const int32_t o_offset = right.raw_offsets()[i];
+        const int32_t offset = left.raw_value_offsets()[i];
+        const int32_t o_offset = right.raw_value_offsets()[i];
         if (!left.child(child_num)->RangeEquals(
                 offset, offset + 1, o_offset, right.child(child_num))) {
           return false;
@@ -315,20 +323,29 @@ class EqualsVisitor : public RangeEqualsVisitor {
       }
       result_ = true;
     } else {
-      result_ = left.data()->Equals(*right.data(), BitUtil::BytesForBits(left.length()));
+      result_ = BitmapEquals(left.data()->data(), left.offset(), right.data()->data(),
+          right.offset(), left.length());
     }
     return Status::OK();
   }
 
   bool IsEqualPrimitive(const PrimitiveArray& left) {
     const auto& right = static_cast<const PrimitiveArray&>(right_);
-    if (left.null_count() > 0) {
-      const uint8_t* left_data = left.data()->data();
-      const uint8_t* right_data = right.data()->data();
-      const auto& size_meta = dynamic_cast<const FixedWidthType&>(*left.type());
-      const int value_byte_size = size_meta.bit_width() / 8;
-      DCHECK_GT(value_byte_size, 0);
+    const auto& size_meta = dynamic_cast<const FixedWidthType&>(*left.type());
+    const int value_byte_size = size_meta.bit_width() / 8;
+    DCHECK_GT(value_byte_size, 0);
+
+    const uint8_t* left_data = nullptr;
+    if (left.length() > 0) {
+      left_data = left.data()->data() + left.offset() * value_byte_size;
+    }
+
+    const uint8_t* right_data = nullptr;
+    if (right.length() > 0) {
+      right_data = right.data()->data() + right.offset() * value_byte_size;
+    }
 
+    if (left.null_count() > 0) {
       for (int i = 0; i < left.length(); ++i) {
         if (!left.IsNull(i) && memcmp(left_data, right_data, value_byte_size)) {
           return false;
@@ -339,7 +356,7 @@ class EqualsVisitor : public RangeEqualsVisitor {
       return true;
     } else {
       if (left.length() == 0) { return true; }
-      return left.data()->Equals(*right.data(), left.length());
+      return memcmp(left_data, right_data, value_byte_size * left.length()) == 0;
     }
   }
 
@@ -376,13 +393,46 @@ class EqualsVisitor : public RangeEqualsVisitor {
 
   Status Visit(const IntervalArray& left) override { return ComparePrimitive(left); }
 
+  template <typename ArrayType>
+  bool ValueOffsetsEqual(const ArrayType& left) {
+    const auto& right = static_cast<const ArrayType&>(right_);
+
+    if (left.offset() == 0 && right.offset() == 0) {
+      return left.value_offsets()->Equals(
+          *right.value_offsets(), (left.length() + 1) * sizeof(int32_t));
+    } else {
+      // One of the arrays is sliced; logic is more complicated because the
+      // value offsets are not both 0-based
+      auto left_offsets =
+          reinterpret_cast<const int32_t*>(left.value_offsets()->data()) + left.offset();
+      auto right_offsets =
+          reinterpret_cast<const int32_t*>(right.value_offsets()->data()) +
+          right.offset();
+
+      for (int32_t i = 0; i < left.length() + 1; ++i) {
+        if (left_offsets[i] - left_offsets[0] != right_offsets[i] - right_offsets[0]) {
+          return false;
+        }
+      }
+      return true;
+    }
+  }
+
   bool CompareBinary(const BinaryArray& left) {
     const auto& right = static_cast<const BinaryArray&>(right_);
-    bool equal_offsets =
-        left.offsets()->Equals(*right.offsets(), (left.length() + 1) * sizeof(int32_t));
+
+    bool equal_offsets = ValueOffsetsEqual<BinaryArray>(left);
     if (!equal_offsets) { return false; }
-    if (!left.data() && !(right.data())) { return true; }
-    return left.data()->Equals(*right.data(), left.raw_offsets()[left.length()]);
+
+    if (left.offset() == 0 && right.offset() == 0) {
+      if (!left.data() && !(right.data())) { return true; }
+      return left.data()->Equals(*right.data(), left.raw_value_offsets()[left.length()]);
+    } else {
+      // Compare the corresponding data range
+      const int64_t total_bytes = left.value_offset(left.length()) - left.value_offset(0);
+      return std::memcmp(left.data()->data() + left.value_offset(0),
+                 right.data()->data() + right.value_offset(0), total_bytes) == 0;
+    }
   }
 
   Status Visit(const StringArray& left) override {
@@ -397,12 +447,20 @@ class EqualsVisitor : public RangeEqualsVisitor {
 
   Status Visit(const ListArray& left) override {
     const auto& right = static_cast<const ListArray&>(right_);
-    if (!left.offsets()->Equals(
-            *right.offsets(), (left.length() + 1) * sizeof(int32_t))) {
+    bool equal_offsets = ValueOffsetsEqual<ListArray>(left);
+    if (!equal_offsets) {
       result_ = false;
-    } else {
+      return Status::OK();
+    }
+
+    if (left.offset() == 0 && right.offset() == 0) {
       result_ = left.values()->Equals(right.values());
+    } else {
+      // One of the arrays is sliced
+      result_ = left.values()->RangeEquals(left.value_offset(0),
+          left.value_offset(left.length()), right.value_offset(0), right.values());
     }
+
     return Status::OK();
   }
 
@@ -422,8 +480,8 @@ inline bool FloatingApproxEquals(
     const NumericArray<TYPE>& left, const NumericArray<TYPE>& right) {
   using T = typename TYPE::c_type;
 
-  auto left_data = reinterpret_cast<const T*>(left.data()->data());
-  auto right_data = reinterpret_cast<const T*>(right.data()->data());
+  const T* left_data = left.raw_data();
+  const T* right_data = right.raw_data();
 
   static constexpr T EPSILON = 1E-5;
 
@@ -465,8 +523,8 @@ static bool BaseDataEquals(const Array& left, const Array& right) {
     return false;
   }
   if (left.null_count() > 0) {
-    return left.null_bitmap()->Equals(
-        *right.null_bitmap(), BitUtil::BytesForBits(left.length()));
+    return BitmapEquals(left.null_bitmap()->data(), left.offset(),
+        right.null_bitmap()->data(), right.offset(), left.length());
   }
   return true;
 }

http://git-wip-us.apache.org/repos/asf/arrow/blob/5439b715/cpp/src/arrow/io/file.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/file.cc b/cpp/src/arrow/io/file.cc
index ff58e53..c1f0ea4 100644
--- a/cpp/src/arrow/io/file.cc
+++ b/cpp/src/arrow/io/file.cc
@@ -401,8 +401,8 @@ class ReadableFile::ReadableFileImpl : public OSFile {
   Status Open(const std::string& path) { return OpenReadable(path); }
 
   Status ReadBuffer(int64_t nbytes, std::shared_ptr<Buffer>* out) {
-    auto buffer = std::make_shared<PoolBuffer>(pool_);
-    RETURN_NOT_OK(buffer->Resize(nbytes));
+    std::shared_ptr<ResizableBuffer> buffer;
+    RETURN_NOT_OK(AllocateResizableBuffer(pool_, nbytes, &buffer));
 
     int64_t bytes_read = 0;
     RETURN_NOT_OK(Read(nbytes, &bytes_read, buffer->mutable_data()));

http://git-wip-us.apache.org/repos/asf/arrow/blob/5439b715/cpp/src/arrow/io/hdfs.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/hdfs.cc b/cpp/src/arrow/io/hdfs.cc
index 2845b0d..5682f44 100644
--- a/cpp/src/arrow/io/hdfs.cc
+++ b/cpp/src/arrow/io/hdfs.cc
@@ -125,8 +125,8 @@ class HdfsReadableFile::HdfsReadableFileImpl : public HdfsAnyFileImpl {
   }
 
   Status ReadAt(int64_t position, int64_t nbytes, std::shared_ptr<Buffer>* out) {
-    auto buffer = std::make_shared<PoolBuffer>(pool_);
-    RETURN_NOT_OK(buffer->Resize(nbytes));
+    std::shared_ptr<ResizableBuffer> buffer;
+    RETURN_NOT_OK(AllocateResizableBuffer(pool_, nbytes, &buffer));
 
     int64_t bytes_read = 0;
     RETURN_NOT_OK(ReadAt(position, nbytes, &bytes_read, buffer->mutable_data()));
@@ -152,8 +152,8 @@ class HdfsReadableFile::HdfsReadableFileImpl : public HdfsAnyFileImpl {
   }
 
   Status Read(int64_t nbytes, std::shared_ptr<Buffer>* out) {
-    auto buffer = std::make_shared<PoolBuffer>(pool_);
-    RETURN_NOT_OK(buffer->Resize(nbytes));
+    std::shared_ptr<ResizableBuffer> buffer;
+    RETURN_NOT_OK(AllocateResizableBuffer(pool_, nbytes, &buffer));
 
     int64_t bytes_read = 0;
     RETURN_NOT_OK(Read(nbytes, &bytes_read, buffer->mutable_data()));

http://git-wip-us.apache.org/repos/asf/arrow/blob/5439b715/cpp/src/arrow/io/io-hdfs-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/io-hdfs-test.cc b/cpp/src/arrow/io/io-hdfs-test.cc
index 72e0ba8..f0e5a28 100644
--- a/cpp/src/arrow/io/io-hdfs-test.cc
+++ b/cpp/src/arrow/io/io-hdfs-test.cc
@@ -336,8 +336,9 @@ TYPED_TEST(TestHdfsClient, LargeFile) {
   std::shared_ptr<HdfsReadableFile> file;
   ASSERT_OK(this->client_->OpenReadable(path, &file));
 
-  auto buffer = std::make_shared<PoolBuffer>();
-  ASSERT_OK(buffer->Resize(size));
+  std::shared_ptr<MutableBuffer> buffer;
+  ASSERT_OK(AllocateBuffer(nullptr, size, &buffer));
+
   int64_t bytes_read = 0;
 
   ASSERT_OK(file->Read(size, &bytes_read, buffer->mutable_data()));
@@ -348,8 +349,9 @@ TYPED_TEST(TestHdfsClient, LargeFile) {
   std::shared_ptr<HdfsReadableFile> file2;
   ASSERT_OK(this->client_->OpenReadable(path, 1 << 18, &file2));
 
-  auto buffer2 = std::make_shared<PoolBuffer>();
-  ASSERT_OK(buffer2->Resize(size));
+  std::shared_ptr<MutableBuffer> buffer2;
+  ASSERT_OK(AllocateBuffer(nullptr, size, &buffer2));
+
   ASSERT_OK(file2->Read(size, &bytes_read, buffer2->mutable_data()));
   ASSERT_EQ(0, std::memcmp(buffer2->data(), data.data(), size));
   ASSERT_EQ(size, bytes_read);

http://git-wip-us.apache.org/repos/asf/arrow/blob/5439b715/cpp/src/arrow/io/io-memory-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/io-memory-test.cc b/cpp/src/arrow/io/io-memory-test.cc
index c0b0165..442cd0c 100644
--- a/cpp/src/arrow/io/io-memory-test.cc
+++ b/cpp/src/arrow/io/io-memory-test.cc
@@ -73,8 +73,8 @@ TEST(TestBufferReader, RetainParentReference) {
   std::shared_ptr<Buffer> slice1;
   std::shared_ptr<Buffer> slice2;
   {
-    auto buffer = std::make_shared<PoolBuffer>();
-    ASSERT_OK(buffer->Resize(static_cast<int64_t>(data.size())));
+    std::shared_ptr<MutableBuffer> buffer;
+    ASSERT_OK(AllocateBuffer(nullptr, static_cast<int64_t>(data.size()), &buffer));
     std::memcpy(buffer->mutable_data(), data.c_str(), data.size());
     BufferReader reader(buffer);
     ASSERT_OK(reader.Read(4, &slice1));

http://git-wip-us.apache.org/repos/asf/arrow/blob/5439b715/cpp/src/arrow/ipc/adapter.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/adapter.cc b/cpp/src/arrow/ipc/adapter.cc
index c8e631c..3613ccb 100644
--- a/cpp/src/arrow/ipc/adapter.cc
+++ b/cpp/src/arrow/ipc/adapter.cc
@@ -17,6 +17,7 @@
 
 #include "arrow/ipc/adapter.h"
 
+#include <algorithm>
 #include <cstdint>
 #include <cstring>
 #include <sstream>
@@ -30,6 +31,7 @@
 #include "arrow/ipc/metadata-internal.h"
 #include "arrow/ipc/metadata.h"
 #include "arrow/ipc/util.h"
+#include "arrow/memory_pool.h"
 #include "arrow/schema.h"
 #include "arrow/status.h"
 #include "arrow/table.h"
@@ -49,9 +51,10 @@ namespace ipc {
 
 class RecordBatchWriter : public ArrayVisitor {
  public:
-  RecordBatchWriter(
-      const RecordBatch& batch, int64_t buffer_start_offset, int max_recursion_depth)
-      : batch_(batch),
+  RecordBatchWriter(MemoryPool* pool, const RecordBatch& batch,
+      int64_t buffer_start_offset, int max_recursion_depth)
+      : pool_(pool),
+        batch_(batch),
         max_recursion_depth_(max_recursion_depth),
         buffer_start_offset_(buffer_start_offset) {}
 
@@ -62,7 +65,15 @@ class RecordBatchWriter : public ArrayVisitor {
     // push back all common elements
     field_nodes_.push_back(flatbuf::FieldNode(arr.length(), arr.null_count()));
     if (arr.null_count() > 0) {
-      buffers_.push_back(arr.null_bitmap());
+      std::shared_ptr<Buffer> bitmap = arr.null_bitmap();
+
+      if (arr.offset() != 0) {
+        // With a sliced array / non-zero offset, we must copy the bitmap
+        RETURN_NOT_OK(
+            CopyBitmap(pool_, bitmap->data(), arr.offset(), arr.length(), &bitmap));
+      }
+
+      buffers_.push_back(bitmap);
     } else {
       // Push a dummy zero-length buffer, not to be copied
       buffers_.push_back(std::make_shared<Buffer>(nullptr, 0));
@@ -208,50 +219,136 @@ class RecordBatchWriter : public ArrayVisitor {
  private:
   Status Visit(const NullArray& array) override { return Status::NotImplemented("null"); }
 
-  Status VisitPrimitive(const PrimitiveArray& array) {
-    buffers_.push_back(array.data());
+  template <typename ArrayType>
+  Status VisitFixedWidth(const ArrayType& array) {
+    std::shared_ptr<Buffer> data_buffer = array.data();
+
+    if (array.offset() != 0) {
+      // Non-zero offset, slice the buffer
+      const auto& fw_type = static_cast<const FixedWidthType&>(*array.type());
+      const int type_width = fw_type.bit_width() / 8;
+      const int64_t byte_offset = array.offset() * type_width;
+
+      // Send padding if it's available
+      const int64_t buffer_length =
+          std::min(BitUtil::RoundUpToMultipleOf64(array.length() * type_width),
+              data_buffer->size() - byte_offset);
+      data_buffer = SliceBuffer(data_buffer, byte_offset, buffer_length);
+    }
+    buffers_.push_back(data_buffer);
+    return Status::OK();
+  }
+
+  template <typename ArrayType>
+  Status GetZeroBasedValueOffsets(
+      const ArrayType& array, std::shared_ptr<Buffer>* value_offsets) {
+    // Share slicing logic between ListArray and BinaryArray
+
+    auto offsets = array.value_offsets();
+
+    if (array.offset() != 0) {
+      // If we have a non-zero offset, then the value offsets do not start at
+      // zero. We must a) create a new offsets array with shifted offsets and
+      // b) slice the values array accordingly
+
+      std::shared_ptr<MutableBuffer> shifted_offsets;
+      RETURN_NOT_OK(AllocateBuffer(
+          pool_, sizeof(int32_t) * (array.length() + 1), &shifted_offsets));
+
+      int32_t* dest_offsets = reinterpret_cast<int32_t*>(shifted_offsets->mutable_data());
+      const int32_t start_offset = array.value_offset(0);
+
+      for (int i = 0; i < array.length(); ++i) {
+        dest_offsets[i] = array.value_offset(i) - start_offset;
+      }
+      // Final offset
+      dest_offsets[array.length()] = array.value_offset(array.length()) - start_offset;
+      offsets = shifted_offsets;
+    }
+
+    *value_offsets = offsets;
     return Status::OK();
   }
 
   Status VisitBinary(const BinaryArray& array) {
-    buffers_.push_back(array.offsets());
-    buffers_.push_back(array.data());
+    std::shared_ptr<Buffer> value_offsets;
+    RETURN_NOT_OK(GetZeroBasedValueOffsets<BinaryArray>(array, &value_offsets));
+    auto data = array.data();
+    if (array.offset() != 0) {
+      // SliceBuffer takes (offset, length): pass the span length, not its end offset
+      const int32_t start = array.value_offset(0);
+      data = SliceBuffer(data, start, array.value_offset(array.length()) - start);
+    }
+
+    buffers_.push_back(value_offsets);
+    buffers_.push_back(data);
     return Status::OK();
   }
 
-  Status Visit(const BooleanArray& array) override { return VisitPrimitive(array); }
+  Status Visit(const BooleanArray& array) override {
+    buffers_.push_back(array.data());
+    return Status::OK();
+  }
 
-  Status Visit(const Int8Array& array) override { return VisitPrimitive(array); }
+  Status Visit(const Int8Array& array) override {
+    return VisitFixedWidth<Int8Array>(array);
+  }
 
-  Status Visit(const Int16Array& array) override { return VisitPrimitive(array); }
+  Status Visit(const Int16Array& array) override {
+    return VisitFixedWidth<Int16Array>(array);
+  }
 
-  Status Visit(const Int32Array& array) override { return VisitPrimitive(array); }
+  Status Visit(const Int32Array& array) override {
+    return VisitFixedWidth<Int32Array>(array);
+  }
 
-  Status Visit(const Int64Array& array) override { return VisitPrimitive(array); }
+  Status Visit(const Int64Array& array) override {
+    return VisitFixedWidth<Int64Array>(array);
+  }
 
-  Status Visit(const UInt8Array& array) override { return VisitPrimitive(array); }
+  Status Visit(const UInt8Array& array) override {
+    return VisitFixedWidth<UInt8Array>(array);
+  }
 
-  Status Visit(const UInt16Array& array) override { return VisitPrimitive(array); }
+  Status Visit(const UInt16Array& array) override {
+    return VisitFixedWidth<UInt16Array>(array);
+  }
 
-  Status Visit(const UInt32Array& array) override { return VisitPrimitive(array); }
+  Status Visit(const UInt32Array& array) override {
+    return VisitFixedWidth<UInt32Array>(array);
+  }
 
-  Status Visit(const UInt64Array& array) override { return VisitPrimitive(array); }
+  Status Visit(const UInt64Array& array) override {
+    return VisitFixedWidth<UInt64Array>(array);
+  }
 
-  Status Visit(const HalfFloatArray& array) override { return VisitPrimitive(array); }
+  Status Visit(const HalfFloatArray& array) override {
+    return VisitFixedWidth<HalfFloatArray>(array);
+  }
 
-  Status Visit(const FloatArray& array) override { return VisitPrimitive(array); }
+  Status Visit(const FloatArray& array) override {
+    return VisitFixedWidth<FloatArray>(array);
+  }
 
-  Status Visit(const DoubleArray& array) override { return VisitPrimitive(array); }
+  Status Visit(const DoubleArray& array) override {
+    return VisitFixedWidth<DoubleArray>(array);
+  }
 
   Status Visit(const StringArray& array) override { return VisitBinary(array); }
 
   Status Visit(const BinaryArray& array) override { return VisitBinary(array); }
 
-  Status Visit(const DateArray& array) override { return VisitPrimitive(array); }
+  Status Visit(const DateArray& array) override {
+    return VisitFixedWidth<DateArray>(array);
+  }
 
-  Status Visit(const TimeArray& array) override { return VisitPrimitive(array); }
+  Status Visit(const TimeArray& array) override {
+    return VisitFixedWidth<TimeArray>(array);
+  }
 
-  Status Visit(const TimestampArray& array) override { return VisitPrimitive(array); }
+  Status Visit(const TimestampArray& array) override {
+    return VisitFixedWidth<TimestampArray>(array);
+  }
 
   Status Visit(const IntervalArray& array) override {
     return Status::NotImplemented("interval");
@@ -262,30 +359,109 @@ class RecordBatchWriter : public ArrayVisitor {
   }
 
   Status Visit(const ListArray& array) override {
-    buffers_.push_back(array.offsets());
+    std::shared_ptr<Buffer> value_offsets;
+    RETURN_NOT_OK(GetZeroBasedValueOffsets<ListArray>(array, &value_offsets));
+    buffers_.push_back(value_offsets);
+
     --max_recursion_depth_;
-    RETURN_NOT_OK(VisitArray(*array.values().get()));
+    std::shared_ptr<Array> values = array.values();
+
+    if (array.offset() != 0) {
+      // For non-zero offset, we slice the values array accordingly
+      const int32_t offset = array.value_offset(0);
+      const int32_t length = array.value_offset(array.length()) - offset;
+      values = values->Slice(offset, length);
+    }
+    RETURN_NOT_OK(VisitArray(*values));
     ++max_recursion_depth_;
     return Status::OK();
   }
 
   Status Visit(const StructArray& array) override {
     --max_recursion_depth_;
-    for (const auto& field : array.fields()) {
-      RETURN_NOT_OK(VisitArray(*field.get()));
+    for (std::shared_ptr<Array> field : array.fields()) {
+      if (array.offset() != 0) {
+        // If offset is non-zero, slice the child array
+        field = field->Slice(array.offset(), array.length());
+      }
+      RETURN_NOT_OK(VisitArray(*field));
     }
     ++max_recursion_depth_;
     return Status::OK();
   }
 
   Status Visit(const UnionArray& array) override {
-    buffers_.push_back(array.type_ids());
+    auto type_ids = array.type_ids();
+    if (array.offset() != 0) {
+      type_ids = SliceBuffer(type_ids, array.offset() * sizeof(UnionArray::type_id_t),
+          array.length() * sizeof(UnionArray::type_id_t));
+    }
 
-    if (array.mode() == UnionMode::DENSE) { buffers_.push_back(array.offsets()); }
+    buffers_.push_back(type_ids);
 
     --max_recursion_depth_;
-    for (const auto& field : array.children()) {
-      RETURN_NOT_OK(VisitArray(*field.get()));
+    if (array.mode() == UnionMode::DENSE) {
+      const auto& type = static_cast<const UnionType&>(*array.type());
+      auto value_offsets = array.value_offsets();
+
+      // The Union type codes are not necessarily 0-indexed
+      uint8_t max_code = 0;
+      for (uint8_t code : type.type_codes) {
+        if (code > max_code) { max_code = code; }
+      }
+
+      // Allocate an array of child offsets. Set all to -1 to indicate that we
+      // haven't observed a first occurrence of a particular child yet
+      std::vector<int32_t> child_offsets(max_code + 1, -1);
+      std::vector<int32_t> child_lengths(max_code + 1, 0);
+
+      if (array.offset() != 0) {
+        // This is an unpleasant case. Because the offsets are different for
+        // each child array, when we have a sliced array, we need to "rebase"
+        // the value_offsets for each array
+
+        const int32_t* unshifted_offsets = array.raw_value_offsets();
+        const uint8_t* type_ids = array.raw_type_ids();
+        // Allocate the shifted offsets
+        std::shared_ptr<MutableBuffer> shifted_offsets_buffer;
+        RETURN_NOT_OK(AllocateBuffer(
+            pool_, array.length() * sizeof(int32_t), &shifted_offsets_buffer));
+        int32_t* shifted_offsets =
+            reinterpret_cast<int32_t*>(shifted_offsets_buffer->mutable_data());
+
+        for (int32_t i = 0; i < array.length(); ++i) {
+          const uint8_t code = type_ids[i];
+          int32_t shift = child_offsets[code];
+          if (shift == -1) { child_offsets[code] = shift = unshifted_offsets[i]; }
+          shifted_offsets[i] = unshifted_offsets[i] - shift;
+          // Update the child length to account for observed value
+          ++child_lengths[code];
+        }
+
+        value_offsets = shifted_offsets_buffer;
+      }
+      buffers_.push_back(value_offsets);
+
+      // Visit children and slice accordingly
+      for (int i = 0; i < type.num_children(); ++i) {
+        std::shared_ptr<Array> child = array.child(i);
+        if (array.offset() != 0) {
+          const uint8_t code = type.type_codes[i];
+          // Children never seen in the slice still hold -1; emit them empty from 0
+          const int32_t start = std::max<int32_t>(child_offsets[code], 0);
+          child = child->Slice(start, child_lengths[code]);
+        }
+        RETURN_NOT_OK(VisitArray(*child));
+      }
+    } else {
+      for (std::shared_ptr<Array> child : array.children()) {
+        // Sparse union, slicing is simpler
+        if (array.offset() != 0) {
+          // If offset is non-zero, slice the child array
+          child = child->Slice(array.offset(), array.length());
+        }
+        RETURN_NOT_OK(VisitArray(*child));
+      }
     }
     ++max_recursion_depth_;
     return Status::OK();
@@ -298,6 +474,8 @@ class RecordBatchWriter : public ArrayVisitor {
     return Status::OK();
   }
 
+  // In some cases, intermediate buffers may need to be allocated (with sliced arrays)
+  MemoryPool* pool_;
   const RecordBatch& batch_;
 
   std::vector<flatbuf::FieldNode> field_nodes_;
@@ -310,14 +488,14 @@ class RecordBatchWriter : public ArrayVisitor {
 
 Status WriteRecordBatch(const RecordBatch& batch, int64_t buffer_start_offset,
     io::OutputStream* dst, int32_t* metadata_length, int64_t* body_length,
-    int max_recursion_depth) {
+    MemoryPool* pool, int max_recursion_depth) {
   DCHECK_GT(max_recursion_depth, 0);
-  RecordBatchWriter serializer(batch, buffer_start_offset, max_recursion_depth);
+  RecordBatchWriter serializer(pool, batch, buffer_start_offset, max_recursion_depth);
   return serializer.Write(dst, metadata_length, body_length);
 }
 
 Status GetRecordBatchSize(const RecordBatch& batch, int64_t* size) {
-  RecordBatchWriter serializer(batch, 0, kMaxIpcRecursionDepth);
+  RecordBatchWriter serializer(default_memory_pool(), batch, 0, kMaxIpcRecursionDepth);
   RETURN_NOT_OK(serializer.GetTotalSize(size));
   return Status::OK();
 }
@@ -372,7 +550,7 @@ class ArrayLoader : public TypeVisitor {
     BufferMetadata metadata = context_->metadata->buffer(buffer_index);
 
     if (metadata.length == 0) {
-      *out = std::make_shared<Buffer>(nullptr, 0);
+      *out = nullptr;
       return Status::OK();
     } else {
       return file_->ReadAt(metadata.offset, metadata.length, out);
@@ -412,8 +590,8 @@ class ArrayLoader : public TypeVisitor {
       context_->buffer_index++;
       data.reset(new Buffer(nullptr, 0));
     }
-    return MakePrimitiveArray(field_.type, field_meta.length, data, field_meta.null_count,
-        null_bitmap, &result_);
+    return MakePrimitiveArray(field_.type, field_meta.length, data, null_bitmap,
+        field_meta.null_count, 0, &result_);
   }
 
   template <typename CONTAINER>
@@ -428,7 +606,7 @@ class ArrayLoader : public TypeVisitor {
     RETURN_NOT_OK(GetBuffer(context_->buffer_index++, &values));
 
     result_ = std::make_shared<CONTAINER>(
-        field_meta.length, offsets, values, field_meta.null_count, null_bitmap);
+        field_meta.length, offsets, values, null_bitmap, field_meta.null_count);
     return Status::OK();
   }
 
@@ -496,7 +674,7 @@ class ArrayLoader : public TypeVisitor {
     RETURN_NOT_OK(LoadChild(*type.child(0).get(), &values_array));
 
     result_ = std::make_shared<ListArray>(field_.type, field_meta.length, offsets,
-        values_array, field_meta.null_count, null_bitmap);
+        values_array, null_bitmap, field_meta.null_count);
     return Status::OK();
   }
 
@@ -521,7 +699,7 @@ class ArrayLoader : public TypeVisitor {
     RETURN_NOT_OK(LoadChildren(type.children(), &fields));
 
     result_ = std::make_shared<StructArray>(
-        field_.type, field_meta.length, fields, field_meta.null_count, null_bitmap);
+        field_.type, field_meta.length, fields, null_bitmap, field_meta.null_count);
     return Status::OK();
   }
 
@@ -542,7 +720,7 @@ class ArrayLoader : public TypeVisitor {
     RETURN_NOT_OK(LoadChildren(type.children(), &fields));
 
     result_ = std::make_shared<UnionArray>(field_.type, field_meta.length, fields,
-        type_ids, offsets, field_meta.null_count, null_bitmap);
+        type_ids, offsets, null_bitmap, field_meta.null_count);
     return Status::OK();
   }
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/5439b715/cpp/src/arrow/ipc/adapter.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/adapter.h b/cpp/src/arrow/ipc/adapter.h
index f9ef7d9..83542d0 100644
--- a/cpp/src/arrow/ipc/adapter.h
+++ b/cpp/src/arrow/ipc/adapter.h
@@ -30,6 +30,7 @@
 namespace arrow {
 
 class Array;
+class MemoryPool;
 class RecordBatch;
 class Schema;
 class Status;
@@ -71,14 +72,15 @@ constexpr int kMaxIpcRecursionDepth = 64;
 //
 // @param(out) body_length: the size of the contiguous buffer block plus
 // padding bytes
-ARROW_EXPORT Status WriteRecordBatch(const RecordBatch& batch,
+Status ARROW_EXPORT WriteRecordBatch(const RecordBatch& batch,
     int64_t buffer_start_offset, io::OutputStream* dst, int32_t* metadata_length,
-    int64_t* body_length, int max_recursion_depth = kMaxIpcRecursionDepth);
+    int64_t* body_length, MemoryPool* pool,
+    int max_recursion_depth = kMaxIpcRecursionDepth);
 
 // Compute the precise number of bytes needed in a contiguous memory segment to
 // write the record batch. This involves generating the complete serialized
 // Flatbuffers metadata.
-ARROW_EXPORT Status GetRecordBatchSize(const RecordBatch& batch, int64_t* size);
+Status ARROW_EXPORT GetRecordBatchSize(const RecordBatch& batch, int64_t* size);
 
 // ----------------------------------------------------------------------
 // "Read" path; does not copy data if the input supports zero copy reads

http://git-wip-us.apache.org/repos/asf/arrow/blob/5439b715/cpp/src/arrow/ipc/ipc-adapter-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/ipc-adapter-test.cc b/cpp/src/arrow/ipc/ipc-adapter-test.cc
index 17868f8..bae6578 100644
--- a/cpp/src/arrow/ipc/ipc-adapter-test.cc
+++ b/cpp/src/arrow/ipc/ipc-adapter-test.cc
@@ -32,6 +32,7 @@
 
 #include "arrow/buffer.h"
 #include "arrow/memory_pool.h"
+#include "arrow/pretty_print.h"
 #include "arrow/status.h"
 #include "arrow/test-util.h"
 #include "arrow/util/bit-util.h"
@@ -56,7 +57,7 @@ class TestWriteRecordBatch : public ::testing::TestWithParam<MakeRecordBatch*>,
     const int64_t buffer_offset = 0;
 
     RETURN_NOT_OK(WriteRecordBatch(
-        batch, buffer_offset, mmap_.get(), &metadata_length, &body_length));
+        batch, buffer_offset, mmap_.get(), &metadata_length, &body_length, pool_));
 
     std::shared_ptr<RecordBatchMetadata> metadata;
     RETURN_NOT_OK(ReadRecordBatchMetadata(0, metadata_length, mmap_.get(), &metadata));
@@ -92,17 +93,49 @@ TEST_P(TestWriteRecordBatch, RoundTrip) {
   }
 }
 
-INSTANTIATE_TEST_CASE_P(RoundTripTests, TestWriteRecordBatch,
-    ::testing::Values(&MakeIntRecordBatch, &MakeListRecordBatch, &MakeNonNullRecordBatch,
-                            &MakeZeroLengthRecordBatch, &MakeDeeplyNestedList,
-                            &MakeStringTypesRecordBatch, &MakeStruct, &MakeUnion));
+TEST_P(TestWriteRecordBatch, SliceRoundTrip) {
+  std::shared_ptr<RecordBatch> batch;
+  ASSERT_OK((*GetParam())(&batch));  // NOLINT clang-tidy gtest issue
+  std::shared_ptr<RecordBatch> batch_result;
+
+  // Skip the zero-length case
+  if (batch->num_rows() < 2) { return; }
+
+  auto sliced_batch = batch->Slice(2, 10);
+
+  ASSERT_OK(RoundTripHelper(*sliced_batch, 1 << 16, &batch_result));
+
+  EXPECT_EQ(sliced_batch->num_rows(), batch_result->num_rows());
+
+  for (int i = 0; i < sliced_batch->num_columns(); ++i) {
+    const auto& left = *sliced_batch->column(i);
+    const auto& right = *batch_result->column(i);
+    if (!left.Equals(right)) {
+      std::stringstream pp_result;
+      std::stringstream pp_expected;
+
+      ASSERT_OK(PrettyPrint(left, 0, &pp_expected));
+      ASSERT_OK(PrettyPrint(right, 0, &pp_result));
+
+      FAIL() << "Index: " << i << " Expected: " << pp_expected.str()
+             << "\nGot: " << pp_result.str();
+    }
+  }
+}
+
+INSTANTIATE_TEST_CASE_P(
+    RoundTripTests, TestWriteRecordBatch,
+    ::testing::Values(&MakeIntRecordBatch, &MakeStringTypesRecordBatch,
+        &MakeNonNullRecordBatch, &MakeZeroLengthRecordBatch, &MakeListRecordBatch,
+        &MakeDeeplyNestedList, &MakeStruct, &MakeUnion));
 
 void TestGetRecordBatchSize(std::shared_ptr<RecordBatch> batch) {
   ipc::MockOutputStream mock;
   int32_t mock_metadata_length = -1;
   int64_t mock_body_length = -1;
   int64_t size = -1;
-  ASSERT_OK(WriteRecordBatch(*batch, 0, &mock, &mock_metadata_length, &mock_body_length));
+  ASSERT_OK(WriteRecordBatch(
+      *batch, 0, &mock, &mock_metadata_length, &mock_body_length, default_memory_pool()));
   ASSERT_OK(GetRecordBatchSize(*batch, &size));
   ASSERT_EQ(mock.GetExtentBytesWritten(), size);
 }
@@ -156,10 +189,11 @@ class RecursionLimits : public ::testing::Test, public io::MemoryMapFixture {
     io::MemoryMapFixture::InitMemoryMap(memory_map_size, path, &mmap_);
 
     if (override_level) {
-      return WriteRecordBatch(
-          *batch, 0, mmap_.get(), metadata_length, body_length, recursion_level + 1);
+      return WriteRecordBatch(*batch, 0, mmap_.get(), metadata_length, body_length, pool_,
+          recursion_level + 1);
     } else {
-      return WriteRecordBatch(*batch, 0, mmap_.get(), metadata_length, body_length);
+      return WriteRecordBatch(
+          *batch, 0, mmap_.get(), metadata_length, body_length, pool_);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/5439b715/cpp/src/arrow/ipc/ipc-json-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/ipc-json-test.cc b/cpp/src/arrow/ipc/ipc-json-test.cc
index 30f968c..3e759cc 100644
--- a/cpp/src/arrow/ipc/ipc-json-test.cc
+++ b/cpp/src/arrow/ipc/ipc-json-test.cc
@@ -80,7 +80,7 @@ template <typename T, typename ValueType>
 void CheckPrimitive(const std::shared_ptr<DataType>& type,
     const std::vector<bool>& is_valid, const std::vector<ValueType>& values) {
   MemoryPool* pool = default_memory_pool();
-  typename TypeTraits<T>::BuilderType builder(pool, type);
+  typename TypeTraits<T>::BuilderType builder(pool);
 
   for (size_t i = 0; i < values.size(); ++i) {
     if (is_valid[i]) {
@@ -146,12 +146,11 @@ TEST(TestJsonArrayWriter, NestedTypes) {
 
   std::vector<int32_t> values = {0, 1, 2, 3, 4, 5, 6};
   std::shared_ptr<Array> values_array;
-  ArrayFromVector<Int32Type, int32_t>(int32(), values_is_valid, values, &values_array);
+  ArrayFromVector<Int32Type, int32_t>(values_is_valid, values, &values_array);
 
   std::vector<int16_t> i16_values = {0, 1, 2, 3, 4, 5, 6};
   std::shared_ptr<Array> i16_values_array;
-  ArrayFromVector<Int16Type, int16_t>(
-      int16(), values_is_valid, i16_values, &i16_values_array);
+  ArrayFromVector<Int16Type, int16_t>(values_is_valid, i16_values, &i16_values_array);
 
   // List
   std::vector<bool> list_is_valid = {true, false, true, true, true};
@@ -161,7 +160,7 @@ TEST(TestJsonArrayWriter, NestedTypes) {
   ASSERT_OK(test::GetBitmapFromBoolVector(list_is_valid, &list_bitmap));
   std::shared_ptr<Buffer> offsets_buffer = test::GetBufferFromVector(offsets);
 
-  ListArray list_array(list(value_type), 5, offsets_buffer, values_array, 1, list_bitmap);
+  ListArray list_array(list(value_type), 5, offsets_buffer, values_array, list_bitmap, 1);
 
   TestArrayRoundTrip(list_array);
 
@@ -175,7 +174,7 @@ TEST(TestJsonArrayWriter, NestedTypes) {
 
   std::vector<std::shared_ptr<Array>> fields = {values_array, values_array, values_array};
   StructArray struct_array(
-      struct_type, static_cast<int>(struct_is_valid.size()), fields, 2, struct_bitmap);
+      struct_type, static_cast<int>(struct_is_valid.size()), fields, struct_bitmap, 2);
   TestArrayRoundTrip(struct_array);
 }
 
@@ -202,15 +201,15 @@ void MakeBatchArrays(const std::shared_ptr<Schema>& schema, const int num_rows,
   test::randint<int32_t>(num_rows, 0, 100, &v2_values);
 
   std::shared_ptr<Array> v1;
-  ArrayFromVector<Int8Type, int8_t>(schema->field(0)->type, is_valid, v1_values, &v1);
+  ArrayFromVector<Int8Type, int8_t>(is_valid, v1_values, &v1);
 
   std::shared_ptr<Array> v2;
-  ArrayFromVector<Int32Type, int32_t>(schema->field(1)->type, is_valid, v2_values, &v2);
+  ArrayFromVector<Int32Type, int32_t>(is_valid, v2_values, &v2);
 
   static const int kBufferSize = 10;
   static uint8_t buffer[kBufferSize];
   static uint32_t seed = 0;
-  StringBuilder string_builder(default_memory_pool(), utf8());
+  StringBuilder string_builder(default_memory_pool());
   for (int i = 0; i < num_rows; ++i) {
     if (!is_valid[i]) {
       string_builder.AppendNull();
@@ -338,13 +337,13 @@ TEST(TestJsonFileReadWrite, MinimalFormatExample) {
   std::vector<bool> foo_valid = {true, false, true, true, true};
   std::vector<int32_t> foo_values = {1, 2, 3, 4, 5};
   std::shared_ptr<Array> foo;
-  ArrayFromVector<Int32Type, int32_t>(int32(), foo_valid, foo_values, &foo);
+  ArrayFromVector<Int32Type, int32_t>(foo_valid, foo_values, &foo);
   ASSERT_TRUE(batch->column(0)->Equals(foo));
 
   std::vector<bool> bar_valid = {true, false, false, true, true};
   std::vector<double> bar_values = {1, 2, 3, 4, 5};
   std::shared_ptr<Array> bar;
-  ArrayFromVector<DoubleType, double>(float64(), bar_valid, bar_values, &bar);
+  ArrayFromVector<DoubleType, double>(bar_valid, bar_values, &bar);
   ASSERT_TRUE(batch->column(1)->Equals(bar));
 }
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/5439b715/cpp/src/arrow/ipc/json-integration-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/json-integration-test.cc b/cpp/src/arrow/ipc/json-integration-test.cc
index 95bc742..17ccc4a 100644
--- a/cpp/src/arrow/ipc/json-integration-test.cc
+++ b/cpp/src/arrow/ipc/json-integration-test.cc
@@ -144,10 +144,8 @@ static Status ValidateArrowVsJson(
 
   if (!json_schema->Equals(arrow_schema)) {
     std::stringstream ss;
-    ss << "JSON schema: \n"
-       << json_schema->ToString() << "\n"
-       << "Arrow schema: \n"
-       << arrow_schema->ToString();
+    ss << "JSON schema: \n" << json_schema->ToString() << "\n"
+       << "Arrow schema: \n" << arrow_schema->ToString();
 
     if (FLAGS_verbose) { std::cout << ss.str() << std::endl; }
     return Status::Invalid("Schemas did not match");

http://git-wip-us.apache.org/repos/asf/arrow/blob/5439b715/cpp/src/arrow/ipc/json-internal.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/json-internal.cc b/cpp/src/arrow/ipc/json-internal.cc
index 43bd8a4..1a95b2c 100644
--- a/cpp/src/arrow/ipc/json-internal.cc
+++ b/cpp/src/arrow/ipc/json-internal.cc
@@ -199,8 +199,8 @@ class JsonSchemaWriter : public TypeVisitor {
     // Write type ids
     writer_->Key("typeIds");
     writer_->StartArray();
-    for (size_t i = 0; i < type.type_ids.size(); ++i) {
-      writer_->Uint(type.type_ids[i]);
+    for (size_t i = 0; i < type.type_codes.size(); ++i) {
+      writer_->Uint(type.type_codes[i]);
     }
     writer_->EndArray();
   }
@@ -464,7 +464,7 @@ class JsonArrayWriter : public ArrayVisitor {
   template <typename T>
   Status WriteVarBytes(const T& array) {
     WriteValidityField(array);
-    WriteIntegerField("OFFSET", array.raw_offsets(), array.length() + 1);
+    WriteIntegerField("OFFSET", array.raw_value_offsets(), array.length() + 1);
     WriteDataField(array);
     SetNoChildren();
     return Status::OK();
@@ -532,7 +532,7 @@ class JsonArrayWriter : public ArrayVisitor {
 
   Status Visit(const ListArray& array) override {
     WriteValidityField(array);
-    WriteIntegerField("OFFSET", array.raw_offsets(), array.length() + 1);
+    WriteIntegerField("OFFSET", array.raw_value_offsets(), array.length() + 1);
     auto type = static_cast<const ListType*>(array.type().get());
     return WriteChildren(type->children(), {array.values()});
   }
@@ -549,7 +549,7 @@ class JsonArrayWriter : public ArrayVisitor {
 
     WriteIntegerField("TYPE_ID", array.raw_type_ids(), array.length());
     if (type->mode == UnionMode::DENSE) {
-      WriteIntegerField("OFFSET", array.raw_offsets(), array.length());
+      WriteIntegerField("OFFSET", array.raw_value_offsets(), array.length());
     }
     return WriteChildren(type->children(), array.children());
   }
@@ -718,17 +718,17 @@ class JsonSchemaReader {
       return Status::Invalid(ss.str());
     }
 
-    const auto& json_type_ids = json_type.FindMember("typeIds");
-    RETURN_NOT_ARRAY("typeIds", json_type_ids, json_type);
+    const auto& json_type_codes = json_type.FindMember("typeIds");
+    RETURN_NOT_ARRAY("typeIds", json_type_codes, json_type);
 
-    std::vector<uint8_t> type_ids;
-    const auto& id_array = json_type_ids->value.GetArray();
+    std::vector<uint8_t> type_codes;
+    const auto& id_array = json_type_codes->value.GetArray();
     for (const rj::Value& val : id_array) {
       DCHECK(val.IsUint());
-      type_ids.push_back(val.GetUint());
+      type_codes.push_back(val.GetUint());
     }
 
-    *type = union_(children, type_ids, mode);
+    *type = union_(children, type_codes, mode);
 
     return Status::OK();
   }
@@ -844,7 +844,7 @@ class JsonArrayReader {
   typename std::enable_if<std::is_base_of<BinaryType, T>::value, Status>::type ReadArray(
       const RjObject& json_array, int32_t length, const std::vector<bool>& is_valid,
       const std::shared_ptr<DataType>& type, std::shared_ptr<Array>* array) {
-    typename TypeTraits<T>::BuilderType builder(pool_, type);
+    typename TypeTraits<T>::BuilderType builder(pool_);
 
     const auto& json_data = json_array.FindMember("DATA");
     RETURN_NOT_ARRAY("DATA", json_data, json_array);
@@ -869,8 +869,9 @@ class JsonArrayReader {
   template <typename T>
   Status GetIntArray(
       const RjArray& json_array, const int32_t length, std::shared_ptr<Buffer>* out) {
-    auto buffer = std::make_shared<PoolBuffer>(pool_);
-    RETURN_NOT_OK(buffer->Resize(length * sizeof(T)));
+    std::shared_ptr<MutableBuffer> buffer;
+    RETURN_NOT_OK(AllocateBuffer(pool_, length * sizeof(T), &buffer));
+
     T* values = reinterpret_cast<T*>(buffer->mutable_data());
     for (int i = 0; i < length; ++i) {
       const rj::Value& val = json_array[i];
@@ -901,7 +902,7 @@ class JsonArrayReader {
     DCHECK_EQ(children.size(), 1);
 
     *array = std::make_shared<ListArray>(
-        type, length, offsets_buffer, children[0], null_count, validity_buffer);
+        type, length, offsets_buffer, children[0], validity_buffer, null_count);
 
     return Status::OK();
   }
@@ -918,7 +919,7 @@ class JsonArrayReader {
     RETURN_NOT_OK(GetChildren(json_array, type, &fields));
 
     *array =
-        std::make_shared<StructArray>(type, length, fields, null_count, validity_buffer);
+        std::make_shared<StructArray>(type, length, fields, validity_buffer, null_count);
 
     return Status::OK();
   }
@@ -953,7 +954,7 @@ class JsonArrayReader {
     RETURN_NOT_OK(GetChildren(json_array, type, &children));
 
     *array = std::make_shared<UnionArray>(type, length, children, type_id_buffer,
-        offsets_buffer, null_count, validity_buffer);
+        offsets_buffer, validity_buffer, null_count);
 
     return Status::OK();
   }
@@ -962,7 +963,7 @@ class JsonArrayReader {
   typename std::enable_if<std::is_base_of<NullType, T>::value, Status>::type ReadArray(
       const RjObject& json_array, int32_t length, const std::vector<bool>& is_valid,
       const std::shared_ptr<DataType>& type, std::shared_ptr<Array>* array) {
-    *array = std::make_shared<NullArray>(type, length);
+    *array = std::make_shared<NullArray>(length);
     return Status::OK();
   }
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/5439b715/cpp/src/arrow/ipc/stream.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/stream.cc b/cpp/src/arrow/ipc/stream.cc
index c9057e8..72eb134 100644
--- a/cpp/src/arrow/ipc/stream.cc
+++ b/cpp/src/arrow/ipc/stream.cc
@@ -28,6 +28,7 @@
 #include "arrow/ipc/adapter.h"
 #include "arrow/ipc/metadata.h"
 #include "arrow/ipc/util.h"
+#include "arrow/memory_pool.h"
 #include "arrow/schema.h"
 #include "arrow/status.h"
 #include "arrow/util/logging.h"
@@ -41,7 +42,11 @@ namespace ipc {
 StreamWriter::~StreamWriter() {}
 
 StreamWriter::StreamWriter(io::OutputStream* sink, const std::shared_ptr<Schema>& schema)
-    : sink_(sink), schema_(schema), position_(-1), started_(false) {}
+    : sink_(sink),
+      schema_(schema),
+      pool_(default_memory_pool()),
+      position_(-1),
+      started_(false) {}
 
 Status StreamWriter::UpdatePosition() {
   return sink_->Tell(&position_);
@@ -76,8 +81,8 @@ Status StreamWriter::WriteRecordBatch(const RecordBatch& batch, FileBlock* block
 
   // Frame of reference in file format is 0, see ARROW-384
   const int64_t buffer_start_offset = 0;
-  RETURN_NOT_OK(arrow::ipc::WriteRecordBatch(
-      batch, buffer_start_offset, sink_, &block->metadata_length, &block->body_length));
+  RETURN_NOT_OK(arrow::ipc::WriteRecordBatch(batch, buffer_start_offset, sink_,
+      &block->metadata_length, &block->body_length, pool_));
   RETURN_NOT_OK(UpdatePosition());
 
   DCHECK(position_ % 8 == 0) << "WriteRecordBatch did not perform aligned writes";
@@ -85,6 +90,10 @@ Status StreamWriter::WriteRecordBatch(const RecordBatch& batch, FileBlock* block
   return Status::OK();
 }
 
+// Replace the pool stored in pool_. StreamWriter::WriteRecordBatch forwards
+// pool_ to arrow::ipc::WriteRecordBatch.
+void StreamWriter::set_memory_pool(MemoryPool* pool) {
+  this->pool_ = pool;
+}
+
 // ----------------------------------------------------------------------
 // StreamWriter implementation
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/5439b715/cpp/src/arrow/ipc/stream.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/stream.h b/cpp/src/arrow/ipc/stream.h
index 53f51dc..12414fa 100644
--- a/cpp/src/arrow/ipc/stream.h
+++ b/cpp/src/arrow/ipc/stream.h
@@ -30,6 +30,7 @@ namespace arrow {
 class Array;
 class Buffer;
 struct Field;
+class MemoryPool;
 class RecordBatch;
 class Schema;
 class Status;
@@ -59,6 +60,10 @@ class ARROW_EXPORT StreamWriter {
   /// closing the actual OutputStream
   virtual Status Close();
 
+  // In some cases, writing may require memory allocation. We use the default
+  // memory pool, but provide the option to override
+  void set_memory_pool(MemoryPool* pool);
+
  protected:
   StreamWriter(io::OutputStream* sink, const std::shared_ptr<Schema>& schema);
 
@@ -81,6 +86,9 @@ class ARROW_EXPORT StreamWriter {
 
   io::OutputStream* sink_;
   std::shared_ptr<Schema> schema_;
+
+  MemoryPool* pool_;
+
   int64_t position_;
   bool started_;
 };

http://git-wip-us.apache.org/repos/asf/arrow/blob/5439b715/cpp/src/arrow/ipc/test-common.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/test-common.h b/cpp/src/arrow/ipc/test-common.h
index ca790de..b4930c4 100644
--- a/cpp/src/arrow/ipc/test-common.h
+++ b/cpp/src/arrow/ipc/test-common.h
@@ -21,6 +21,7 @@
 #include <algorithm>
 #include <cstdint>
 #include <memory>
+#include <numeric>
 #include <string>
 #include <vector>
 
@@ -28,6 +29,7 @@
 #include "arrow/buffer.h"
 #include "arrow/builder.h"
 #include "arrow/memory_pool.h"
+#include "arrow/status.h"
 #include "arrow/table.h"
 #include "arrow/test-util.h"
 #include "arrow/type.h"
@@ -104,8 +106,8 @@ Status MakeIntRecordBatch(std::shared_ptr<RecordBatch>* out) {
   const int length = 1000;
 
   // Make the schema
-  auto f0 = std::make_shared<Field>("f0", int32());
-  auto f1 = std::make_shared<Field>("f1", int32());
+  auto f0 = field("f0", int32());
+  auto f1 = field("f1", int32());
   std::shared_ptr<Schema> schema(new Schema({f0, f1}));
 
   // Example data
@@ -119,10 +121,10 @@ Status MakeIntRecordBatch(std::shared_ptr<RecordBatch>* out) {
 
 template <class Builder, class RawType>
 Status MakeRandomBinaryArray(
-    const TypePtr& type, int32_t length, MemoryPool* pool, std::shared_ptr<Array>* out) {
+    int32_t length, MemoryPool* pool, std::shared_ptr<Array>* out) {
   const std::vector<std::string> values = {
       "", "", "abc", "123", "efg", "456!@#!@#", "12312"};
-  Builder builder(pool, type);
+  Builder builder(pool);
   const auto values_len = values.size();
   for (int32_t i = 0; i < length; ++i) {
     int values_index = i % values_len;
@@ -141,22 +143,22 @@ Status MakeStringTypesRecordBatch(std::shared_ptr<RecordBatch>* out) {
   const int32_t length = 500;
   auto string_type = utf8();
   auto binary_type = binary();
-  auto f0 = std::make_shared<Field>("f0", string_type);
-  auto f1 = std::make_shared<Field>("f1", binary_type);
+  auto f0 = field("f0", string_type);
+  auto f1 = field("f1", binary_type);
   std::shared_ptr<Schema> schema(new Schema({f0, f1}));
 
   std::shared_ptr<Array> a0, a1;
   MemoryPool* pool = default_memory_pool();
 
+  // Quirk with RETURN_NOT_OK macro and templated functions
   {
-    auto status =
-        MakeRandomBinaryArray<StringBuilder, char>(string_type, length, pool, &a0);
-    RETURN_NOT_OK(status);
+    auto s = MakeRandomBinaryArray<StringBuilder, char>(length, pool, &a0);
+    RETURN_NOT_OK(s);
   }
+
   {
-    auto status =
-        MakeRandomBinaryArray<BinaryBuilder, uint8_t>(binary_type, length, pool, &a1);
-    RETURN_NOT_OK(status);
+    auto s = MakeRandomBinaryArray<BinaryBuilder, uint8_t>(length, pool, &a1);
+    RETURN_NOT_OK(s);
   }
   out->reset(new RecordBatch(schema, length, {a0, a1}));
   return Status::OK();
@@ -164,9 +166,9 @@ Status MakeStringTypesRecordBatch(std::shared_ptr<RecordBatch>* out) {
 
 Status MakeListRecordBatch(std::shared_ptr<RecordBatch>* out) {
   // Make the schema
-  auto f0 = std::make_shared<Field>("f0", kListInt32);
-  auto f1 = std::make_shared<Field>("f1", kListListInt32);
-  auto f2 = std::make_shared<Field>("f2", int32());
+  auto f0 = field("f0", kListInt32);
+  auto f1 = field("f1", kListListInt32);
+  auto f2 = field("f2", int32());
   std::shared_ptr<Schema> schema(new Schema({f0, f1, f2}));
 
   // Example data
@@ -187,14 +189,13 @@ Status MakeListRecordBatch(std::shared_ptr<RecordBatch>* out) {
 
 Status MakeZeroLengthRecordBatch(std::shared_ptr<RecordBatch>* out) {
   // Make the schema
-  auto f0 = std::make_shared<Field>("f0", kListInt32);
-  auto f1 = std::make_shared<Field>("f1", kListListInt32);
-  auto f2 = std::make_shared<Field>("f2", int32());
+  auto f0 = field("f0", kListInt32);
+  auto f1 = field("f1", kListListInt32);
+  auto f2 = field("f2", int32());
   std::shared_ptr<Schema> schema(new Schema({f0, f1, f2}));
 
   // Example data
   MemoryPool* pool = default_memory_pool();
-  const int length = 200;
   const bool include_nulls = true;
   std::shared_ptr<Array> leaf_values, list_array, list_list_array, flat_array;
   RETURN_NOT_OK(MakeRandomInt32Array(0, include_nulls, pool, &leaf_values));
@@ -202,15 +203,15 @@ Status MakeZeroLengthRecordBatch(std::shared_ptr<RecordBatch>* out) {
   RETURN_NOT_OK(
       MakeRandomListArray(list_array, 0, include_nulls, pool, &list_list_array));
   RETURN_NOT_OK(MakeRandomInt32Array(0, include_nulls, pool, &flat_array));
-  out->reset(new RecordBatch(schema, length, {list_array, list_list_array, flat_array}));
+  out->reset(new RecordBatch(schema, 0, {list_array, list_list_array, flat_array}));
   return Status::OK();
 }
 
 Status MakeNonNullRecordBatch(std::shared_ptr<RecordBatch>* out) {
   // Make the schema
-  auto f0 = std::make_shared<Field>("f0", kListInt32);
-  auto f1 = std::make_shared<Field>("f1", kListListInt32);
-  auto f2 = std::make_shared<Field>("f2", int32());
+  auto f0 = field("f0", kListInt32);
+  auto f1 = field("f1", kListListInt32);
+  auto f2 = field("f2", int32());
   std::shared_ptr<Schema> schema(new Schema({f0, f1, f2}));
 
   // Example data
@@ -242,7 +243,7 @@ Status MakeDeeplyNestedList(std::shared_ptr<RecordBatch>* out) {
     RETURN_NOT_OK(MakeRandomListArray(array, batch_length, include_nulls, pool, &array));
   }
 
-  auto f0 = std::make_shared<Field>("f0", type);
+  auto f0 = field("f0", type);
   std::shared_ptr<Schema> schema(new Schema({f0}));
   std::vector<std::shared_ptr<Array>> arrays = {array};
   out->reset(new RecordBatch(schema, batch_length, arrays));
@@ -260,8 +261,8 @@ Status MakeStruct(std::shared_ptr<RecordBatch>* out) {
   // Define schema
   std::shared_ptr<DataType> type(new StructType(
       {list_schema->field(0), list_schema->field(1), list_schema->field(2)}));
-  auto f0 = std::make_shared<Field>("non_null_struct", type);
-  auto f1 = std::make_shared<Field>("null_struct", type);
+  auto f0 = field("non_null_struct", type);
+  auto f1 = field("null_struct", type);
   std::shared_ptr<Schema> schema(new Schema({f0, f1}));
 
   // construct individual nullable/non-nullable struct arrays
@@ -271,7 +272,7 @@ Status MakeStruct(std::shared_ptr<RecordBatch>* out) {
   std::shared_ptr<Buffer> null_bitmask;
   RETURN_NOT_OK(BitUtil::BytesToBits(null_bytes, &null_bitmask));
   std::shared_ptr<Array> with_nulls(
-      new StructArray(type, list_batch->num_rows(), columns, 1, null_bitmask));
+      new StructArray(type, list_batch->num_rows(), columns, null_bitmask, 1));
 
   // construct batch
   std::vector<std::shared_ptr<Array>> arrays = {no_nulls, with_nulls};
@@ -282,7 +283,7 @@ Status MakeStruct(std::shared_ptr<RecordBatch>* out) {
 Status MakeUnion(std::shared_ptr<RecordBatch>* out) {
   // Define schema
   std::vector<std::shared_ptr<Field>> union_types(
-      {std::make_shared<Field>("u0", int32()), std::make_shared<Field>("u1", uint8())});
+      {field("u0", int32()), field("u1", uint8())});
 
   std::vector<uint8_t> type_codes = {5, 10};
   auto sparse_type =
@@ -291,9 +292,9 @@ Status MakeUnion(std::shared_ptr<RecordBatch>* out) {
   auto dense_type =
       std::make_shared<UnionType>(union_types, type_codes, UnionMode::DENSE);
 
-  auto f0 = std::make_shared<Field>("sparse_nonnull", sparse_type, false);
-  auto f1 = std::make_shared<Field>("sparse", sparse_type);
-  auto f2 = std::make_shared<Field>("dense", dense_type);
+  auto f0 = field("sparse_nonnull", sparse_type, false);
+  auto f1 = field("sparse", sparse_type);
+  auto f2 = field("dense", dense_type);
 
   std::shared_ptr<Schema> schema(new Schema({f0, f1, f2}));
 
@@ -308,21 +309,17 @@ Status MakeUnion(std::shared_ptr<RecordBatch>* out) {
   RETURN_NOT_OK(test::CopyBufferFromVector(type_ids, &type_ids_buffer));
 
   std::vector<int32_t> u0_values = {0, 1, 2, 3, 4, 5, 6};
-  ArrayFromVector<Int32Type, int32_t>(
-      sparse_type->child(0)->type, u0_values, &sparse_children[0]);
+  ArrayFromVector<Int32Type, int32_t>(u0_values, &sparse_children[0]);
 
   std::vector<uint8_t> u1_values = {10, 11, 12, 13, 14, 15, 16};
-  ArrayFromVector<UInt8Type, uint8_t>(
-      sparse_type->child(1)->type, u1_values, &sparse_children[1]);
+  ArrayFromVector<UInt8Type, uint8_t>(u1_values, &sparse_children[1]);
 
   // dense children
   u0_values = {0, 2, 3, 7};
-  ArrayFromVector<Int32Type, int32_t>(
-      dense_type->child(0)->type, u0_values, &dense_children[0]);
+  ArrayFromVector<Int32Type, int32_t>(u0_values, &dense_children[0]);
 
   u1_values = {11, 14, 15};
-  ArrayFromVector<UInt8Type, uint8_t>(
-      dense_type->child(1)->type, u1_values, &dense_children[1]);
+  ArrayFromVector<UInt8Type, uint8_t>(u1_values, &dense_children[1]);
 
   std::shared_ptr<Buffer> offsets_buffer;
   std::vector<int32_t> offsets = {0, 0, 1, 2, 1, 2, 3};
@@ -337,10 +334,10 @@ Status MakeUnion(std::shared_ptr<RecordBatch>* out) {
   auto sparse_no_nulls =
       std::make_shared<UnionArray>(sparse_type, length, sparse_children, type_ids_buffer);
   auto sparse = std::make_shared<UnionArray>(
-      sparse_type, length, sparse_children, type_ids_buffer, nullptr, 1, null_bitmask);
+      sparse_type, length, sparse_children, type_ids_buffer, nullptr, null_bitmask, 1);
 
   auto dense = std::make_shared<UnionArray>(dense_type, length, dense_children,
-      type_ids_buffer, offsets_buffer, 1, null_bitmask);
+      type_ids_buffer, offsets_buffer, null_bitmask, 1);
 
   // construct batch
   std::vector<std::shared_ptr<Array>> arrays = {sparse_no_nulls, sparse, dense};

http://git-wip-us.apache.org/repos/asf/arrow/blob/5439b715/cpp/src/arrow/pretty_print-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/pretty_print-test.cc b/cpp/src/arrow/pretty_print-test.cc
index 4725d5d..aca650f 100644
--- a/cpp/src/arrow/pretty_print-test.cc
+++ b/cpp/src/arrow/pretty_print-test.cc
@@ -55,7 +55,7 @@ template <typename TYPE, typename C_TYPE>
 void CheckPrimitive(int indent, const std::vector<bool>& is_valid,
     const std::vector<C_TYPE>& values, const char* expected) {
   std::shared_ptr<Array> array;
-  ArrayFromVector<TYPE, C_TYPE>(std::make_shared<TYPE>(), is_valid, values, &array);
+  ArrayFromVector<TYPE, C_TYPE>(is_valid, values, &array);
   CheckArray(*array.get(), indent, expected);
 }
 
@@ -76,12 +76,12 @@ TEST_F(TestPrettyPrint, DictionaryType) {
 
   std::shared_ptr<Array> dict;
   std::vector<std::string> dict_values = {"foo", "bar", "baz"};
-  ArrayFromVector<StringType, std::string>(utf8(), dict_values, &dict);
+  ArrayFromVector<StringType, std::string>(dict_values, &dict);
   std::shared_ptr<DataType> dict_type = dictionary(int16(), dict);
 
   std::shared_ptr<Array> indices;
   std::vector<int16_t> indices_values = {1, 2, -1, 0, 2, 0};
-  ArrayFromVector<Int16Type, int16_t>(int16(), is_valid, indices_values, &indices);
+  ArrayFromVector<Int16Type, int16_t>(is_valid, indices_values, &indices);
   auto arr = std::make_shared<DictionaryArray>(dict_type, indices);
 
   static const char* expected = R"expected(

http://git-wip-us.apache.org/repos/asf/arrow/blob/5439b715/cpp/src/arrow/pretty_print.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/pretty_print.cc b/cpp/src/arrow/pretty_print.cc
index e30f4cc..23c0580 100644
--- a/cpp/src/arrow/pretty_print.cc
+++ b/cpp/src/arrow/pretty_print.cc
@@ -164,39 +164,56 @@ class ArrayPrinter : public ArrayVisitor {
   Status WriteValidityBitmap(const Array& array) {
     Newline();
     Write("-- is_valid: ");
-    BooleanArray is_valid(array.length(), array.null_bitmap());
-    return PrettyPrint(is_valid, indent_ + 2, sink_);
+
+    if (array.null_count() > 0) {
+      BooleanArray is_valid(
+          array.length(), array.null_bitmap(), nullptr, 0, array.offset());
+      return PrettyPrint(is_valid, indent_ + 2, sink_);
+    } else {
+      Write("all not null");
+      return Status::OK();
+    }
   }
 
   Status Visit(const ListArray& array) override {
     RETURN_NOT_OK(WriteValidityBitmap(array));
 
     Newline();
-    Write("-- offsets: ");
-    Int32Array offsets(array.length() + 1, array.offsets());
-    RETURN_NOT_OK(PrettyPrint(offsets, indent_ + 2, sink_));
+    Write("-- value_offsets: ");
+    Int32Array value_offsets(
+        array.length() + 1, array.value_offsets(), nullptr, 0, array.offset());
+    RETURN_NOT_OK(PrettyPrint(value_offsets, indent_ + 2, sink_));
 
     Newline();
     Write("-- values: ");
-    RETURN_NOT_OK(PrettyPrint(*array.values().get(), indent_ + 2, sink_));
+    auto values = array.values();
+    if (array.offset() != 0) {
+      values = values->Slice(array.value_offset(0), array.value_offset(array.length()));
+    }
+    RETURN_NOT_OK(PrettyPrint(*values, indent_ + 2, sink_));
 
     return Status::OK();
   }
 
-  Status PrintChildren(const std::vector<std::shared_ptr<Array>>& fields) {
+  Status PrintChildren(
+      const std::vector<std::shared_ptr<Array>>& fields, int32_t offset, int32_t length) {
     for (size_t i = 0; i < fields.size(); ++i) {
       Newline();
       std::stringstream ss;
       ss << "-- child " << i << " type: " << fields[i]->type()->ToString() << " values: ";
       Write(ss.str());
-      RETURN_NOT_OK(PrettyPrint(*fields[i].get(), indent_ + 2, sink_));
+
+      std::shared_ptr<Array> field = fields[i];
+      if (offset != 0) { field = field->Slice(offset, length); }
+
+      RETURN_NOT_OK(PrettyPrint(*field, indent_ + 2, sink_));
     }
     return Status::OK();
   }
 
   Status Visit(const StructArray& array) override {
     RETURN_NOT_OK(WriteValidityBitmap(array));
-    return PrintChildren(array.fields());
+    return PrintChildren(array.fields(), array.offset(), array.length());
   }
 
   Status Visit(const UnionArray& array) override {
@@ -204,17 +221,19 @@ class ArrayPrinter : public ArrayVisitor {
 
     Newline();
     Write("-- type_ids: ");
-    UInt8Array type_ids(array.length(), array.type_ids());
+    UInt8Array type_ids(array.length(), array.type_ids(), nullptr, 0, array.offset());
     RETURN_NOT_OK(PrettyPrint(type_ids, indent_ + 2, sink_));
 
     if (array.mode() == UnionMode::DENSE) {
       Newline();
-      Write("-- offsets: ");
-      Int32Array offsets(array.length(), array.offsets());
-      RETURN_NOT_OK(PrettyPrint(offsets, indent_ + 2, sink_));
+      Write("-- value_offsets: ");
+      Int32Array value_offsets(
+          array.length(), array.value_offsets(), nullptr, 0, array.offset());
+      RETURN_NOT_OK(PrettyPrint(value_offsets, indent_ + 2, sink_));
     }
 
-    return PrintChildren(array.children());
+    // Print the children without any offset, because the type ids are absolute
+    return PrintChildren(array.children(), 0, array.length() + array.offset());
   }
 
   Status Visit(const DictionaryArray& array) override {
@@ -222,11 +241,11 @@ class ArrayPrinter : public ArrayVisitor {
 
     Newline();
     Write("-- dictionary: ");
-    RETURN_NOT_OK(PrettyPrint(*array.dictionary().get(), indent_ + 2, sink_));
+    RETURN_NOT_OK(PrettyPrint(*array.dictionary(), indent_ + 2, sink_));
 
     Newline();
     Write("-- indices: ");
-    return PrettyPrint(*array.indices().get(), indent_ + 2, sink_);
+    return PrettyPrint(*array.indices(), indent_ + 2, sink_);
   }
 
   void Write(const char* data) { (*sink_) << data; }
@@ -260,7 +279,7 @@ Status PrettyPrint(const RecordBatch& batch, int indent, std::ostream* sink) {
   for (int i = 0; i < batch.num_columns(); ++i) {
     const std::string& name = batch.column_name(i);
     (*sink) << name << ": ";
-    RETURN_NOT_OK(PrettyPrint(*batch.column(i).get(), indent + 2, sink));
+    RETURN_NOT_OK(PrettyPrint(*batch.column(i), indent + 2, sink));
     (*sink) << "\n";
   }
   return Status::OK();

http://git-wip-us.apache.org/repos/asf/arrow/blob/5439b715/cpp/src/arrow/table-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/table-test.cc b/cpp/src/arrow/table-test.cc
index 67c9f67..e7c5d66 100644
--- a/cpp/src/arrow/table-test.cc
+++ b/cpp/src/arrow/table-test.cc
@@ -242,4 +242,30 @@ TEST_F(TestRecordBatch, Equals) {
   ASSERT_FALSE(b1.Equals(b4));
 }
 
+// Checks that RecordBatch::Slice slices every column: each column of the
+// returned batch must report the requested offset and the expected length.
+TEST_F(TestRecordBatch, Slice) {
+  const int total_rows = 10;
+
+  // Two-column schema: one int32 field and one uint8 field
+  vector<shared_ptr<Field>> fields = {
+      std::make_shared<Field>("f0", int32()), std::make_shared<Field>("f1", uint8())};
+  auto schema = std::make_shared<Schema>(fields);
+
+  auto int32_column = MakePrimitive<Int32Array>(total_rows);
+  auto uint8_column = MakePrimitive<UInt8Array>(total_rows);
+  RecordBatch batch(schema, total_rows, {int32_column, uint8_column});
+
+  // One slice running to the end of the batch, one with an explicit length
+  auto tail = batch.Slice(2);
+  auto window = batch.Slice(1, 5);
+
+  for (int col = 0; col < batch.num_columns(); ++col) {
+    const auto& tail_column = tail->column(col);
+    ASSERT_EQ(2, tail_column->offset());
+    ASSERT_EQ(total_rows - 2, tail_column->length());
+
+    const auto& window_column = window->column(col);
+    ASSERT_EQ(1, window_column->offset());
+    ASSERT_EQ(5, window_column->length());
+  }
+}
+
 }  // namespace arrow

http://git-wip-us.apache.org/repos/asf/arrow/blob/5439b715/cpp/src/arrow/table.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/table.cc b/cpp/src/arrow/table.cc
index b3563ea..9e31ba5 100644
--- a/cpp/src/arrow/table.cc
+++ b/cpp/src/arrow/table.cc
@@ -60,6 +60,19 @@ bool RecordBatch::ApproxEquals(const RecordBatch& other) const {
   return true;
 }
 
+// Slice from `offset` through the last row of this batch by delegating to the
+// two-argument overload with the number of rows that follow `offset`.
+std::shared_ptr<RecordBatch> RecordBatch::Slice(int32_t offset) {
+  const int32_t remaining_rows = num_rows() - offset;
+  return Slice(offset, remaining_rows);
+}
+
+// Build a new RecordBatch whose columns are the result of calling
+// Slice(offset, length) on each column of this batch. The schema is shared
+// with this batch.
+std::shared_ptr<RecordBatch> RecordBatch::Slice(int32_t offset, int32_t length) {
+  std::vector<std::shared_ptr<Array>> arrays;
+  arrays.reserve(num_columns());
+  for (const auto& field : columns_) {
+    arrays.emplace_back(field->Slice(offset, length));
+  }
+  // The new batch holds only the sliced rows, so it must not report the
+  // parent's num_rows_. Never report more rows than remain after `offset`.
+  // NOTE(review): assumes Array::Slice limits an over-long slice the same way
+  // -- confirm against its implementation.
+  const int32_t rows_after_offset = num_rows_ - offset;
+  const int32_t sliced_rows = length < rows_after_offset ? length : rows_after_offset;
+  return std::make_shared<RecordBatch>(schema_, sliced_rows, arrays);
+}
+
 // ----------------------------------------------------------------------
 // Table methods
 
@@ -93,8 +106,7 @@ Status Table::FromRecordBatches(const std::string& name,
     if (!batches[i]->schema()->Equals(schema)) {
       std::stringstream ss;
       ss << "Schema at index " << static_cast<int>(i) << " was different: \n"
-         << schema->ToString() << "\nvs\n"
-         << batches[i]->schema()->ToString();
+         << schema->ToString() << "\nvs\n" << batches[i]->schema()->ToString();
       return Status::Invalid(ss.str());
     }
   }
@@ -126,8 +138,7 @@ Status ConcatenateTables(const std::string& output_name,
     if (!tables[i]->schema()->Equals(schema)) {
       std::stringstream ss;
       ss << "Schema at index " << static_cast<int>(i) << " was different: \n"
-         << schema->ToString() << "\nvs\n"
-         << tables[i]->schema()->ToString();
+         << schema->ToString() << "\nvs\n" << tables[i]->schema()->ToString();
       return Status::Invalid(ss.str());
     }
   }

http://git-wip-us.apache.org/repos/asf/arrow/blob/5439b715/cpp/src/arrow/table.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/table.h b/cpp/src/arrow/table.h
index 583847c..fa56824 100644
--- a/cpp/src/arrow/table.h
+++ b/cpp/src/arrow/table.h
@@ -64,6 +64,10 @@ class ARROW_EXPORT RecordBatch {
   // @returns: the number of rows (the corresponding length of each column)
   int32_t num_rows() const { return num_rows_; }
 
+  /// Slice each of the arrays in the record batch and construct a new RecordBatch object
+  std::shared_ptr<RecordBatch> Slice(int32_t offset);
+  std::shared_ptr<RecordBatch> Slice(int32_t offset, int32_t length);
+
  private:
   std::shared_ptr<Schema> schema_;
   int32_t num_rows_;

http://git-wip-us.apache.org/repos/asf/arrow/blob/5439b715/cpp/src/arrow/test-util.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/test-util.h b/cpp/src/arrow/test-util.h
index 4e52580..ffc7806 100644
--- a/cpp/src/arrow/test-util.h
+++ b/cpp/src/arrow/test-util.h
@@ -61,14 +61,6 @@
     EXPECT_TRUE(s.ok());        \
   } while (0)
 
-// Alias MSVC popcount to GCC name
-#ifdef _MSC_VER
-#include <intrin.h>
-#define __builtin_popcount __popcnt
-#include <nmmintrin.h>
-#define __builtin_popcountll _mm_popcnt_u64
-#endif
-
 namespace arrow {
 
 namespace test {
@@ -175,29 +167,6 @@ void rand_uniform_int(int n, uint32_t seed, T min_value, T max_value, T* out) {
   }
 }
 
-static inline int bitmap_popcount(const uint8_t* data, int length) {
-  // book keeping
-  constexpr int pop_len = sizeof(uint64_t);
-  const uint64_t* i64_data = reinterpret_cast<const uint64_t*>(data);
-  const int fast_counts = length / pop_len;
-  const uint64_t* end = i64_data + fast_counts;
-
-  int count = 0;
-  // popcount as much as possible with the widest possible count
-  for (auto iter = i64_data; iter < end; ++iter) {
-    count += __builtin_popcountll(*iter);
-  }
-
-  // Account for left over bytes (in theory we could fall back to smaller
-  // versions of popcount but the code complexity is likely not worth it)
-  const int loop_tail_index = fast_counts * pop_len;
-  for (int i = loop_tail_index; i < length; ++i) {
-    if (BitUtil::GetBit(data, i)) { ++count; }
-  }
-
-  return count;
-}
-
 static inline int null_count(const std::vector<uint8_t>& valid_bytes) {
   int result = 0;
   for (size_t i = 0; i < valid_bytes.size(); ++i) {
@@ -254,7 +223,7 @@ class TestBase : public ::testing::Test {
 
     auto null_bitmap = std::make_shared<PoolBuffer>(pool_);
     EXPECT_OK(null_bitmap->Resize(BitUtil::BytesForBits(length)));
-    return std::make_shared<ArrayType>(length, data, null_count, null_bitmap);
+    return std::make_shared<ArrayType>(length, data, null_bitmap, null_count);
   }
 
  protected:
@@ -263,11 +232,10 @@ class TestBase : public ::testing::Test {
 };
 
 template <typename TYPE, typename C_TYPE>
-void ArrayFromVector(const std::shared_ptr<DataType>& type,
-    const std::vector<bool>& is_valid, const std::vector<C_TYPE>& values,
+void ArrayFromVector(const std::vector<bool>& is_valid, const std::vector<C_TYPE>& values,
     std::shared_ptr<Array>* out) {
   MemoryPool* pool = default_memory_pool();
-  typename TypeTraits<TYPE>::BuilderType builder(pool, std::make_shared<TYPE>());
+  typename TypeTraits<TYPE>::BuilderType builder(pool);
   for (size_t i = 0; i < values.size(); ++i) {
     if (is_valid[i]) {
       ASSERT_OK(builder.Append(values[i]));
@@ -279,10 +247,9 @@ void ArrayFromVector(const std::shared_ptr<DataType>& type,
 }
 
 template <typename TYPE, typename C_TYPE>
-void ArrayFromVector(const std::shared_ptr<DataType>& type,
-    const std::vector<C_TYPE>& values, std::shared_ptr<Array>* out) {
+void ArrayFromVector(const std::vector<C_TYPE>& values, std::shared_ptr<Array>* out) {
   MemoryPool* pool = default_memory_pool();
-  typename TypeTraits<TYPE>::BuilderType builder(pool, std::make_shared<TYPE>());
+  typename TypeTraits<TYPE>::BuilderType builder(pool);
   for (size_t i = 0; i < values.size(); ++i) {
     ASSERT_OK(builder.Append(values[i]));
   }

http://git-wip-us.apache.org/repos/asf/arrow/blob/5439b715/cpp/src/arrow/type.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/type.cc b/cpp/src/arrow/type.cc
index ba77584..a1c2b79 100644
--- a/cpp/src/arrow/type.cc
+++ b/cpp/src/arrow/type.cc
@@ -115,7 +115,7 @@ std::string UnionType::ToString() const {
 
   for (size_t i = 0; i < children_.size(); ++i) {
     if (i) { s << ", "; }
-    s << children_[i]->ToString() << "=" << static_cast<int>(type_ids[i]);
+    s << children_[i]->ToString() << "=" << static_cast<int>(type_codes[i]);
   }
   s << ">";
   return s.str();
@@ -224,8 +224,8 @@ std::shared_ptr<DataType> struct_(const std::vector<std::shared_ptr<Field>>& fie
 }
 
 std::shared_ptr<DataType> union_(const std::vector<std::shared_ptr<Field>>& child_fields,
-    const std::vector<uint8_t>& type_ids, UnionMode mode) {
-  return std::make_shared<UnionType>(child_fields, type_ids, mode);
+    const std::vector<uint8_t>& type_codes, UnionMode mode) {
+  return std::make_shared<UnionType>(child_fields, type_codes, mode);
 }
 
 std::shared_ptr<DataType> dictionary(const std::shared_ptr<DataType>& index_type,

http://git-wip-us.apache.org/repos/asf/arrow/blob/5439b715/cpp/src/arrow/type.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/type.h b/cpp/src/arrow/type.h
index 8638a3f..927b8a4 100644
--- a/cpp/src/arrow/type.h
+++ b/cpp/src/arrow/type.h
@@ -413,8 +413,8 @@ struct ARROW_EXPORT UnionType : public DataType {
   static constexpr Type::type type_id = Type::UNION;
 
   UnionType(const std::vector<std::shared_ptr<Field>>& fields,
-      const std::vector<uint8_t>& type_ids, UnionMode mode = UnionMode::SPARSE)
-      : DataType(Type::UNION), mode(mode), type_ids(type_ids) {
+      const std::vector<uint8_t>& type_codes, UnionMode mode = UnionMode::SPARSE)
+      : DataType(Type::UNION), mode(mode), type_codes(type_codes) {
     children_ = fields;
   }
 
@@ -429,7 +429,7 @@ struct ARROW_EXPORT UnionType : public DataType {
   // The type id used in the data to indicate each data type in the union. For
   // example, the first type in the union might be denoted by the id 5 (instead
   // of 0).
-  std::vector<uint8_t> type_ids;
+  std::vector<uint8_t> type_codes;
 };
 
 // ----------------------------------------------------------------------
@@ -551,7 +551,7 @@ std::shared_ptr<DataType> ARROW_EXPORT struct_(
 
 std::shared_ptr<DataType> ARROW_EXPORT union_(
     const std::vector<std::shared_ptr<Field>>& child_fields,
-    const std::vector<uint8_t>& type_ids, UnionMode mode = UnionMode::SPARSE);
+    const std::vector<uint8_t>& type_codes, UnionMode mode = UnionMode::SPARSE);
 
 std::shared_ptr<DataType> ARROW_EXPORT dictionary(
     const std::shared_ptr<DataType>& index_type, const std::shared_ptr<Array>& values);

http://git-wip-us.apache.org/repos/asf/arrow/blob/5439b715/cpp/src/arrow/type_traits.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/type_traits.h b/cpp/src/arrow/type_traits.h
index 5cd5f45..c4898b1 100644
--- a/cpp/src/arrow/type_traits.h
+++ b/cpp/src/arrow/type_traits.h
@@ -126,6 +126,15 @@ struct TypeTraits<TimestampType> {
 };
 
 template <>
+struct TypeTraits<TimeType> {
+  using ArrayType = TimeArray;
+  // using BuilderType = TimestampBuilder;
+
+  static inline int bytes_required(int elements) { return elements * sizeof(int64_t); }
+  constexpr static bool is_parameter_free = false;
+};
+
+template <>
 struct TypeTraits<HalfFloatType> {
   using ArrayType = HalfFloatArray;
   using BuilderType = HalfFloatBuilder;

http://git-wip-us.apache.org/repos/asf/arrow/blob/5439b715/cpp/src/arrow/util/bit-util-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/util/bit-util-test.cc b/cpp/src/arrow/util/bit-util-test.cc
index cfdee04..cb2fd1a 100644
--- a/cpp/src/arrow/util/bit-util-test.cc
+++ b/cpp/src/arrow/util/bit-util-test.cc
@@ -17,11 +17,17 @@
 
 #include "arrow/util/bit-util.h"
 
+#include <cstdint>
+#include <vector>
+
 #include "gtest/gtest.h"
 
+#include "arrow/buffer.h"
+#include "arrow/test-util.h"
+
 namespace arrow {
 
-TEST(UtilTests, TestIsMultipleOf64) {
+TEST(BitUtilTests, TestIsMultipleOf64) {
   using BitUtil::IsMultipleOf64;
   EXPECT_TRUE(IsMultipleOf64(64));
   EXPECT_TRUE(IsMultipleOf64(0));
@@ -31,7 +37,7 @@ TEST(UtilTests, TestIsMultipleOf64) {
   EXPECT_FALSE(IsMultipleOf64(32));
 }
 
-TEST(UtilTests, TestNextPower2) {
+TEST(BitUtilTests, TestNextPower2) {
   using BitUtil::NextPower2;
 
   ASSERT_EQ(8, NextPower2(6));
@@ -51,4 +57,56 @@ TEST(UtilTests, TestNextPower2) {
   ASSERT_EQ(1LL << 62, NextPower2((1LL << 62) - 1));
 }
 
+static inline int64_t SlowCountBits(
+    const uint8_t* data, int64_t bit_offset, int64_t length) {
+  int64_t count = 0;
+  for (int64_t i = bit_offset; i < bit_offset + length; ++i) {
+    if (BitUtil::GetBit(data, i)) { ++count; }
+  }
+  return count;
+}
+
+TEST(BitUtilTests, TestCountSetBits) {
+  const int kBufferSize = 1000;
+  uint8_t buffer[kBufferSize] = {0};
+
+  test::random_bytes(kBufferSize, 0, buffer);
+
+  const int num_bits = kBufferSize * 8;
+
+  std::vector<int64_t> offsets = {
+      0, 12, 16, 32, 37, 63, 64, 128, num_bits - 30, num_bits - 64};
+  for (int64_t offset : offsets) {
+    int64_t result = CountSetBits(buffer, offset, num_bits - offset);
+    int64_t expected = SlowCountBits(buffer, offset, num_bits - offset);
+
+    ASSERT_EQ(expected, result);
+  }
+}
+
+TEST(BitUtilTests, TestCopyBitmap) {
+  const int kBufferSize = 1000;
+
+  std::shared_ptr<MutableBuffer> buffer;
+  ASSERT_OK(AllocateBuffer(default_memory_pool(), kBufferSize, &buffer));
+  memset(buffer->mutable_data(), 0, kBufferSize);
+  test::random_bytes(kBufferSize, 0, buffer->mutable_data());
+
+  const int num_bits = kBufferSize * 8;
+
+  const uint8_t* src = buffer->data();
+
+  std::vector<int64_t> offsets = {0, 12, 16, 32, 37, 63, 64, 128};
+  for (int64_t offset : offsets) {
+    const int64_t copy_length = num_bits - offset;
+
+    std::shared_ptr<Buffer> copy;
+    ASSERT_OK(CopyBitmap(default_memory_pool(), src, offset, copy_length, &copy));
+
+    for (int64_t i = 0; i < copy_length; ++i) {
+      ASSERT_EQ(BitUtil::GetBit(src, i + offset), BitUtil::GetBit(copy->data(), i));
+    }
+  }
+}
+
 }  // namespace arrow

http://git-wip-us.apache.org/repos/asf/arrow/blob/5439b715/cpp/src/arrow/util/bit-util.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/util/bit-util.cc b/cpp/src/arrow/util/bit-util.cc
index 9c82407..f3fbb41 100644
--- a/cpp/src/arrow/util/bit-util.cc
+++ b/cpp/src/arrow/util/bit-util.cc
@@ -15,10 +15,20 @@
 // specific language governing permissions and limitations
 // under the License.
 
+// Alias MSVC popcount to GCC name
+#ifdef _MSC_VER
+#include <intrin.h>
+#define __builtin_popcount __popcnt
+#include <nmmintrin.h>
+#define __builtin_popcountll _mm_popcnt_u64
+#endif
+
+#include <algorithm>
 #include <cstring>
 #include <vector>
 
 #include "arrow/buffer.h"
+#include "arrow/memory_pool.h"
 #include "arrow/status.h"
 #include "arrow/util/bit-util.h"
 
@@ -34,8 +44,9 @@ Status BitUtil::BytesToBits(
     const std::vector<uint8_t>& bytes, std::shared_ptr<Buffer>* out) {
   int bit_length = BitUtil::BytesForBits(bytes.size());
 
-  auto buffer = std::make_shared<PoolBuffer>();
-  RETURN_NOT_OK(buffer->Resize(bit_length));
+  std::shared_ptr<MutableBuffer> buffer;
+  RETURN_NOT_OK(AllocateBuffer(default_memory_pool(), bit_length, &buffer));
+
   memset(buffer->mutable_data(), 0, bit_length);
   BytesToBits(bytes, buffer->mutable_data());
 
@@ -43,4 +54,72 @@ Status BitUtil::BytesToBits(
   return Status::OK();
 }
 
+int64_t CountSetBits(const uint8_t* data, int64_t bit_offset, int64_t length) {
+  constexpr int64_t pop_len = sizeof(uint64_t) * 8;  // bits per 64-bit popcount
+
+  int64_t count = 0;
+
+  // First bit offset >= bit_offset that is a multiple of 64 (word-wise popcount start)
+  const int64_t fast_count_start = BitUtil::RoundUp(bit_offset, pop_len);
+
+  // Bits before fast_count_start are counted one by one (never more than length)
+  const int64_t initial_bits = std::min(length, fast_count_start - bit_offset);
+  for (int64_t i = bit_offset; i < bit_offset + initial_bits; ++i) {
+    if (BitUtil::GetBit(data, i)) { ++count; }
+  }
+
+  const int64_t fast_counts = (length - initial_bits) / pop_len;  // whole 64-bit words
+  // Word holding fast_count_start. NOTE(review): reading through uint64_t* presumes
+  // data itself is 8-byte aligned -- confirm for offset buffers
+  const uint64_t* u64_data =
+      reinterpret_cast<const uint64_t*>(data) + fast_count_start / pop_len;
+
+  const uint64_t* end = u64_data + fast_counts;
+
+  // Popcount the whole words with the widest available popcount
+  for (auto iter = u64_data; iter < end; ++iter) {
+    count += __builtin_popcountll(*iter);
+  }
+
+  // Account for leftover bits (in theory we could fall back to smaller
+  // versions of popcount but the code complexity is likely not worth it)
+  const int64_t tail_index = bit_offset + initial_bits + fast_counts * pop_len;
+  for (int64_t i = tail_index; i < bit_offset + length; ++i) {
+    if (BitUtil::GetBit(data, i)) { ++count; }
+  }
+
+  return count;
+}
+
+Status GetEmptyBitmap(
+    MemoryPool* pool, int64_t length, std::shared_ptr<MutableBuffer>* result) {
+  RETURN_NOT_OK(AllocateBuffer(pool, BitUtil::BytesForBits(length), result));
+  memset((*result)->mutable_data(), 0, (*result)->size());  // every bit starts cleared
+  return Status::OK();
+}
+
+Status CopyBitmap(MemoryPool* pool, const uint8_t* data, int32_t offset, int32_t length,
+    std::shared_ptr<Buffer>* out) {
+  std::shared_ptr<MutableBuffer> bitmap;
+  RETURN_NOT_OK(GetEmptyBitmap(pool, length, &bitmap));  // zero-filled destination
+  uint8_t* copied_bits = bitmap->mutable_data();
+  for (int64_t dst = 0, src = offset; dst < length; ++dst, ++src) {
+    BitUtil::SetBitTo(copied_bits, dst, BitUtil::GetBit(data, src));
+  }
+  *out = bitmap;
+  return Status::OK();
+}
+
+bool BitmapEquals(const uint8_t* left, int64_t left_offset, const uint8_t* right,
+    int64_t right_offset, int64_t bit_length) {
+  // TODO(wesm): Make this faster using word-wise comparisons
+  // Bit-by-bit comparison of the two ranges; stops at the first mismatch
+  for (int64_t pos = 0; pos < bit_length; ++pos) {
+    const auto lhs = BitUtil::GetBit(left, left_offset + pos);
+    const auto rhs = BitUtil::GetBit(right, right_offset + pos);
+    if (lhs != rhs) { return false; }
+  }
+  return true;
+}
+
 }  // namespace arrow

http://git-wip-us.apache.org/repos/asf/arrow/blob/5439b715/cpp/src/arrow/util/bit-util.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/util/bit-util.h b/cpp/src/arrow/util/bit-util.h
index 5c8055f..a0fbdd2 100644
--- a/cpp/src/arrow/util/bit-util.h
+++ b/cpp/src/arrow/util/bit-util.h
@@ -28,6 +28,8 @@
 namespace arrow {
 
 class Buffer;
+class MemoryPool;
+class MutableBuffer;
 class Status;
 
 namespace BitUtil {
@@ -62,6 +64,12 @@ static inline void SetBit(uint8_t* bits, int i) {
   bits[i / 8] |= kBitmask[i % 8];
 }
 
+static inline void SetBitTo(uint8_t* bits, int i, bool bit_is_set) {
+  // Branch-free set-or-clear ("Conditionally set or clear bits without branching",
+  // https://graphics.stanford.edu/~seander/bithacks.html)
+  bits[i / 8] ^= (-static_cast<int>(bit_is_set) ^ bits[i / 8]) & kBitmask[i % 8];
+}
+
 static inline int64_t NextPower2(int64_t n) {
   n--;
   n |= n >> 1;
@@ -82,6 +90,11 @@ static inline bool IsMultipleOf8(int64_t n) {
   return (n & 7) == 0;
 }
 
+/// Rounds 'value' up to a multiple of 'factor' (assumes value >= 0 and factor > 0)
+inline int64_t RoundUp(int64_t value, int64_t factor) {
+  return ((value + (factor - 1)) / factor) * factor;
+}
+
 inline int64_t RoundUpToMultipleOf64(int64_t num) {
   // TODO(wesm): is this definitely needed?
   // DCHECK_GE(num, 0);
@@ -98,6 +111,38 @@ void BytesToBits(const std::vector<uint8_t>& bytes, uint8_t* bits);
 ARROW_EXPORT Status BytesToBits(const std::vector<uint8_t>&, std::shared_ptr<Buffer>*);
 
 }  // namespace BitUtil
+
+// ----------------------------------------------------------------------
+// Bitmap utilities
+
+Status ARROW_EXPORT GetEmptyBitmap(
+    MemoryPool* pool, int64_t length, std::shared_ptr<MutableBuffer>* result);
+
+/// Copy a bit range of an existing bitmap
+///
+/// \param[in] pool memory pool to allocate memory from
+/// \param[in] bitmap source data
+/// \param[in] offset bit offset into the source data
+/// \param[in] length number of bits to copy
+/// \param[out] out the resulting copy
+///
+/// \return Status message
+Status ARROW_EXPORT CopyBitmap(MemoryPool* pool, const uint8_t* bitmap, int32_t offset,
+    int32_t length, std::shared_ptr<Buffer>* out);
+
+/// Compute the number of 1's in the given data array
+///
+/// \param[in] data a packed LSB-ordered bitmap as a byte array
+/// \param[in] bit_offset a bitwise offset into the bitmap
+/// \param[in] length the number of bits to inspect in the bitmap relative to the offset
+///
+/// \return The number of set (1) bits in the range
+int64_t ARROW_EXPORT CountSetBits(
+    const uint8_t* data, int64_t bit_offset, int64_t length);
+
+bool ARROW_EXPORT BitmapEquals(const uint8_t* left, int64_t left_offset,
+    const uint8_t* right, int64_t right_offset, int64_t bit_length);
+
 }  // namespace arrow
 
 #endif  // ARROW_UTIL_BIT_UTIL_H

http://git-wip-us.apache.org/repos/asf/arrow/blob/5439b715/cpp/src/arrow/util/logging.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/util/logging.h b/cpp/src/arrow/util/logging.h
index b22f07d..06ee841 100644
--- a/cpp/src/arrow/util/logging.h
+++ b/cpp/src/arrow/util/logging.h
@@ -118,9 +118,9 @@ class CerrLog {
 class FatalLog : public CerrLog {
  public:
   explicit FatalLog(int /* severity */)  // NOLINT
-      : CerrLog(ARROW_FATAL){}           // NOLINT
+      : CerrLog(ARROW_FATAL) {}          // NOLINT
 
-            [[noreturn]] ~FatalLog() {
+  [[noreturn]] ~FatalLog() {
     if (has_logged_) { std::cerr << std::endl; }
     std::exit(1);
   }

http://git-wip-us.apache.org/repos/asf/arrow/blob/5439b715/cpp/src/arrow/util/macros.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/util/macros.h b/cpp/src/arrow/util/macros.h
index c4a62a4..81a9b0c 100644
--- a/cpp/src/arrow/util/macros.h
+++ b/cpp/src/arrow/util/macros.h
@@ -25,6 +25,6 @@
   TypeName& operator=(const TypeName&) = delete
 #endif
 
-#define UNUSED(x) (void)x
+#define UNUSED(x) (void) x
 
 #endif  // ARROW_UTIL_MACROS_H

http://git-wip-us.apache.org/repos/asf/arrow/blob/5439b715/python/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/python/CMakeLists.txt b/python/CMakeLists.txt
index 842a219..ba26692 100644
--- a/python/CMakeLists.txt
+++ b/python/CMakeLists.txt
@@ -95,7 +95,7 @@ if ("${COMPILER_FAMILY}" STREQUAL "clang")
   set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Qunused-arguments")
 
   # Cython warnings in clang
-  set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-parentheses-equality")
+  set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-parentheses-equality -Wno-constant-logical-operand")
 endif()
 
 set(PYARROW_LINK "a")

http://git-wip-us.apache.org/repos/asf/arrow/blob/5439b715/python/pyarrow/includes/libarrow.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd
index 38883e8..ebfdc41 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -179,8 +179,8 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil:
         double Value(int i)
 
     cdef cppclass CListArray" arrow::ListArray"(CArray):
-        const int32_t* offsets()
-        int32_t offset(int i)
+        const int32_t* raw_value_offsets()
+        int32_t value_offset(int i)
         int32_t value_length(int i)
         shared_ptr[CArray] values()
         shared_ptr[CDataType] value_type()

http://git-wip-us.apache.org/repos/asf/arrow/blob/5439b715/python/pyarrow/scalar.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/scalar.pyx b/python/pyarrow/scalar.pyx
index 30b9040..9d2b2b1 100644
--- a/python/pyarrow/scalar.pyx
+++ b/python/pyarrow/scalar.pyx
@@ -202,7 +202,7 @@ cdef class ListValue(ArrayValue):
         self.value_type = box_data_type(self.ap.value_type())
 
     cdef getitem(self, int i):
-        cdef int j = self.ap.offset(self.index) + i
+        cdef int j = self.ap.value_offset(self.index) + i
         return box_arrow_scalar(self.value_type, self.ap.values(), j)
 
     def as_py(self):

http://git-wip-us.apache.org/repos/asf/arrow/blob/5439b715/python/src/pyarrow/adapters/builtin.cc
----------------------------------------------------------------------
diff --git a/python/src/pyarrow/adapters/builtin.cc b/python/src/pyarrow/adapters/builtin.cc
index 1abfb40..5fd8eef 100644
--- a/python/src/pyarrow/adapters/builtin.cc
+++ b/python/src/pyarrow/adapters/builtin.cc
@@ -505,7 +505,7 @@ Status ConvertPySequence(
 
   // Handle NA / NullType case
   if (type->type == Type::NA) {
-    out->reset(new arrow::NullArray(type, size));
+    out->reset(new arrow::NullArray(size));
     return Status::OK();
   }