You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2018/12/14 20:49:20 UTC

[arrow] branch master updated: ARROW-3762: [C++/Python] Support reading Parquet BYTE_ARRAY columns containing over 2GB of data

This is an automated email from the ASF dual-hosted git repository.

wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 73f94c9  ARROW-3762: [C++/Python] Support reading Parquet BYTE_ARRAY columns containing over 2GB of data
73f94c9 is described below

commit 73f94c93d7eee25a43415dfa7a806b887942abd1
Author: Wes McKinney <we...@apache.org>
AuthorDate: Fri Dec 14 14:49:12 2018 -0600

    ARROW-3762: [C++/Python] Support reading Parquet BYTE_ARRAY columns containing over 2GB of data
    
    This patch ended up being a bit more of a bloodbath than I planned: please accept my apologies.
    
    Associated changes in this patch:
    
    * Split up builder.h/builder.cc into a new arrow/array directory. Public arrow/builder.h API preserved. I think this code is going to keep growing more specialized components, so I think we should get out ahead of it by having a subdirectory to contain files related to implementation details
    * Implement ChunkedBinaryBuilder, ChunkedStringBuilder classes, add tests and benchmarks
    * Deprecate parquet::arrow methods returning Array
    * Allow implicit construction of Datum from its variant types (makes for a lot nicer syntax)
    
    As far as what code to review, focus efforts on
    
    * src/parquet/arrow
    * src/arrow/array/builder_binary.h/cc, array-binary-test.cc, builder-benchmark
    * src/arrow/compute changes
    * Python changes
    
    I'm going to tackle ARROW-2970 which should not be complicated after this patch; I will submit that as a PR after this is reviews and merged.
    
    Author: Wes McKinney <we...@apache.org>
    
    Closes #3171 from wesm/ARROW-3762 and squashes the following commits:
    
    822451280 <Wes McKinney> Fix int conversion warning on Windows
    695ffc9df <Wes McKinney> Remove unimplemented and unused ChunkedBinaryBuilder ctor
    5a525115c <Wes McKinney> Use strnlen to compute string length. Inline BinaryBuilder::AppendNextOffset
    b90eb4b71 <Wes McKinney> Restore sstream include to pretty_print.cc
    3669201be <Wes McKinney> Fix deprecated API use
    5fdbbb261 <Wes McKinney> Rename columnar/ directory to array/
    8ffaec1ef <Wes McKinney> Address preliminary code comments. Check in missing files
    81e787c69 <Wes McKinney> Fix up Python bindings, unit test
    2efae064c <Wes McKinney> Finish scaffolding. Get fully compiling again and original parquet-arrow test suite passing
    3d075e4aa <Wes McKinney> Additional refactoring to make things chunked. Allow implicit construction of arrow::compute::Datum
    922811278 <Wes McKinney> More refactoring
    716322377 <Wes McKinney> Split up builder.h, builder.cc into smaller headers, compilation units. add failing test case for ARROW-3762. Add ChunkedBinaryBuilder, make BinaryBuilder Append methods inline
---
 cpp/cmake_modules/SetupCxxFlags.cmake              |    3 +
 cpp/examples/parquet/CMakeLists.txt                |    2 +-
 cpp/examples/parquet/parquet-arrow/CMakeLists.txt  |    2 +-
 .../parquet-arrow/{src => }/reader-writer.cc       |    4 +-
 cpp/src/arrow/CMakeLists.txt                       |   15 +-
 cpp/src/arrow/allocator-test.cc                    |    1 +
 cpp/src/arrow/array-binary-test.cc                 |  114 +-
 cpp/src/arrow/array-dict-test.cc                   |    8 +-
 cpp/src/arrow/array-list-test.cc                   |    4 +-
 cpp/src/arrow/array-struct-test.cc                 |    4 +-
 cpp/src/arrow/array-test.cc                        |    2 -
 cpp/src/arrow/array.cc                             |    1 +
 cpp/src/arrow/array.h                              |    1 -
 .../{parquet/arrow => arrow/array}/CMakeLists.txt  |   18 +-
 cpp/src/arrow/array/README.md                      |   20 +
 .../builder_adaptive.cc}                           |    4 +-
 cpp/src/arrow/array/builder_adaptive.h             |  174 +++
 cpp/src/arrow/array/builder_base.cc                |  176 +++
 cpp/src/arrow/array/builder_base.h                 |  227 ++++
 .../{builder-binary.cc => array/builder_binary.cc} |   78 +-
 cpp/src/arrow/array/builder_binary.h               |  304 +++++
 cpp/src/arrow/array/builder_decimal.cc             |   64 ++
 .../string_view.h => array/builder_decimal.h}      |   30 +-
 .../{builder-dict.cc => array/builder_dict.cc}     |    4 +-
 cpp/src/arrow/array/builder_dict.h                 |  167 +++
 cpp/src/arrow/array/builder_nested.cc              |  156 +++
 cpp/src/arrow/array/builder_nested.h               |  121 ++
 cpp/src/arrow/array/builder_primitive.cc           |  272 +++++
 cpp/src/arrow/array/builder_primitive.h            |  401 +++++++
 cpp/src/arrow/builder-benchmark.cc                 |   30 +-
 cpp/src/arrow/builder.cc                           |  503 +--------
 cpp/src/arrow/builder.h                            | 1177 +-------------------
 cpp/src/arrow/compute/compute-test.cc              |   61 +-
 cpp/src/arrow/compute/kernel.h                     |   35 +-
 cpp/src/arrow/csv/column-builder.h                 |   21 +-
 cpp/src/arrow/csv/converter.cc                     |    1 +
 cpp/src/arrow/csv/parser.h                         |    1 +
 cpp/src/arrow/csv/reader.cc                        |    2 +
 cpp/src/arrow/io/buffered.cc                       |    2 +-
 cpp/src/arrow/io/buffered.h                        |    1 +
 cpp/src/arrow/ipc/feather-test.cc                  |    1 +
 cpp/src/arrow/ipc/json-simple-test.cc              |    1 +
 cpp/src/arrow/memory_pool-test.h                   |    1 +
 cpp/src/arrow/memory_pool.cc                       |   12 +-
 cpp/src/arrow/pretty_print-test.cc                 |    4 +-
 cpp/src/arrow/pretty_print.cc                      |    2 +-
 cpp/src/arrow/pretty_print.h                       |    5 +-
 cpp/src/arrow/python/numpy_to_arrow.cc             |   27 +-
 cpp/src/arrow/python/python-test.cc                |    1 +
 cpp/src/arrow/record_batch.h                       |    1 +
 cpp/src/arrow/table.h                              |    5 +
 cpp/src/arrow/tensor.cc                            |    1 +
 cpp/src/arrow/test-util.cc                         |   13 +-
 cpp/src/arrow/test-util.h                          |   18 +-
 cpp/src/arrow/util/compression_lz4.cc              |    1 +
 cpp/src/arrow/util/int-util-test.cc                |    2 -
 cpp/src/arrow/util/string_view.h                   |    2 +-
 cpp/src/parquet/arrow/CMakeLists.txt               |    5 +-
 cpp/src/parquet/arrow/arrow-reader-writer-test.cc  |   15 +-
 cpp/src/parquet/arrow/reader.cc                    |  567 ++++++----
 cpp/src/parquet/arrow/reader.h                     |   38 +-
 cpp/src/parquet/arrow/record_reader.cc             |  103 +-
 cpp/src/parquet/arrow/record_reader.h              |    7 +-
 python/pyarrow/_parquet.pxd                        |    6 +-
 python/pyarrow/_parquet.pyx                        |   23 +-
 python/pyarrow/lib.pxd                             |    2 +
 python/pyarrow/tests/test_parquet.py               |   27 +
 67 files changed, 2945 insertions(+), 2156 deletions(-)

diff --git a/cpp/cmake_modules/SetupCxxFlags.cmake b/cpp/cmake_modules/SetupCxxFlags.cmake
index 893ec36..61fd14c 100644
--- a/cpp/cmake_modules/SetupCxxFlags.cmake
+++ b/cpp/cmake_modules/SetupCxxFlags.cmake
@@ -25,6 +25,9 @@ CHECK_CXX_COMPILER_FLAG("-maltivec" CXX_SUPPORTS_ALTIVEC)
 # Arm64 compiler flags
 CHECK_CXX_COMPILER_FLAG("-march=armv8-a+crc" CXX_SUPPORTS_ARMCRC)
 
+# Support C11
+set(CMAKE_C_STANDARD 11)
+
 # This ensures that things like gnu++11 get passed correctly
 set(CMAKE_CXX_STANDARD 11)
 
diff --git a/cpp/examples/parquet/CMakeLists.txt b/cpp/examples/parquet/CMakeLists.txt
index 98c5cd9..db172a2 100644
--- a/cpp/examples/parquet/CMakeLists.txt
+++ b/cpp/examples/parquet/CMakeLists.txt
@@ -22,7 +22,7 @@ target_include_directories(parquet-low-level-example2 PRIVATE low-level-api/)
 target_link_libraries(parquet-low-level-example parquet_static)
 target_link_libraries(parquet-low-level-example2 parquet_static)
 
-add_executable(parquet-arrow-example parquet-arrow/src/reader-writer.cc)
+add_executable(parquet-arrow-example parquet-arrow/reader-writer.cc)
 target_link_libraries(parquet-arrow-example parquet_shared)
 
 add_dependencies(parquet
diff --git a/cpp/examples/parquet/parquet-arrow/CMakeLists.txt b/cpp/examples/parquet/parquet-arrow/CMakeLists.txt
index d9e01ac..915930e 100644
--- a/cpp/examples/parquet/parquet-arrow/CMakeLists.txt
+++ b/cpp/examples/parquet/parquet-arrow/CMakeLists.txt
@@ -38,5 +38,5 @@ find_package(Parquet)
 
 include_directories(SYSTEM ${ARROW_INCLUDE_DIR} ${PARQUET_INCLUDE_DIR})
 
-add_executable(parquet-arrow-example src/reader-writer.cc)
+add_executable(parquet-arrow-example reader-writer.cc)
 target_link_libraries(parquet-arrow-example ${PARQUET_SHARED_LIB} ${ARROW_SHARED_LIB})
diff --git a/cpp/examples/parquet/parquet-arrow/src/reader-writer.cc b/cpp/examples/parquet/parquet-arrow/reader-writer.cc
similarity index 98%
rename from cpp/examples/parquet/parquet-arrow/src/reader-writer.cc
rename to cpp/examples/parquet/parquet-arrow/reader-writer.cc
index 8d47448..a5f928b 100644
--- a/cpp/examples/parquet/parquet-arrow/src/reader-writer.cc
+++ b/cpp/examples/parquet/parquet-arrow/reader-writer.cc
@@ -100,7 +100,7 @@ void read_single_column() {
   std::unique_ptr<parquet::arrow::FileReader> reader;
   PARQUET_THROW_NOT_OK(
       parquet::arrow::OpenFile(infile, arrow::default_memory_pool(), &reader));
-  std::shared_ptr<arrow::Array> array;
+  std::shared_ptr<arrow::ChunkedArray> array;
   PARQUET_THROW_NOT_OK(reader->ReadColumn(0, &array));
   PARQUET_THROW_NOT_OK(arrow::PrettyPrint(*array, 4, &std::cout));
   std::cout << std::endl;
@@ -119,7 +119,7 @@ void read_single_column_chunk() {
   std::unique_ptr<parquet::arrow::FileReader> reader;
   PARQUET_THROW_NOT_OK(
       parquet::arrow::OpenFile(infile, arrow::default_memory_pool(), &reader));
-  std::shared_ptr<arrow::Array> array;
+  std::shared_ptr<arrow::ChunkedArray> array;
   PARQUET_THROW_NOT_OK(reader->RowGroup(0)->Column(0)->Read(&array));
   PARQUET_THROW_NOT_OK(arrow::PrettyPrint(*array, 4, &std::cout));
   std::cout << std::endl;
diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt
index e12d2d2..b13c9b6 100644
--- a/cpp/src/arrow/CMakeLists.txt
+++ b/cpp/src/arrow/CMakeLists.txt
@@ -53,11 +53,17 @@ endfunction()
 
 set(ARROW_SRCS
   array.cc
-  buffer.cc
+
   builder.cc
-  builder-adaptive.cc
-  builder-binary.cc
-  builder-dict.cc
+  array/builder_adaptive.cc
+  array/builder_base.cc
+  array/builder_binary.cc
+  array/builder_decimal.cc
+  array/builder_dict.cc
+  array/builder_nested.cc
+  array/builder_primitive.cc
+
+  buffer.cc
   compare.cc
   memory_pool.cc
   pretty_print.cc
@@ -275,6 +281,7 @@ ADD_ARROW_TEST(tensor-test)
 ADD_ARROW_BENCHMARK(builder-benchmark)
 ADD_ARROW_BENCHMARK(column-benchmark)
 
+add_subdirectory(array)
 add_subdirectory(csv)
 add_subdirectory(io)
 add_subdirectory(util)
diff --git a/cpp/src/arrow/allocator-test.cc b/cpp/src/arrow/allocator-test.cc
index cdffbd7..1a94467 100644
--- a/cpp/src/arrow/allocator-test.cc
+++ b/cpp/src/arrow/allocator-test.cc
@@ -17,6 +17,7 @@
 
 #include <cstdint>
 #include <limits>
+#include <memory>
 #include <new>
 
 #include <gtest/gtest.h>
diff --git a/cpp/src/arrow/array-binary-test.cc b/cpp/src/arrow/array-binary-test.cc
index 4376695..6f938c8 100644
--- a/cpp/src/arrow/array-binary-test.cc
+++ b/cpp/src/arrow/array-binary-test.cc
@@ -15,10 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include <algorithm>
 #include <cstdint>
 #include <cstring>
-#include <limits>
 #include <memory>
 #include <string>
 #include <vector>
@@ -28,10 +26,14 @@
 #include "arrow/array.h"
 #include "arrow/buffer.h"
 #include "arrow/builder.h"
+#include "arrow/memory_pool.h"
 #include "arrow/status.h"
 #include "arrow/test-common.h"
 #include "arrow/test-util.h"
 #include "arrow/type.h"
+#include "arrow/type_traits.h"
+#include "arrow/util/bit-util.h"
+#include "arrow/util/checked_cast.h"
 
 namespace arrow {
 
@@ -676,4 +678,112 @@ TEST_F(TestStringArray, TestSliceEquality) { CheckSliceEquality<BinaryType>(); }
 
 TEST_F(TestBinaryArray, LengthZeroCtor) { BinaryArray array(0, nullptr, nullptr); }
 
+// ----------------------------------------------------------------------
+// ChunkedBinaryBuilder tests
+
+class TestChunkedBinaryBuilder : public ::testing::Test {
+ public:
+  void SetUp() {}
+
+  void Init(int32_t chunksize) {
+    builder_.reset(new internal::ChunkedBinaryBuilder(chunksize));
+  }
+
+ protected:
+  std::unique_ptr<internal::ChunkedBinaryBuilder> builder_;
+};
+
+TEST_F(TestChunkedBinaryBuilder, BasicOperation) {
+  const int32_t chunksize = 1000;
+  Init(chunksize);
+
+  const int elem_size = 10;
+  uint8_t buf[elem_size];
+
+  BinaryBuilder unchunked_builder;
+
+  const int iterations = 1000;
+  for (int i = 0; i < iterations; ++i) {
+    random_bytes(elem_size, i, buf);
+
+    ASSERT_OK(unchunked_builder.Append(buf, elem_size));
+    ASSERT_OK(builder_->Append(buf, elem_size));
+  }
+
+  std::shared_ptr<Array> unchunked;
+  ASSERT_OK(unchunked_builder.Finish(&unchunked));
+
+  ArrayVector chunks;
+  ASSERT_OK(builder_->Finish(&chunks));
+
+  // This assumes that everything is evenly divisible
+  ArrayVector expected_chunks;
+  const int elems_per_chunk = chunksize / elem_size;
+  for (int i = 0; i < iterations / elems_per_chunk; ++i) {
+    expected_chunks.emplace_back(unchunked->Slice(i * elems_per_chunk, elems_per_chunk));
+  }
+
+  ASSERT_EQ(expected_chunks.size(), chunks.size());
+  for (size_t i = 0; i < chunks.size(); ++i) {
+    AssertArraysEqual(*expected_chunks[i], *chunks[i]);
+  }
+}
+
+TEST_F(TestChunkedBinaryBuilder, NoData) {
+  Init(1000);
+
+  ArrayVector chunks;
+  ASSERT_OK(builder_->Finish(&chunks));
+
+  ASSERT_EQ(1, chunks.size());
+  ASSERT_EQ(0, chunks[0]->length());
+}
+
+TEST_F(TestChunkedBinaryBuilder, LargeElements) {
+  Init(100);
+
+  const int bufsize = 101;
+  uint8_t buf[bufsize];
+
+  const int iterations = 100;
+  for (int i = 0; i < iterations; ++i) {
+    random_bytes(bufsize, i, buf);
+    ASSERT_OK(builder_->Append(buf, bufsize));
+  }
+
+  ArrayVector chunks;
+  ASSERT_OK(builder_->Finish(&chunks));
+  ASSERT_EQ(iterations, static_cast<int>(chunks.size()));
+
+  int64_t total_data_size = 0;
+  for (auto chunk : chunks) {
+    ASSERT_EQ(1, chunk->length());
+    total_data_size +=
+        static_cast<int64_t>(static_cast<const BinaryArray&>(*chunk).GetView(0).size());
+  }
+  ASSERT_EQ(iterations * bufsize, total_data_size);
+}
+
+TEST(TestChunkedStringBuilder, BasicOperation) {
+  const int chunksize = 100;
+  internal::ChunkedStringBuilder builder(chunksize);
+
+  std::string value = "0123456789";
+
+  const int iterations = 100;
+  for (int i = 0; i < iterations; ++i) {
+    ASSERT_OK(builder.Append(value));
+  }
+
+  ArrayVector chunks;
+  ASSERT_OK(builder.Finish(&chunks));
+
+  ASSERT_EQ(10, chunks.size());
+
+  // Type is correct
+  for (auto chunk : chunks) {
+    ASSERT_TRUE(chunk->type()->Equals(*::arrow::utf8()));
+  }
+}
+
 }  // namespace arrow
diff --git a/cpp/src/arrow/array-dict-test.cc b/cpp/src/arrow/array-dict-test.cc
index cc471a3..87cb229 100644
--- a/cpp/src/arrow/array-dict-test.cc
+++ b/cpp/src/arrow/array-dict-test.cc
@@ -15,23 +15,23 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include <algorithm>
+#include <array>
 #include <cstdint>
-#include <cstring>
-#include <limits>
 #include <memory>
+#include <ostream>
 #include <string>
 #include <vector>
 
 #include <gtest/gtest.h>
 
 #include "arrow/array.h"
-#include "arrow/buffer.h"
 #include "arrow/builder.h"
+#include "arrow/memory_pool.h"
 #include "arrow/status.h"
 #include "arrow/test-common.h"
 #include "arrow/test-util.h"
 #include "arrow/type.h"
+#include "arrow/util/decimal.h"
 
 namespace arrow {
 
diff --git a/cpp/src/arrow/array-list-test.cc b/cpp/src/arrow/array-list-test.cc
index 207acd4..c49c5e3 100644
--- a/cpp/src/arrow/array-list-test.cc
+++ b/cpp/src/arrow/array-list-test.cc
@@ -15,10 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include <algorithm>
 #include <cstdint>
 #include <cstring>
-#include <limits>
 #include <memory>
 #include <string>
 #include <vector>
@@ -32,6 +30,8 @@
 #include "arrow/test-common.h"
 #include "arrow/test-util.h"
 #include "arrow/type.h"
+#include "arrow/util/bit-util.h"
+#include "arrow/util/checked_cast.h"
 
 namespace arrow {
 
diff --git a/cpp/src/arrow/array-struct-test.cc b/cpp/src/arrow/array-struct-test.cc
index dc8bafd..68c35f5 100644
--- a/cpp/src/arrow/array-struct-test.cc
+++ b/cpp/src/arrow/array-struct-test.cc
@@ -15,10 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include <algorithm>
 #include <cstdint>
 #include <cstring>
-#include <limits>
 #include <memory>
 #include <string>
 #include <vector>
@@ -26,12 +24,12 @@
 #include <gtest/gtest.h>
 
 #include "arrow/array.h"
-#include "arrow/buffer.h"
 #include "arrow/builder.h"
 #include "arrow/status.h"
 #include "arrow/test-common.h"
 #include "arrow/test-util.h"
 #include "arrow/type.h"
+#include "arrow/util/checked_cast.h"
 
 namespace arrow {
 
diff --git a/cpp/src/arrow/array-test.cc b/cpp/src/arrow/array-test.cc
index de0885e..bdb7eda 100644
--- a/cpp/src/arrow/array-test.cc
+++ b/cpp/src/arrow/array-test.cc
@@ -23,7 +23,6 @@
 #include <limits>
 #include <memory>
 #include <numeric>
-#include <ostream>
 #include <string>
 #include <type_traits>
 #include <vector>
@@ -40,7 +39,6 @@
 #include "arrow/test-common.h"
 #include "arrow/test-util.h"
 #include "arrow/type.h"
-#include "arrow/type_traits.h"
 #include "arrow/util/bit-util.h"
 #include "arrow/util/checked_cast.h"
 #include "arrow/util/decimal.h"
diff --git a/cpp/src/arrow/array.cc b/cpp/src/arrow/array.cc
index 05d66d5..ff94aa2 100644
--- a/cpp/src/arrow/array.cc
+++ b/cpp/src/arrow/array.cc
@@ -18,6 +18,7 @@
 #include "arrow/array.h"
 
 #include <algorithm>
+#include <cstddef>
 #include <cstdint>
 #include <limits>
 #include <sstream>
diff --git a/cpp/src/arrow/array.h b/cpp/src/arrow/array.h
index 37fa5ae..52c5207 100644
--- a/cpp/src/arrow/array.h
+++ b/cpp/src/arrow/array.h
@@ -18,7 +18,6 @@
 #ifndef ARROW_ARRAY_H
 #define ARROW_ARRAY_H
 
-#include <cstddef>
 #include <cstdint>
 #include <iosfwd>
 #include <memory>
diff --git a/cpp/src/parquet/arrow/CMakeLists.txt b/cpp/src/arrow/array/CMakeLists.txt
similarity index 76%
copy from cpp/src/parquet/arrow/CMakeLists.txt
copy to cpp/src/arrow/array/CMakeLists.txt
index 9372c31..a789c88 100644
--- a/cpp/src/parquet/arrow/CMakeLists.txt
+++ b/cpp/src/arrow/array/CMakeLists.txt
@@ -15,11 +15,13 @@
 # specific language governing permissions and limitations
 # under the License.
 
-ADD_PARQUET_TEST(arrow-schema-test)
-ADD_PARQUET_TEST(arrow-reader-writer-test)
-
-ADD_ARROW_BENCHMARK(reader-writer-benchmark
-  PREFIX "parquet-arrow"
-  EXTRA_LINK_LIBS ${PARQUET_BENCHMARK_LINK_LIBRARIES})
-
-ARROW_INSTALL_ALL_HEADERS("parquet/arrow")
+# Headers: top level
+install(FILES
+  builder_adaptive.h
+  builder_base.h
+  builder_binary.h
+  builder_decimal.h
+  builder_dict.h
+  builder_nested.h
+  builder_primitive.h
+  DESTINATION "${CMAKE_INSTALL_INCLUDEDIR}/arrow/array")
diff --git a/cpp/src/arrow/array/README.md b/cpp/src/arrow/array/README.md
new file mode 100644
index 0000000..0958019
--- /dev/null
+++ b/cpp/src/arrow/array/README.md
@@ -0,0 +1,20 @@
+<!---
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+
+## Implementation details related to columnnar (array) data structures
diff --git a/cpp/src/arrow/builder-adaptive.cc b/cpp/src/arrow/array/builder_adaptive.cc
similarity index 99%
rename from cpp/src/arrow/builder-adaptive.cc
rename to cpp/src/arrow/array/builder_adaptive.cc
index a715f46..599e9e1 100644
--- a/cpp/src/arrow/builder-adaptive.cc
+++ b/cpp/src/arrow/array/builder_adaptive.cc
@@ -15,13 +15,15 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include "arrow/array/builder_adaptive.h"
+
+#include <algorithm>
 #include <cstddef>
 #include <cstdint>
 #include <utility>
 
 #include "arrow/array.h"
 #include "arrow/buffer.h"
-#include "arrow/builder.h"
 #include "arrow/status.h"
 #include "arrow/type.h"
 #include "arrow/type_traits.h"
diff --git a/cpp/src/arrow/array/builder_adaptive.h b/cpp/src/arrow/array/builder_adaptive.h
new file mode 100644
index 0000000..6523de4
--- /dev/null
+++ b/cpp/src/arrow/array/builder_adaptive.h
@@ -0,0 +1,174 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <memory>
+
+#include "arrow/array/builder_base.h"
+
+namespace arrow {
+
+namespace internal {
+
+class ARROW_EXPORT AdaptiveIntBuilderBase : public ArrayBuilder {
+ public:
+  explicit AdaptiveIntBuilderBase(MemoryPool* pool);
+
+  /// Write nulls as uint8_t* (0 value indicates null) into pre-allocated memory
+  Status AppendNulls(const uint8_t* valid_bytes, int64_t length) {
+    ARROW_RETURN_NOT_OK(CommitPendingData());
+    ARROW_RETURN_NOT_OK(Reserve(length));
+    memset(data_->mutable_data() + length_ * int_size_, 0, int_size_ * length);
+    UnsafeAppendToBitmap(valid_bytes, length);
+    return Status::OK();
+  }
+
+  Status AppendNull() {
+    pending_data_[pending_pos_] = 0;
+    pending_valid_[pending_pos_] = 0;
+    pending_has_nulls_ = true;
+    ++pending_pos_;
+
+    if (ARROW_PREDICT_FALSE(pending_pos_ >= pending_size_)) {
+      return CommitPendingData();
+    }
+    return Status::OK();
+  }
+
+  void Reset() override;
+  Status Resize(int64_t capacity) override;
+
+ protected:
+  virtual Status CommitPendingData() = 0;
+
+  std::shared_ptr<ResizableBuffer> data_;
+  uint8_t* raw_data_;
+  uint8_t int_size_;
+
+  static constexpr int32_t pending_size_ = 1024;
+  uint8_t pending_valid_[pending_size_];
+  uint64_t pending_data_[pending_size_];
+  int32_t pending_pos_;
+  bool pending_has_nulls_;
+};
+
+}  // namespace internal
+
+class ARROW_EXPORT AdaptiveUIntBuilder : public internal::AdaptiveIntBuilderBase {
+ public:
+  explicit AdaptiveUIntBuilder(MemoryPool* pool ARROW_MEMORY_POOL_DEFAULT);
+
+  using ArrayBuilder::Advance;
+  using internal::AdaptiveIntBuilderBase::Reset;
+
+  /// Scalar append
+  Status Append(const uint64_t val) {
+    pending_data_[pending_pos_] = val;
+    pending_valid_[pending_pos_] = 1;
+    ++pending_pos_;
+
+    if (ARROW_PREDICT_FALSE(pending_pos_ >= pending_size_)) {
+      return CommitPendingData();
+    }
+    return Status::OK();
+  }
+
+  /// \brief Append a sequence of elements in one shot
+  /// \param[in] values a contiguous C array of values
+  /// \param[in] length the number of values to append
+  /// \param[in] valid_bytes an optional sequence of bytes where non-zero
+  /// indicates a valid (non-null) value
+  /// \return Status
+  Status AppendValues(const uint64_t* values, int64_t length,
+                      const uint8_t* valid_bytes = NULLPTR);
+
+  Status FinishInternal(std::shared_ptr<ArrayData>* out) override;
+
+ protected:
+  Status CommitPendingData() override;
+  Status ExpandIntSize(uint8_t new_int_size);
+
+  Status AppendValuesInternal(const uint64_t* values, int64_t length,
+                              const uint8_t* valid_bytes);
+
+  template <typename new_type, typename old_type>
+  typename std::enable_if<sizeof(old_type) >= sizeof(new_type), Status>::type
+  ExpandIntSizeInternal();
+#define __LESS(a, b) (a) < (b)
+  template <typename new_type, typename old_type>
+  typename std::enable_if<__LESS(sizeof(old_type), sizeof(new_type)), Status>::type
+  ExpandIntSizeInternal();
+#undef __LESS
+
+  template <typename new_type>
+  Status ExpandIntSizeN();
+};
+
+class ARROW_EXPORT AdaptiveIntBuilder : public internal::AdaptiveIntBuilderBase {
+ public:
+  explicit AdaptiveIntBuilder(MemoryPool* pool ARROW_MEMORY_POOL_DEFAULT);
+
+  using ArrayBuilder::Advance;
+  using internal::AdaptiveIntBuilderBase::Reset;
+
+  /// Scalar append
+  Status Append(const int64_t val) {
+    auto v = static_cast<uint64_t>(val);
+
+    pending_data_[pending_pos_] = v;
+    pending_valid_[pending_pos_] = 1;
+    ++pending_pos_;
+
+    if (ARROW_PREDICT_FALSE(pending_pos_ >= pending_size_)) {
+      return CommitPendingData();
+    }
+    return Status::OK();
+  }
+
+  /// \brief Append a sequence of elements in one shot
+  /// \param[in] values a contiguous C array of values
+  /// \param[in] length the number of values to append
+  /// \param[in] valid_bytes an optional sequence of bytes where non-zero
+  /// indicates a valid (non-null) value
+  /// \return Status
+  Status AppendValues(const int64_t* values, int64_t length,
+                      const uint8_t* valid_bytes = NULLPTR);
+
+  Status FinishInternal(std::shared_ptr<ArrayData>* out) override;
+
+ protected:
+  Status CommitPendingData() override;
+  Status ExpandIntSize(uint8_t new_int_size);
+
+  Status AppendValuesInternal(const int64_t* values, int64_t length,
+                              const uint8_t* valid_bytes);
+
+  template <typename new_type, typename old_type>
+  typename std::enable_if<sizeof(old_type) >= sizeof(new_type), Status>::type
+  ExpandIntSizeInternal();
+#define __LESS(a, b) (a) < (b)
+  template <typename new_type, typename old_type>
+  typename std::enable_if<__LESS(sizeof(old_type), sizeof(new_type)), Status>::type
+  ExpandIntSizeInternal();
+#undef __LESS
+
+  template <typename new_type>
+  Status ExpandIntSizeN();
+};
+
+}  // namespace arrow
diff --git a/cpp/src/arrow/array/builder_base.cc b/cpp/src/arrow/array/builder_base.cc
new file mode 100644
index 0000000..321aa44
--- /dev/null
+++ b/cpp/src/arrow/array/builder_base.cc
@@ -0,0 +1,176 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/array/builder_base.h"
+
+#include <algorithm>
+#include <cstddef>
+#include <cstdint>
+#include <cstring>
+#include <sstream>
+#include <utility>
+#include <vector>
+
+#include "arrow/array.h"
+#include "arrow/buffer.h"
+#include "arrow/status.h"
+#include "arrow/type.h"
+#include "arrow/type_traits.h"
+#include "arrow/util/bit-util.h"
+#include "arrow/util/int-util.h"
+#include "arrow/util/logging.h"
+
+namespace arrow {
+
+Status ArrayBuilder::TrimBuffer(const int64_t bytes_filled, ResizableBuffer* buffer) {
+  if (buffer) {
+    if (bytes_filled < buffer->size()) {
+      // Trim buffer
+      RETURN_NOT_OK(buffer->Resize(bytes_filled));
+    }
+    // zero the padding
+    buffer->ZeroPadding();
+  } else {
+    // Null buffers are allowed in place of 0-byte buffers
+    DCHECK_EQ(bytes_filled, 0);
+  }
+  return Status::OK();
+}
+
+Status ArrayBuilder::AppendToBitmap(bool is_valid) {
+  if (length_ == capacity_) {
+    // If the capacity was not already a multiple of 2, do so here
+    // TODO(emkornfield) doubling isn't great default allocation practice
+    // see https://github.com/facebook/folly/blob/master/folly/docs/FBVector.md
+    // fo discussion
+    RETURN_NOT_OK(Resize(BitUtil::NextPower2(capacity_ + 1)));
+  }
+  UnsafeAppendToBitmap(is_valid);
+  return Status::OK();
+}
+
+Status ArrayBuilder::AppendToBitmap(const uint8_t* valid_bytes, int64_t length) {
+  RETURN_NOT_OK(Reserve(length));
+
+  UnsafeAppendToBitmap(valid_bytes, length);
+  return Status::OK();
+}
+
+Status ArrayBuilder::Resize(int64_t capacity) {
+  // Target size of validity (null) bitmap data
+  const int64_t new_bitmap_size = BitUtil::BytesForBits(capacity);
+  RETURN_NOT_OK(CheckCapacity(capacity, capacity_));
+
+  if (capacity_ == 0) {
+    RETURN_NOT_OK(AllocateResizableBuffer(pool_, new_bitmap_size, &null_bitmap_));
+    null_bitmap_data_ = null_bitmap_->mutable_data();
+
+    // Padding is zeroed by AllocateResizableBuffer
+    memset(null_bitmap_data_, 0, static_cast<size_t>(new_bitmap_size));
+  } else {
+    const int64_t old_bitmap_capacity = null_bitmap_->capacity();
+    RETURN_NOT_OK(null_bitmap_->Resize(new_bitmap_size));
+
+    const int64_t new_bitmap_capacity = null_bitmap_->capacity();
+    null_bitmap_data_ = null_bitmap_->mutable_data();
+
+    // Zero the region between the original capacity and the new capacity,
+    // including padding, which has not been zeroed, unlike
+    // AllocateResizableBuffer
+    if (old_bitmap_capacity < new_bitmap_capacity) {
+      memset(null_bitmap_data_ + old_bitmap_capacity, 0,
+             static_cast<size_t>(new_bitmap_capacity - old_bitmap_capacity));
+    }
+  }
+  capacity_ = capacity;
+  return Status::OK();
+}
+
+Status ArrayBuilder::Advance(int64_t elements) {
+  if (length_ + elements > capacity_) {
+    return Status::Invalid("Builder must be expanded");
+  }
+  length_ += elements;
+  return Status::OK();
+}
+
+Status ArrayBuilder::Finish(std::shared_ptr<Array>* out) {
+  std::shared_ptr<ArrayData> internal_data;
+  RETURN_NOT_OK(FinishInternal(&internal_data));
+  *out = MakeArray(internal_data);
+  return Status::OK();
+}
+
+Status ArrayBuilder::Reserve(int64_t additional_elements) {
+  if (length_ + additional_elements > capacity_) {
+    // TODO(emkornfield) power of 2 growth is potentially suboptimal
+    int64_t new_size = BitUtil::NextPower2(length_ + additional_elements);
+    return Resize(new_size);
+  }
+  return Status::OK();
+}
+
+void ArrayBuilder::Reset() {
+  capacity_ = length_ = null_count_ = 0;
+  null_bitmap_ = nullptr;
+}
+
+Status ArrayBuilder::SetNotNull(int64_t length) {
+  RETURN_NOT_OK(Reserve(length));
+  UnsafeSetNotNull(length);
+  return Status::OK();
+}
+
+void ArrayBuilder::UnsafeAppendToBitmap(const uint8_t* valid_bytes, int64_t length) {
+  if (valid_bytes == nullptr) {
+    UnsafeSetNotNull(length);
+    return;
+  }
+  UnsafeAppendToBitmap(valid_bytes, valid_bytes + length);
+}
+
+void ArrayBuilder::UnsafeAppendToBitmap(const std::vector<bool>& is_valid) {
+  UnsafeAppendToBitmap(is_valid.begin(), is_valid.end());
+}
+
+void ArrayBuilder::UnsafeSetNotNull(int64_t length) {
+  const int64_t new_length = length + length_;
+
+  // Fill up the bytes until we have a byte alignment
+  int64_t pad_to_byte = std::min<int64_t>(8 - (length_ % 8), length);
+
+  if (pad_to_byte == 8) {
+    pad_to_byte = 0;
+  }
+  for (int64_t i = length_; i < length_ + pad_to_byte; ++i) {
+    BitUtil::SetBit(null_bitmap_data_, i);
+  }
+
+  // Fast bitsetting
+  int64_t fast_length = (length - pad_to_byte) / 8;
+  memset(null_bitmap_data_ + ((length_ + pad_to_byte) / 8), 0xFF,
+         static_cast<size_t>(fast_length));
+
+  // Trailing bits
+  for (int64_t i = length_ + pad_to_byte + (fast_length * 8); i < new_length; ++i) {
+    BitUtil::SetBit(null_bitmap_data_, i);
+  }
+
+  length_ = new_length;
+}
+
+}  // namespace arrow
diff --git a/cpp/src/arrow/array/builder_base.h b/cpp/src/arrow/array/builder_base.h
new file mode 100644
index 0000000..ae400fc
--- /dev/null
+++ b/cpp/src/arrow/array/builder_base.h
@@ -0,0 +1,227 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "arrow/array/builder_base.h"
+
+#include <algorithm>  // IWYU pragma: keep
+#include <array>
+#include <cstddef>
+#include <cstdint>
+#include <cstring>
+#include <iterator>
+#include <limits>
+#include <memory>
+#include <string>
+#include <type_traits>
+#include <vector>
+
+#include "arrow/buffer.h"
+#include "arrow/memory_pool.h"
+#include "arrow/status.h"
+#include "arrow/type.h"
+#include "arrow/type_traits.h"
+#include "arrow/util/bit-util.h"
+#include "arrow/util/macros.h"
+#include "arrow/util/string_view.h"
+#include "arrow/util/type_traits.h"
+#include "arrow/util/visibility.h"
+
+namespace arrow {
+
+class Array;
+struct ArrayData;
+
+constexpr int64_t kMinBuilderCapacity = 1 << 5;
+constexpr int64_t kListMaximumElements = std::numeric_limits<int32_t>::max() - 1;
+
+/// Base class for all data array builders.
+///
+/// This class provides a facilities for incrementally building the null bitmap
+/// (see Append methods) and as a side effect the current number of slots and
+/// the null count.
+///
+/// \note Users are expected to use builders as one of the concrete types below.
+/// For example, ArrayBuilder* pointing to BinaryBuilder should be downcast before use.
+class ARROW_EXPORT ArrayBuilder {
+ public:
+  explicit ArrayBuilder(const std::shared_ptr<DataType>& type, MemoryPool* pool)
+      : type_(type),
+        pool_(pool),
+        null_bitmap_(NULLPTR),
+        null_count_(0),
+        null_bitmap_data_(NULLPTR),
+        length_(0),
+        capacity_(0) {}
+
+  virtual ~ArrayBuilder() = default;
+
+  /// For nested types. Since the objects are owned by this class instance, we
+  /// skip shared pointers and just return a raw pointer
+  ArrayBuilder* child(int i) { return children_[i].get(); }
+
+  int num_children() const { return static_cast<int>(children_.size()); }
+
+  int64_t length() const { return length_; }
+  int64_t null_count() const { return null_count_; }
+  int64_t capacity() const { return capacity_; }
+
+  /// \brief Ensure that enough memory has been allocated to fit the indicated
+  /// number of total elements in the builder, including any that have already
+  /// been appended. Does not account for reallocations that may be due to
+  /// variable size data, like binary values. To make space for incremental
+  /// appends, use Reserve instead.
+  ///
+  /// \param[in] capacity the minimum number of total array values to
+  ///            accommodate. Must be greater than the current capacity.
+  /// \return Status
+  virtual Status Resize(int64_t capacity);
+
+  /// \brief Ensure that there is enough space allocated to add the indicated
+  /// number of elements without any further calls to Resize. The memory
+  /// allocated is rounded up to the next highest power of 2 similar to memory
+  /// allocations in STL containers like std::vector
+  /// \param[in] additional_capacity the number of additional array values
+  /// \return Status
+  Status Reserve(int64_t additional_capacity);
+
+  /// Reset the builder.
+  virtual void Reset();
+
+  /// For cases where raw data was memcpy'd into the internal buffers, allows us
+  /// to advance the length of the builder. It is your responsibility to use
+  /// this function responsibly.
+  Status Advance(int64_t elements);
+
+  /// \brief Return result of builder as an internal generic ArrayData
+  /// object. Resets builder except for dictionary builder
+  ///
+  /// \param[out] out the finalized ArrayData object
+  /// \return Status
+  virtual Status FinishInternal(std::shared_ptr<ArrayData>* out) = 0;
+
+  /// \brief Return result of builder as an Array object.
+  ///
+  /// The builder is reset except for DictionaryBuilder.
+  ///
+  /// \param[out] out the finalized Array object
+  /// \return Status
+  Status Finish(std::shared_ptr<Array>* out);
+
+  std::shared_ptr<DataType> type() const { return type_; }
+
+ protected:
+  ArrayBuilder() {}
+
+  /// Append to null bitmap
+  Status AppendToBitmap(bool is_valid);
+
+  /// Vector append. Treat each zero byte as a null.   If valid_bytes is null
+  /// assume all of length bits are valid.
+  Status AppendToBitmap(const uint8_t* valid_bytes, int64_t length);
+
+  /// Set the next length bits to not null (i.e. valid).
+  Status SetNotNull(int64_t length);
+
+  // Unsafe operations (don't check capacity/don't resize)
+
+  void UnsafeAppendNull() { UnsafeAppendToBitmap(false); }
+
+  // Append to null bitmap, update the length
+  void UnsafeAppendToBitmap(bool is_valid) {
+    if (is_valid) {
+      BitUtil::SetBit(null_bitmap_data_, length_);
+    } else {
+      ++null_count_;
+    }
+    ++length_;
+  }
+
+  template <typename IterType>
+  void UnsafeAppendToBitmap(const IterType& begin, const IterType& end) {
+    int64_t byte_offset = length_ / 8;
+    int64_t bit_offset = length_ % 8;
+    uint8_t bitset = null_bitmap_data_[byte_offset];
+
+    for (auto iter = begin; iter != end; ++iter) {
+      if (bit_offset == 8) {
+        bit_offset = 0;
+        null_bitmap_data_[byte_offset] = bitset;
+        byte_offset++;
+        // TODO: Except for the last byte, this shouldn't be needed
+        bitset = null_bitmap_data_[byte_offset];
+      }
+
+      if (*iter) {
+        bitset |= BitUtil::kBitmask[bit_offset];
+      } else {
+        bitset &= BitUtil::kFlippedBitmask[bit_offset];
+        ++null_count_;
+      }
+
+      bit_offset++;
+    }
+
+    if (bit_offset != 0) {
+      null_bitmap_data_[byte_offset] = bitset;
+    }
+
+    length_ += std::distance(begin, end);
+  }
+
+  // Vector append. Treat each zero byte as a nullzero. If valid_bytes is null
+  // assume all of length bits are valid.
+  void UnsafeAppendToBitmap(const uint8_t* valid_bytes, int64_t length);
+
+  void UnsafeAppendToBitmap(const std::vector<bool>& is_valid);
+
+  // Set the next length bits to not null (i.e. valid).
+  void UnsafeSetNotNull(int64_t length);
+
+  static Status TrimBuffer(const int64_t bytes_filled, ResizableBuffer* buffer);
+
+  static Status CheckCapacity(int64_t new_capacity, int64_t old_capacity) {
+    if (new_capacity < 0) {
+      return Status::Invalid("Resize capacity must be positive");
+    }
+    if (new_capacity < old_capacity) {
+      return Status::Invalid("Resize cannot downsize");
+    }
+    return Status::OK();
+  }
+
+  std::shared_ptr<DataType> type_;
+  MemoryPool* pool_;
+
+  // When null_bitmap are first appended to the builder, the null bitmap is allocated
+  std::shared_ptr<ResizableBuffer> null_bitmap_;
+  int64_t null_count_;
+  uint8_t* null_bitmap_data_;
+
+  // Array length, so far. Also, the index of the next element to be added
+  int64_t length_;
+  int64_t capacity_;
+
+  // Child value array builders. These are owned by this class
+  std::vector<std::shared_ptr<ArrayBuilder>> children_;
+
+ private:
+  ARROW_DISALLOW_COPY_AND_ASSIGN(ArrayBuilder);
+};
+
+}  // namespace arrow
diff --git a/cpp/src/arrow/builder-binary.cc b/cpp/src/arrow/array/builder_binary.cc
similarity index 86%
rename from cpp/src/arrow/builder-binary.cc
rename to cpp/src/arrow/array/builder_binary.cc
index c250837..ad6ba11 100644
--- a/cpp/src/arrow/builder-binary.cc
+++ b/cpp/src/arrow/array/builder_binary.cc
@@ -15,6 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include "arrow/array/builder_binary.h"
+
 #include <algorithm>
 #include <cstddef>
 #include <cstdint>
@@ -27,7 +29,6 @@
 
 #include "arrow/array.h"
 #include "arrow/buffer.h"
-#include "arrow/builder.h"
 #include "arrow/status.h"
 #include "arrow/type.h"
 #include "arrow/type_traits.h"
@@ -68,32 +69,11 @@ Status BinaryBuilder::ReserveData(int64_t elements) {
   return Status::OK();
 }
 
-Status BinaryBuilder::AppendNextOffset() {
-  const int64_t num_bytes = value_data_builder_.length();
-  if (ARROW_PREDICT_FALSE(num_bytes > kBinaryMemoryLimit)) {
-    std::stringstream ss;
-    ss << "BinaryArray cannot contain more than " << kBinaryMemoryLimit << " bytes, have "
-       << num_bytes;
-    return Status::CapacityError(ss.str());
-  }
-  return offsets_builder_.Append(static_cast<int32_t>(num_bytes));
-}
-
-Status BinaryBuilder::Append(const uint8_t* value, int32_t length) {
-  RETURN_NOT_OK(Reserve(1));
-  RETURN_NOT_OK(AppendNextOffset());
-  RETURN_NOT_OK(value_data_builder_.Append(value, length));
-
-  UnsafeAppendToBitmap(true);
-  return Status::OK();
-}
-
-Status BinaryBuilder::AppendNull() {
-  RETURN_NOT_OK(AppendNextOffset());
-  RETURN_NOT_OK(Reserve(1));
-
-  UnsafeAppendToBitmap(false);
-  return Status::OK();
+Status BinaryBuilder::AppendOverflow(int64_t num_bytes) {
+  std::stringstream ss;
+  ss << "BinaryArray cannot contain more than " << kBinaryMemoryLimit << " bytes, have "
+     << num_bytes;
+  return Status::CapacityError(ss.str());
 }
 
 Status BinaryBuilder::FinishInternal(std::shared_ptr<ArrayData>* out) {
@@ -292,24 +272,46 @@ util::string_view FixedSizeBinaryBuilder::GetView(int64_t i) const {
 }
 
 // ----------------------------------------------------------------------
-// Decimal128Builder
+// ChunkedArray builders
 
-Decimal128Builder::Decimal128Builder(const std::shared_ptr<DataType>& type,
-                                     MemoryPool* pool)
-    : FixedSizeBinaryBuilder(type, pool) {}
+namespace internal {
 
-Status Decimal128Builder::Append(const Decimal128& value) {
-  RETURN_NOT_OK(FixedSizeBinaryBuilder::Reserve(1));
-  return FixedSizeBinaryBuilder::Append(value.ToBytes());
+ChunkedBinaryBuilder::ChunkedBinaryBuilder(int32_t max_chunk_size, MemoryPool* pool)
+    : max_chunk_size_(max_chunk_size),
+      chunk_data_size_(0),
+      builder_(new BinaryBuilder(pool)) {}
+
+Status ChunkedBinaryBuilder::Finish(ArrayVector* out) {
+  if (builder_->length() > 0 || chunks_.size() == 0) {
+    std::shared_ptr<Array> chunk;
+    RETURN_NOT_OK(builder_->Finish(&chunk));
+    chunks_.emplace_back(std::move(chunk));
+  }
+  *out = std::move(chunks_);
+  return Status::OK();
 }
 
-Status Decimal128Builder::FinishInternal(std::shared_ptr<ArrayData>* out) {
-  std::shared_ptr<Buffer> data;
-  RETURN_NOT_OK(byte_builder_.Finish(&data));
+Status ChunkedBinaryBuilder::NextChunk() {
+  std::shared_ptr<Array> chunk;
+  RETURN_NOT_OK(builder_->Finish(&chunk));
+  chunks_.emplace_back(std::move(chunk));
 
-  *out = ArrayData::Make(type_, length_, {null_bitmap_, data}, null_count_);
+  chunk_data_size_ = 0;
+  return Status::OK();
+}
 
+Status ChunkedStringBuilder::Finish(ArrayVector* out) {
+  RETURN_NOT_OK(ChunkedBinaryBuilder::Finish(out));
+
+  // Change data type to string/utf8
+  for (size_t i = 0; i < out->size(); ++i) {
+    std::shared_ptr<ArrayData> data = (*out)[i]->data();
+    data->type = ::arrow::utf8();
+    (*out)[i] = std::make_shared<StringArray>(data);
+  }
   return Status::OK();
 }
 
+}  // namespace internal
+
 }  // namespace arrow
diff --git a/cpp/src/arrow/array/builder_binary.h b/cpp/src/arrow/array/builder_binary.h
new file mode 100644
index 0000000..7c101bd
--- /dev/null
+++ b/cpp/src/arrow/array/builder_binary.h
@@ -0,0 +1,304 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <limits>
+#include <memory>
+#include <sstream>
+#include <string>
+#include <vector>
+
+#include "arrow/array.h"
+#include "arrow/array/builder_base.h"
+#include "arrow/status.h"
+#include "arrow/type_traits.h"
+#include "arrow/util/macros.h"
+#include "arrow/util/string_view.h"
+
+namespace arrow {
+
+constexpr int64_t kBinaryMemoryLimit = std::numeric_limits<int32_t>::max() - 1;
+
+// ----------------------------------------------------------------------
+// Binary and String
+
+/// \class BinaryBuilder
+/// \brief Builder class for variable-length binary data
+class ARROW_EXPORT BinaryBuilder : public ArrayBuilder {
+ public:
+  explicit BinaryBuilder(MemoryPool* pool ARROW_MEMORY_POOL_DEFAULT);
+
+  BinaryBuilder(const std::shared_ptr<DataType>& type, MemoryPool* pool);
+
+  Status Append(const uint8_t* value, int32_t length) {
+    ARROW_RETURN_NOT_OK(Reserve(1));
+    ARROW_RETURN_NOT_OK(AppendNextOffset());
+    ARROW_RETURN_NOT_OK(value_data_builder_.Append(value, length));
+
+    UnsafeAppendToBitmap(true);
+    return Status::OK();
+  }
+
+  Status AppendNull() {
+    ARROW_RETURN_NOT_OK(AppendNextOffset());
+    ARROW_RETURN_NOT_OK(Reserve(1));
+    UnsafeAppendToBitmap(false);
+    return Status::OK();
+  }
+
+  Status Append(const char* value, int32_t length) {
+    return Append(reinterpret_cast<const uint8_t*>(value), length);
+  }
+
+  Status Append(util::string_view value) {
+    return Append(value.data(), static_cast<int32_t>(value.size()));
+  }
+
+  /// \brief Append without checking capacity
+  ///
+  /// Offsets and data should have been presized using Reserve() and
+  /// ReserveData(), respectively.
+  void UnsafeAppend(const uint8_t* value, int32_t length) {
+    UnsafeAppendNextOffset();
+    value_data_builder_.UnsafeAppend(value, length);
+    UnsafeAppendToBitmap(true);
+  }
+
+  void UnsafeAppend(const char* value, int32_t length) {
+    UnsafeAppend(reinterpret_cast<const uint8_t*>(value), length);
+  }
+
+  void UnsafeAppend(const std::string& value) {
+    UnsafeAppend(value.c_str(), static_cast<int32_t>(value.size()));
+  }
+
+  void UnsafeAppendNull() {
+    const int64_t num_bytes = value_data_builder_.length();
+    offsets_builder_.UnsafeAppend(static_cast<int32_t>(num_bytes));
+    UnsafeAppendToBitmap(false);
+  }
+
+  void Reset() override;
+  Status Resize(int64_t capacity) override;
+
+  /// \brief Ensures there is enough allocated capacity to append the indicated
+  /// number of bytes to the value data buffer without additional allocations
+  Status ReserveData(int64_t elements);
+
+  Status FinishInternal(std::shared_ptr<ArrayData>* out) override;
+
+  /// \return size of values buffer so far
+  int64_t value_data_length() const { return value_data_builder_.length(); }
+  /// \return capacity of values buffer
+  int64_t value_data_capacity() const { return value_data_builder_.capacity(); }
+
+  /// Temporary access to a value.
+  ///
+  /// This pointer becomes invalid on the next modifying operation.
+  const uint8_t* GetValue(int64_t i, int32_t* out_length) const;
+
+  /// Temporary access to a value.
+  ///
+  /// This view becomes invalid on the next modifying operation.
+  util::string_view GetView(int64_t i) const;
+
+ protected:
+  TypedBufferBuilder<int32_t> offsets_builder_;
+  TypedBufferBuilder<uint8_t> value_data_builder_;
+
+  Status AppendOverflow(int64_t num_bytes);
+
+  Status AppendNextOffset() {
+    const int64_t num_bytes = value_data_builder_.length();
+    if (ARROW_PREDICT_FALSE(num_bytes > kBinaryMemoryLimit)) {
+      return AppendOverflow(num_bytes);
+    }
+    return offsets_builder_.Append(static_cast<int32_t>(num_bytes));
+  }
+
+  void UnsafeAppendNextOffset() {
+    const int64_t num_bytes = value_data_builder_.length();
+    offsets_builder_.UnsafeAppend(static_cast<int32_t>(num_bytes));
+  }
+};
+
+/// \class StringBuilder
+/// \brief Builder class for UTF8 strings
+class ARROW_EXPORT StringBuilder : public BinaryBuilder {
+ public:
+  using BinaryBuilder::BinaryBuilder;
+  explicit StringBuilder(MemoryPool* pool ARROW_MEMORY_POOL_DEFAULT);
+
+  using BinaryBuilder::Append;
+  using BinaryBuilder::Reset;
+  using BinaryBuilder::UnsafeAppend;
+
+  /// \brief Append a sequence of strings in one shot.
+  ///
+  /// \param[in] values a vector of strings
+  /// \param[in] valid_bytes an optional sequence of bytes where non-zero
+  /// indicates a valid (non-null) value
+  /// \return Status
+  Status AppendValues(const std::vector<std::string>& values,
+                      const uint8_t* valid_bytes = NULLPTR);
+
+  /// \brief Append a sequence of nul-terminated strings in one shot.
+  ///        If one of the values is NULL, it is processed as a null
+  ///        value even if the corresponding valid_bytes entry is 1.
+  ///
+  /// \param[in] values a contiguous C array of nul-terminated char *
+  /// \param[in] length the number of values to append
+  /// \param[in] valid_bytes an optional sequence of bytes where non-zero
+  /// indicates a valid (non-null) value
+  /// \return Status
+  Status AppendValues(const char** values, int64_t length,
+                      const uint8_t* valid_bytes = NULLPTR);
+};
+
+// ----------------------------------------------------------------------
+// FixedSizeBinaryBuilder
+
+class ARROW_EXPORT FixedSizeBinaryBuilder : public ArrayBuilder {
+ public:
+  FixedSizeBinaryBuilder(const std::shared_ptr<DataType>& type,
+                         MemoryPool* pool ARROW_MEMORY_POOL_DEFAULT);
+
+  Status Append(const uint8_t* value) {
+    ARROW_RETURN_NOT_OK(Reserve(1));
+    UnsafeAppendToBitmap(true);
+    return byte_builder_.Append(value, byte_width_);
+  }
+
+  Status Append(const char* value) {
+    return Append(reinterpret_cast<const uint8_t*>(value));
+  }
+
+  Status Append(const util::string_view& view) {
+#ifndef NDEBUG
+    CheckValueSize(static_cast<int64_t>(view.size()));
+#endif
+    return Append(reinterpret_cast<const uint8_t*>(view.data()));
+  }
+
+  Status Append(const std::string& s) {
+#ifndef NDEBUG
+    CheckValueSize(static_cast<int64_t>(s.size()));
+#endif
+    return Append(reinterpret_cast<const uint8_t*>(s.data()));
+  }
+
+  template <size_t NBYTES>
+  Status Append(const std::array<uint8_t, NBYTES>& value) {
+    ARROW_RETURN_NOT_OK(Reserve(1));
+    UnsafeAppendToBitmap(true);
+    return byte_builder_.Append(value);
+  }
+
+  Status AppendValues(const uint8_t* data, int64_t length,
+                      const uint8_t* valid_bytes = NULLPTR);
+  Status AppendNull();
+
+  void Reset() override;
+  Status Resize(int64_t capacity) override;
+  Status FinishInternal(std::shared_ptr<ArrayData>* out) override;
+
+  /// \return size of values buffer so far
+  int64_t value_data_length() const { return byte_builder_.length(); }
+
+  int32_t byte_width() const { return byte_width_; }
+
+  /// Temporary access to a value.
+  ///
+  /// This pointer becomes invalid on the next modifying operation.
+  const uint8_t* GetValue(int64_t i) const;
+
+  /// Temporary access to a value.
+  ///
+  /// This view becomes invalid on the next modifying operation.
+  util::string_view GetView(int64_t i) const;
+
+ protected:
+  int32_t byte_width_;
+  BufferBuilder byte_builder_;
+
+#ifndef NDEBUG
+  void CheckValueSize(int64_t size);
+#endif
+};
+
+// ----------------------------------------------------------------------
+// Chunked builders: build a sequence of BinaryArray or StringArray that are
+// limited to a particular size (to the upper limit of 2GB)
+
+namespace internal {
+
+class ARROW_EXPORT ChunkedBinaryBuilder {
+ public:
+  ChunkedBinaryBuilder(int32_t max_chunk_size,
+                       MemoryPool* pool ARROW_MEMORY_POOL_DEFAULT);
+
+  virtual ~ChunkedBinaryBuilder() = default;
+
+  Status Append(const uint8_t* value, int32_t length) {
+    if (ARROW_PREDICT_FALSE(length + chunk_data_size_ > max_chunk_size_)) {
+      // Move onto next chunk, unless the builder length is currently 0, which
+      // means that max_chunk_size_ is less than the item length
+      if (builder_->length() > 0) {
+        ARROW_RETURN_NOT_OK(NextChunk());
+      }
+      // else fall through
+    }
+
+    chunk_data_size_ += length;
+    return builder_->Append(value, length);
+  }
+
+  Status Append(const util::string_view& value) {
+    return Append(reinterpret_cast<const uint8_t*>(value.data()),
+                  static_cast<int32_t>(value.size()));
+  }
+
+  Status AppendNull() {
+    if (ARROW_PREDICT_FALSE(builder_->length() == std::numeric_limits<int32_t>::max())) {
+      ARROW_RETURN_NOT_OK(NextChunk());
+    }
+    return builder_->AppendNull();
+  }
+
+  virtual Status Finish(ArrayVector* out);
+
+ protected:
+  Status NextChunk();
+
+  int32_t max_chunk_size_;
+  int32_t chunk_data_size_;
+
+  std::unique_ptr<BinaryBuilder> builder_;
+  std::vector<std::shared_ptr<Array>> chunks_;
+};
+
+class ARROW_EXPORT ChunkedStringBuilder : public ChunkedBinaryBuilder {
+ public:
+  using ChunkedBinaryBuilder::ChunkedBinaryBuilder;
+
+  Status Finish(ArrayVector* out) override;
+};
+
+}  // namespace internal
+
+}  // namespace arrow
diff --git a/cpp/src/arrow/array/builder_decimal.cc b/cpp/src/arrow/array/builder_decimal.cc
new file mode 100644
index 0000000..d64c4db
--- /dev/null
+++ b/cpp/src/arrow/array/builder_decimal.cc
@@ -0,0 +1,64 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/array/builder_decimal.h"
+
+#include <algorithm>
+#include <cstddef>
+#include <cstdint>
+#include <cstring>
+#include <memory>
+#include <numeric>
+#include <sstream>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "arrow/array.h"
+#include "arrow/buffer.h"
+#include "arrow/status.h"
+#include "arrow/type.h"
+#include "arrow/type_traits.h"
+#include "arrow/util/bit-util.h"
+#include "arrow/util/checked_cast.h"
+#include "arrow/util/decimal.h"
+#include "arrow/util/logging.h"
+
+namespace arrow {
+
+// ----------------------------------------------------------------------
+// Decimal128Builder
+
+Decimal128Builder::Decimal128Builder(const std::shared_ptr<DataType>& type,
+                                     MemoryPool* pool)
+    : FixedSizeBinaryBuilder(type, pool) {}
+
+Status Decimal128Builder::Append(const Decimal128& value) {
+  RETURN_NOT_OK(FixedSizeBinaryBuilder::Reserve(1));
+  return FixedSizeBinaryBuilder::Append(value.ToBytes());
+}
+
+Status Decimal128Builder::FinishInternal(std::shared_ptr<ArrayData>* out) {
+  std::shared_ptr<Buffer> data;
+  RETURN_NOT_OK(byte_builder_.Finish(&data));
+
+  *out = ArrayData::Make(type_, length_, {null_bitmap_, data}, null_count_);
+
+  return Status::OK();
+}
+
+}  // namespace arrow
diff --git a/cpp/src/arrow/util/string_view.h b/cpp/src/arrow/array/builder_decimal.h
similarity index 57%
copy from cpp/src/arrow/util/string_view.h
copy to cpp/src/arrow/array/builder_decimal.h
index 2ee594a..fb40a79 100644
--- a/cpp/src/arrow/util/string_view.h
+++ b/cpp/src/arrow/array/builder_decimal.h
@@ -15,17 +15,31 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#ifndef ARROW_UTIL_STRING_VIEW_H
-#define ARROW_UTIL_STRING_VIEW_H
+#pragma once
 
-#include "arrow/util/string_view/string_view.hpp"
+#include <memory>
+
+#include "arrow/array/builder_base.h"
+#include "arrow/array/builder_binary.h"
 
 namespace arrow {
-namespace util {
 
-using nonstd::string_view;
+class Decimal128;
 
-}  // namespace util
-}  // namespace arrow
+class ARROW_EXPORT Decimal128Builder : public FixedSizeBinaryBuilder {
+ public:
+  explicit Decimal128Builder(const std::shared_ptr<DataType>& type,
+                             MemoryPool* pool ARROW_MEMORY_POOL_DEFAULT);
+
+  using FixedSizeBinaryBuilder::Append;
+  using FixedSizeBinaryBuilder::AppendValues;
+  using FixedSizeBinaryBuilder::Reset;
+
+  Status Append(const Decimal128& val);
 
-#endif  // ARROW_UTIL_STRING_VIEW_H
+  Status FinishInternal(std::shared_ptr<ArrayData>* out) override;
+};
+
+using DecimalBuilder = Decimal128Builder;
+
+}  // namespace arrow
diff --git a/cpp/src/arrow/builder-dict.cc b/cpp/src/arrow/array/builder_dict.cc
similarity index 99%
rename from cpp/src/arrow/builder-dict.cc
rename to cpp/src/arrow/array/builder_dict.cc
index b021c3a..0891e4c 100644
--- a/cpp/src/arrow/builder-dict.cc
+++ b/cpp/src/arrow/array/builder_dict.cc
@@ -15,13 +15,15 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include "arrow/array/builder_dict.h"
+
+#include <algorithm>
 #include <cstdint>
 #include <utility>
 #include <vector>
 
 #include "arrow/array.h"
 #include "arrow/buffer.h"
-#include "arrow/builder.h"
 #include "arrow/status.h"
 #include "arrow/type.h"
 #include "arrow/type_traits.h"
diff --git a/cpp/src/arrow/array/builder_dict.h b/cpp/src/arrow/array/builder_dict.h
new file mode 100644
index 0000000..6f02716
--- /dev/null
+++ b/cpp/src/arrow/array/builder_dict.h
@@ -0,0 +1,167 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <memory>
+
+#include "arrow/array/builder_adaptive.h"  // IWYU pragma: export
+#include "arrow/array/builder_base.h"      // IWYU pragma: export
+
+namespace arrow {
+
+// ----------------------------------------------------------------------
+// Dictionary builder
+
+namespace internal {
+
+template <typename T>
+struct DictionaryScalar {
+  using type = typename T::c_type;
+};
+
+template <>
+struct DictionaryScalar<BinaryType> {
+  using type = util::string_view;
+};
+
+template <>
+struct DictionaryScalar<StringType> {
+  using type = util::string_view;
+};
+
+template <>
+struct DictionaryScalar<FixedSizeBinaryType> {
+  using type = util::string_view;
+};
+
+}  // namespace internal
+
+/// \brief Array builder for created encoded DictionaryArray from dense array
+///
+/// Unlike other builders, dictionary builder does not completely reset the state
+/// on Finish calls. The arrays built after the initial Finish call will reuse
+/// the previously created encoding and build a delta dictionary when new terms
+/// occur.
+///
+/// data
+template <typename T>
+class ARROW_EXPORT DictionaryBuilder : public ArrayBuilder {
+ public:
+  using Scalar = typename internal::DictionaryScalar<T>::type;
+
+  // WARNING: the type given below is the value type, not the DictionaryType.
+  // The DictionaryType is instantiated on the Finish() call.
+  DictionaryBuilder(const std::shared_ptr<DataType>& type, MemoryPool* pool);
+
+  template <typename T1 = T>
+  explicit DictionaryBuilder(
+      typename std::enable_if<TypeTraits<T1>::is_parameter_free, MemoryPool*>::type pool)
+      : DictionaryBuilder<T1>(TypeTraits<T1>::type_singleton(), pool) {}
+
+  ~DictionaryBuilder() override;
+
+  /// \brief Append a scalar value
+  Status Append(const Scalar& value);
+
+  /// \brief Append a fixed-width string (only for FixedSizeBinaryType)
+  template <typename T1 = T>
+  Status Append(typename std::enable_if<std::is_base_of<FixedSizeBinaryType, T1>::value,
+                                        const uint8_t*>::type value) {
+    return Append(util::string_view(reinterpret_cast<const char*>(value), byte_width_));
+  }
+
+  /// \brief Append a fixed-width string (only for FixedSizeBinaryType)
+  template <typename T1 = T>
+  Status Append(typename std::enable_if<std::is_base_of<FixedSizeBinaryType, T1>::value,
+                                        const char*>::type value) {
+    return Append(util::string_view(value, byte_width_));
+  }
+
+  /// \brief Append a scalar null value
+  Status AppendNull();
+
+  /// \brief Append a whole dense array to the builder
+  Status AppendArray(const Array& array);
+
+  void Reset() override;
+  Status Resize(int64_t capacity) override;
+  Status FinishInternal(std::shared_ptr<ArrayData>* out) override;
+
+  /// is the dictionary builder in the delta building mode
+  bool is_building_delta() { return delta_offset_ > 0; }
+
+ protected:
+  class MemoTableImpl;
+  std::unique_ptr<MemoTableImpl> memo_table_;
+
+  int32_t delta_offset_;
+  // Only used for FixedSizeBinaryType
+  int32_t byte_width_;
+
+  AdaptiveIntBuilder values_builder_;
+};
+
+template <>
+class ARROW_EXPORT DictionaryBuilder<NullType> : public ArrayBuilder {
+ public:
+  DictionaryBuilder(const std::shared_ptr<DataType>& type, MemoryPool* pool);
+  explicit DictionaryBuilder(MemoryPool* pool);
+
+  /// \brief Append a scalar null value
+  Status AppendNull();
+
+  /// \brief Append a whole dense array to the builder
+  Status AppendArray(const Array& array);
+
+  Status Resize(int64_t capacity) override;
+  Status FinishInternal(std::shared_ptr<ArrayData>* out) override;
+
+ protected:
+  AdaptiveIntBuilder values_builder_;
+};
+
+class ARROW_EXPORT BinaryDictionaryBuilder : public DictionaryBuilder<BinaryType> {
+ public:
+  using DictionaryBuilder::Append;
+  using DictionaryBuilder::DictionaryBuilder;
+
+  Status Append(const uint8_t* value, int32_t length) {
+    return Append(reinterpret_cast<const char*>(value), length);
+  }
+
+  Status Append(const char* value, int32_t length) {
+    return Append(util::string_view(value, length));
+  }
+};
+
+/// \brief Dictionary array builder with convenience methods for strings
+class ARROW_EXPORT StringDictionaryBuilder : public DictionaryBuilder<StringType> {
+ public:
+  using DictionaryBuilder::Append;
+  using DictionaryBuilder::DictionaryBuilder;
+
+  Status Append(const uint8_t* value, int32_t length) {
+    return Append(reinterpret_cast<const char*>(value), length);
+  }
+
+  Status Append(const char* value, int32_t length) {
+    return Append(util::string_view(value, length));
+  }
+};
+
+}  // namespace arrow
diff --git a/cpp/src/arrow/array/builder_nested.cc b/cpp/src/arrow/array/builder_nested.cc
new file mode 100644
index 0000000..e733243
--- /dev/null
+++ b/cpp/src/arrow/array/builder_nested.cc
@@ -0,0 +1,156 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/array/builder_nested.h"
+
+#include <algorithm>
+#include <cstddef>
+#include <cstdint>
+#include <cstring>
+#include <sstream>
+#include <utility>
+#include <vector>
+
+#include "arrow/array.h"
+#include "arrow/buffer.h"
+#include "arrow/status.h"
+#include "arrow/type.h"
+#include "arrow/type_traits.h"
+#include "arrow/util/bit-util.h"
+#include "arrow/util/int-util.h"
+#include "arrow/util/logging.h"
+
+namespace arrow {
+
+// ----------------------------------------------------------------------
+// ListBuilder
+
+ListBuilder::ListBuilder(MemoryPool* pool,
+                         std::shared_ptr<ArrayBuilder> const& value_builder,
+                         const std::shared_ptr<DataType>& type)
+    : ArrayBuilder(type ? type
+                        : std::static_pointer_cast<DataType>(
+                              std::make_shared<ListType>(value_builder->type())),
+                   pool),
+      offsets_builder_(pool),
+      value_builder_(value_builder) {}
+
+Status ListBuilder::AppendValues(const int32_t* offsets, int64_t length,
+                                 const uint8_t* valid_bytes) {
+  RETURN_NOT_OK(Reserve(length));
+  UnsafeAppendToBitmap(valid_bytes, length);
+  offsets_builder_.UnsafeAppend(offsets, length);
+  return Status::OK();
+}
+
+Status ListBuilder::AppendNextOffset() {
+  int64_t num_values = value_builder_->length();
+  if (ARROW_PREDICT_FALSE(num_values > kListMaximumElements)) {
+    std::stringstream ss;
+    ss << "ListArray cannot contain more then INT32_MAX - 1 child elements,"
+       << " have " << num_values;
+    return Status::CapacityError(ss.str());
+  }
+  return offsets_builder_.Append(static_cast<int32_t>(num_values));
+}
+
+Status ListBuilder::Append(bool is_valid) {
+  RETURN_NOT_OK(Reserve(1));
+  UnsafeAppendToBitmap(is_valid);
+  return AppendNextOffset();
+}
+
+Status ListBuilder::Resize(int64_t capacity) {
+  DCHECK_LE(capacity, kListMaximumElements);
+  RETURN_NOT_OK(CheckCapacity(capacity, capacity_));
+
+  // one more then requested for offsets
+  RETURN_NOT_OK(offsets_builder_.Resize((capacity + 1) * sizeof(int32_t)));
+  return ArrayBuilder::Resize(capacity);
+}
+
+Status ListBuilder::FinishInternal(std::shared_ptr<ArrayData>* out) {
+  RETURN_NOT_OK(AppendNextOffset());
+
+  // Offset padding zeroed by BufferBuilder
+  std::shared_ptr<Buffer> offsets;
+  RETURN_NOT_OK(offsets_builder_.Finish(&offsets));
+
+  std::shared_ptr<ArrayData> items;
+  if (values_) {
+    items = values_->data();
+  } else {
+    if (value_builder_->length() == 0) {
+      // Try to make sure we get a non-null values buffer (ARROW-2744)
+      RETURN_NOT_OK(value_builder_->Resize(0));
+    }
+    RETURN_NOT_OK(value_builder_->FinishInternal(&items));
+  }
+
+  *out = ArrayData::Make(type_, length_, {null_bitmap_, offsets}, null_count_);
+  (*out)->child_data.emplace_back(std::move(items));
+  Reset();
+  return Status::OK();
+}
+
+void ListBuilder::Reset() {
+  ArrayBuilder::Reset();
+  values_.reset();
+  offsets_builder_.Reset();
+  value_builder_->Reset();
+}
+
+ArrayBuilder* ListBuilder::value_builder() const {
+  DCHECK(!values_) << "Using value builder is pointless when values_ is set";
+  return value_builder_.get();
+}
+
+// ----------------------------------------------------------------------
+// Struct
+
+StructBuilder::StructBuilder(const std::shared_ptr<DataType>& type, MemoryPool* pool,
+                             std::vector<std::shared_ptr<ArrayBuilder>>&& field_builders)
+    : ArrayBuilder(type, pool) {
+  children_ = std::move(field_builders);
+}
+
+void StructBuilder::Reset() {
+  ArrayBuilder::Reset();
+  for (const auto& field_builder : children_) {
+    field_builder->Reset();
+  }
+}
+
+Status StructBuilder::FinishInternal(std::shared_ptr<ArrayData>* out) {
+  RETURN_NOT_OK(TrimBuffer(BitUtil::BytesForBits(length_), null_bitmap_.get()));
+  *out = ArrayData::Make(type_, length_, {null_bitmap_}, null_count_);
+
+  (*out)->child_data.resize(children_.size());
+  for (size_t i = 0; i < children_.size(); ++i) {
+    if (length_ == 0) {
+      // Try to make sure the child buffers are initialized
+      RETURN_NOT_OK(children_[i]->Resize(0));
+    }
+    RETURN_NOT_OK(children_[i]->FinishInternal(&(*out)->child_data[i]));
+  }
+
+  null_bitmap_ = nullptr;
+  capacity_ = length_ = null_count_ = 0;
+  return Status::OK();
+}
+
+}  // namespace arrow
diff --git a/cpp/src/arrow/array/builder_nested.h b/cpp/src/arrow/array/builder_nested.h
new file mode 100644
index 0000000..863e6fe
--- /dev/null
+++ b/cpp/src/arrow/array/builder_nested.h
@@ -0,0 +1,121 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <memory>
+#include <vector>
+
+#include "arrow/array/builder_base.h"
+
+namespace arrow {
+
+// ----------------------------------------------------------------------
+// List builder
+
+/// \class ListBuilder
+/// \brief Builder class for variable-length list array value types
+///
+/// To use this class, you must append values to the child array builder and use
+/// the Append function to delimit each distinct list value (once the values
+/// have been appended to the child array) or use the bulk API to append
+/// a sequence of offests and null values.
+///
+/// A note on types.  Per arrow/type.h all types in the c++ implementation are
+/// logical so even though this class always builds list array, this can
+/// represent multiple different logical types.  If no logical type is provided
+/// at construction time, the class defaults to List<T> where t is taken from the
+/// value_builder/values that the object is constructed with.
+class ARROW_EXPORT ListBuilder : public ArrayBuilder {
+ public:
+  /// Use this constructor to incrementally build the value array along with offsets and
+  /// null bitmap.
+  ListBuilder(MemoryPool* pool, std::shared_ptr<ArrayBuilder> const& value_builder,
+              const std::shared_ptr<DataType>& type = NULLPTR);
+
+  Status Resize(int64_t capacity) override;
+  void Reset() override;
+  Status FinishInternal(std::shared_ptr<ArrayData>* out) override;
+
+  /// \brief Vector append
+  ///
+  /// If passed, valid_bytes is of equal length to values, and any zero byte
+  /// will be considered as a null for that slot
+  Status AppendValues(const int32_t* offsets, int64_t length,
+                      const uint8_t* valid_bytes = NULLPTR);
+
+  /// \brief Start a new variable-length list slot
+  ///
+  /// This function should be called before beginning to append elements to the
+  /// value builder
+  Status Append(bool is_valid = true);
+
+  Status AppendNull() { return Append(false); }
+
+  ArrayBuilder* value_builder() const;
+
+ protected:
+  TypedBufferBuilder<int32_t> offsets_builder_;
+  std::shared_ptr<ArrayBuilder> value_builder_;
+  std::shared_ptr<Array> values_;
+
+  Status AppendNextOffset();
+};
+
+// ----------------------------------------------------------------------
+// Struct
+
+// ---------------------------------------------------------------------------------
+// StructArray builder
+/// Append, Resize and Reserve methods are acting on StructBuilder.
+/// Please make sure all these methods of all child-builders' are consistently
+/// called to maintain data-structure consistency.
+class ARROW_EXPORT StructBuilder : public ArrayBuilder {
+ public:
+  StructBuilder(const std::shared_ptr<DataType>& type, MemoryPool* pool,
+                std::vector<std::shared_ptr<ArrayBuilder>>&& field_builders);
+
+  Status FinishInternal(std::shared_ptr<ArrayData>* out) override;
+
+  /// Null bitmap is of equal length to every child field, and any zero byte
+  /// will be considered as a null for that field, but users must using app-
+  /// end methods or advance methods of the child builders' independently to
+  /// insert data.
+  Status AppendValues(int64_t length, const uint8_t* valid_bytes) {
+    ARROW_RETURN_NOT_OK(Reserve(length));
+    UnsafeAppendToBitmap(valid_bytes, length);
+    return Status::OK();
+  }
+
+  /// Append an element to the Struct. All child-builders' Append method must
+  /// be called independently to maintain data-structure consistency.
+  Status Append(bool is_valid = true) {
+    ARROW_RETURN_NOT_OK(Reserve(1));
+    UnsafeAppendToBitmap(is_valid);
+    return Status::OK();
+  }
+
+  Status AppendNull() { return Append(false); }
+
+  void Reset() override;
+
+  ArrayBuilder* field_builder(int i) const { return children_[i].get(); }
+
+  int num_fields() const { return static_cast<int>(children_.size()); }
+};
+
+}  // namespace arrow
diff --git a/cpp/src/arrow/array/builder_primitive.cc b/cpp/src/arrow/array/builder_primitive.cc
new file mode 100644
index 0000000..bc14000
--- /dev/null
+++ b/cpp/src/arrow/array/builder_primitive.cc
@@ -0,0 +1,272 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/array/builder_primitive.h"
+
+#include <algorithm>
+#include <cstddef>
+#include <cstdint>
+#include <cstring>
+#include <sstream>
+#include <utility>
+#include <vector>
+
+#include "arrow/array.h"
+#include "arrow/buffer.h"
+#include "arrow/status.h"
+#include "arrow/type.h"
+#include "arrow/type_traits.h"
+#include "arrow/util/bit-util.h"
+#include "arrow/util/int-util.h"
+#include "arrow/util/logging.h"
+
+namespace arrow {
+
+// ----------------------------------------------------------------------
+// Null builder
+
+Status NullBuilder::FinishInternal(std::shared_ptr<ArrayData>* out) {
+  *out = ArrayData::Make(null(), length_, {nullptr}, length_);
+  length_ = null_count_ = 0;
+  return Status::OK();
+}
+
+// ----------------------------------------------------------------------
+
+template <typename T>
+void PrimitiveBuilder<T>::Reset() {
+  data_.reset();
+  raw_data_ = nullptr;
+}
+
+template <typename T>
+Status PrimitiveBuilder<T>::Resize(int64_t capacity) {
+  RETURN_NOT_OK(CheckCapacity(capacity, capacity_));
+  capacity = std::max(capacity, kMinBuilderCapacity);
+
+  int64_t nbytes = TypeTraits<T>::bytes_required(capacity);
+  if (capacity_ == 0) {
+    RETURN_NOT_OK(AllocateResizableBuffer(pool_, nbytes, &data_));
+  } else {
+    RETURN_NOT_OK(data_->Resize(nbytes));
+  }
+
+  raw_data_ = reinterpret_cast<value_type*>(data_->mutable_data());
+  return ArrayBuilder::Resize(capacity);
+}
+
+template <typename T>
+Status PrimitiveBuilder<T>::AppendValues(const value_type* values, int64_t length,
+                                         const uint8_t* valid_bytes) {
+  RETURN_NOT_OK(Reserve(length));
+
+  if (length > 0) {
+    std::memcpy(raw_data_ + length_, values,
+                static_cast<std::size_t>(TypeTraits<T>::bytes_required(length)));
+  }
+
+  // length_ is update by these
+  ArrayBuilder::UnsafeAppendToBitmap(valid_bytes, length);
+  return Status::OK();
+}
+
+template <typename T>
+Status PrimitiveBuilder<T>::AppendValues(const value_type* values, int64_t length,
+                                         const std::vector<bool>& is_valid) {
+  RETURN_NOT_OK(Reserve(length));
+  DCHECK_EQ(length, static_cast<int64_t>(is_valid.size()));
+
+  if (length > 0) {
+    std::memcpy(raw_data_ + length_, values,
+                static_cast<std::size_t>(TypeTraits<T>::bytes_required(length)));
+  }
+
+  // length_ is update by these
+  ArrayBuilder::UnsafeAppendToBitmap(is_valid);
+  return Status::OK();
+}
+
+template <typename T>
+Status PrimitiveBuilder<T>::AppendValues(const std::vector<value_type>& values,
+                                         const std::vector<bool>& is_valid) {
+  return AppendValues(values.data(), static_cast<int64_t>(values.size()), is_valid);
+}
+
+template <typename T>
+Status PrimitiveBuilder<T>::AppendValues(const std::vector<value_type>& values) {
+  return AppendValues(values.data(), static_cast<int64_t>(values.size()));
+}
+
+template <typename T>
+Status PrimitiveBuilder<T>::FinishInternal(std::shared_ptr<ArrayData>* out) {
+  RETURN_NOT_OK(TrimBuffer(BitUtil::BytesForBits(length_), null_bitmap_.get()));
+  RETURN_NOT_OK(TrimBuffer(TypeTraits<T>::bytes_required(length_), data_.get()));
+
+  *out = ArrayData::Make(type_, length_, {null_bitmap_, data_}, null_count_);
+
+  data_ = null_bitmap_ = nullptr;
+  capacity_ = length_ = null_count_ = 0;
+
+  return Status::OK();
+}
+
+template class PrimitiveBuilder<UInt8Type>;
+template class PrimitiveBuilder<UInt16Type>;
+template class PrimitiveBuilder<UInt32Type>;
+template class PrimitiveBuilder<UInt64Type>;
+template class PrimitiveBuilder<Int8Type>;
+template class PrimitiveBuilder<Int16Type>;
+template class PrimitiveBuilder<Int32Type>;
+template class PrimitiveBuilder<Int64Type>;
+template class PrimitiveBuilder<Date32Type>;
+template class PrimitiveBuilder<Date64Type>;
+template class PrimitiveBuilder<Time32Type>;
+template class PrimitiveBuilder<Time64Type>;
+template class PrimitiveBuilder<TimestampType>;
+template class PrimitiveBuilder<HalfFloatType>;
+template class PrimitiveBuilder<FloatType>;
+template class PrimitiveBuilder<DoubleType>;
+
+BooleanBuilder::BooleanBuilder(MemoryPool* pool)
+    : ArrayBuilder(boolean(), pool), data_(nullptr), raw_data_(nullptr) {}
+
+BooleanBuilder::BooleanBuilder(const std::shared_ptr<DataType>& type, MemoryPool* pool)
+    : BooleanBuilder(pool) {
+  DCHECK_EQ(Type::BOOL, type->id());
+}
+
+void BooleanBuilder::Reset() {
+  ArrayBuilder::Reset();
+  data_.reset();
+  raw_data_ = nullptr;
+}
+
+Status BooleanBuilder::Resize(int64_t capacity) {
+  RETURN_NOT_OK(CheckCapacity(capacity, capacity_));
+  capacity = std::max(capacity, kMinBuilderCapacity);
+
+  const int64_t new_bitmap_size = BitUtil::BytesForBits(capacity);
+  if (capacity_ == 0) {
+    RETURN_NOT_OK(AllocateResizableBuffer(pool_, new_bitmap_size, &data_));
+    raw_data_ = reinterpret_cast<uint8_t*>(data_->mutable_data());
+
+    // We zero the memory for booleans to keep things simple; for some reason if
+    // we do not, even though we may write every bit (through in-place | or &),
+    // valgrind will still show a warning. If we do not zero the bytes here, we
+    // will have to be careful to zero them in AppendNull and AppendNulls. Also,
+    // zeroing the bits results in deterministic bits when each byte may have a
+    // mix of nulls and not nulls.
+    //
+    // We only zero up to new_bitmap_size because the padding was zeroed by
+    // AllocateResizableBuffer
+    memset(raw_data_, 0, static_cast<size_t>(new_bitmap_size));
+  } else {
+    const int64_t old_bitmap_capacity = data_->capacity();
+    RETURN_NOT_OK(data_->Resize(new_bitmap_size));
+    const int64_t new_bitmap_capacity = data_->capacity();
+    raw_data_ = reinterpret_cast<uint8_t*>(data_->mutable_data());
+
+    // See comment above about why we zero memory for booleans
+    memset(raw_data_ + old_bitmap_capacity, 0,
+           static_cast<size_t>(new_bitmap_capacity - old_bitmap_capacity));
+  }
+
+  return ArrayBuilder::Resize(capacity);
+}
+
+Status BooleanBuilder::FinishInternal(std::shared_ptr<ArrayData>* out) {
+  int64_t bit_offset = length_ % 8;
+  if (bit_offset > 0) {
+    // Adjust last byte
+    data_->mutable_data()[length_ / 8] &= BitUtil::kPrecedingBitmask[bit_offset];
+  }
+
+  RETURN_NOT_OK(TrimBuffer(BitUtil::BytesForBits(length_), null_bitmap_.get()));
+  RETURN_NOT_OK(TrimBuffer(BitUtil::BytesForBits(length_), data_.get()));
+
+  *out = ArrayData::Make(boolean(), length_, {null_bitmap_, data_}, null_count_);
+
+  data_ = null_bitmap_ = nullptr;
+  capacity_ = length_ = null_count_ = 0;
+  return Status::OK();
+}
+
+Status BooleanBuilder::AppendValues(const uint8_t* values, int64_t length,
+                                    const uint8_t* valid_bytes) {
+  RETURN_NOT_OK(Reserve(length));
+
+  int64_t i = 0;
+  internal::GenerateBitsUnrolled(raw_data_, length_, length,
+                                 [values, &i]() -> bool { return values[i++] != 0; });
+
+  // this updates length_
+  ArrayBuilder::UnsafeAppendToBitmap(valid_bytes, length);
+  return Status::OK();
+}
+
+Status BooleanBuilder::AppendValues(const uint8_t* values, int64_t length,
+                                    const std::vector<bool>& is_valid) {
+  RETURN_NOT_OK(Reserve(length));
+  DCHECK_EQ(length, static_cast<int64_t>(is_valid.size()));
+
+  int64_t i = 0;
+  internal::GenerateBitsUnrolled(raw_data_, length_, length,
+                                 [values, &i]() -> bool { return values[i++]; });
+
+  // this updates length_
+  ArrayBuilder::UnsafeAppendToBitmap(is_valid);
+  return Status::OK();
+}
+
+Status BooleanBuilder::AppendValues(const std::vector<uint8_t>& values,
+                                    const std::vector<bool>& is_valid) {
+  return AppendValues(values.data(), static_cast<int64_t>(values.size()), is_valid);
+}
+
+Status BooleanBuilder::AppendValues(const std::vector<uint8_t>& values) {
+  return AppendValues(values.data(), static_cast<int64_t>(values.size()));
+}
+
+Status BooleanBuilder::AppendValues(const std::vector<bool>& values,
+                                    const std::vector<bool>& is_valid) {
+  const int64_t length = static_cast<int64_t>(values.size());
+  RETURN_NOT_OK(Reserve(length));
+  DCHECK_EQ(length, static_cast<int64_t>(is_valid.size()));
+
+  int64_t i = 0;
+  internal::GenerateBitsUnrolled(raw_data_, length_, length,
+                                 [&values, &i]() -> bool { return values[i++]; });
+
+  // this updates length_
+  ArrayBuilder::UnsafeAppendToBitmap(is_valid);
+  return Status::OK();
+}
+
+Status BooleanBuilder::AppendValues(const std::vector<bool>& values) {
+  const int64_t length = static_cast<int64_t>(values.size());
+  RETURN_NOT_OK(Reserve(length));
+
+  int64_t i = 0;
+  internal::GenerateBitsUnrolled(raw_data_, length_, length,
+                                 [&values, &i]() -> bool { return values[i++]; });
+
+  // this updates length_
+  ArrayBuilder::UnsafeSetNotNull(length);
+  return Status::OK();
+}
+
+}  // namespace arrow
diff --git a/cpp/src/arrow/array/builder_primitive.h b/cpp/src/arrow/array/builder_primitive.h
new file mode 100644
index 0000000..13f6c22
--- /dev/null
+++ b/cpp/src/arrow/array/builder_primitive.h
@@ -0,0 +1,401 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <algorithm>
+#include <memory>
+#include <vector>
+
+#include "arrow/array/builder_base.h"
+#include "arrow/type.h"
+
+namespace arrow {
+
+class ARROW_EXPORT NullBuilder : public ArrayBuilder {
+ public:
+  explicit NullBuilder(MemoryPool* pool ARROW_MEMORY_POOL_DEFAULT)
+      : ArrayBuilder(null(), pool) {}
+
+  Status AppendNull() {
+    ++null_count_;
+    ++length_;
+    return Status::OK();
+  }
+
+  Status Append(std::nullptr_t value) { return AppendNull(); }
+
+  Status FinishInternal(std::shared_ptr<ArrayData>* out) override;
+};
+
+template <typename Type>
+class ARROW_EXPORT PrimitiveBuilder : public ArrayBuilder {
+ public:
+  using value_type = typename Type::c_type;
+
+  explicit PrimitiveBuilder(const std::shared_ptr<DataType>& type, MemoryPool* pool)
+      : ArrayBuilder(type, pool), data_(NULLPTR), raw_data_(NULLPTR) {}
+
+  using ArrayBuilder::Advance;
+
+  /// Write nulls as uint8_t* (0 value indicates null) into pre-allocated memory
+  /// The memory at the corresponding data slot is set to 0 to prevent uninitialized
+  /// memory access
+  Status AppendNulls(const uint8_t* valid_bytes, int64_t length) {
+    ARROW_RETURN_NOT_OK(Reserve(length));
+    memset(raw_data_ + length_, 0,
+           static_cast<size_t>(TypeTraits<Type>::bytes_required(length)));
+    UnsafeAppendToBitmap(valid_bytes, length);
+    return Status::OK();
+  }
+
+  /// \brief Append a single null element
+  Status AppendNull() {
+    ARROW_RETURN_NOT_OK(Reserve(1));
+    memset(raw_data_ + length_, 0, sizeof(value_type));
+    UnsafeAppendToBitmap(false);
+    return Status::OK();
+  }
+
+  value_type GetValue(int64_t index) const {
+    return reinterpret_cast<const value_type*>(data_->data())[index];
+  }
+
+  /// \brief Append a sequence of elements in one shot
+  /// \param[in] values a contiguous C array of values
+  /// \param[in] length the number of values to append
+  /// \param[in] valid_bytes an optional sequence of bytes where non-zero
+  /// indicates a valid (non-null) value
+  /// \return Status
+  Status AppendValues(const value_type* values, int64_t length,
+                      const uint8_t* valid_bytes = NULLPTR);
+
+  /// \brief Append a sequence of elements in one shot
+  /// \param[in] values a contiguous C array of values
+  /// \param[in] length the number of values to append
+  /// \param[in] is_valid an std::vector<bool> indicating valid (1) or null
+  /// (0). Equal in length to values
+  /// \return Status
+  Status AppendValues(const value_type* values, int64_t length,
+                      const std::vector<bool>& is_valid);
+
+  /// \brief Append a sequence of elements in one shot
+  /// \param[in] values a std::vector of values
+  /// \param[in] is_valid an std::vector<bool> indicating valid (1) or null
+  /// (0). Equal in length to values
+  /// \return Status
+  Status AppendValues(const std::vector<value_type>& values,
+                      const std::vector<bool>& is_valid);
+
+  /// \brief Append a sequence of elements in one shot
+  /// \param[in] values a std::vector of values
+  /// \return Status
+  Status AppendValues(const std::vector<value_type>& values);
+
+  /// \brief Append a sequence of elements in one shot
+  /// \param[in] values_begin InputIterator to the beginning of the values
+  /// \param[in] values_end InputIterator pointing to the end of the values
+  /// \return Status
+
+  template <typename ValuesIter>
+  Status AppendValues(ValuesIter values_begin, ValuesIter values_end) {
+    int64_t length = static_cast<int64_t>(std::distance(values_begin, values_end));
+    ARROW_RETURN_NOT_OK(Reserve(length));
+
+    std::copy(values_begin, values_end, raw_data_ + length_);
+
+    // this updates the length_
+    UnsafeSetNotNull(length);
+    return Status::OK();
+  }
+
+  /// \brief Append a sequence of elements in one shot, with a specified nullmap
+  /// \param[in] values_begin InputIterator to the beginning of the values
+  /// \param[in] values_end InputIterator pointing to the end of the values
+  /// \param[in] valid_begin InputIterator with elements indication valid(1)
+  ///  or null(0) values.
+  /// \return Status
+  template <typename ValuesIter, typename ValidIter>
+  typename std::enable_if<!std::is_pointer<ValidIter>::value, Status>::type AppendValues(
+      ValuesIter values_begin, ValuesIter values_end, ValidIter valid_begin) {
+    static_assert(!internal::is_null_pointer<ValidIter>::value,
+                  "Don't pass a NULLPTR directly as valid_begin, use the 2-argument "
+                  "version instead");
+    int64_t length = static_cast<int64_t>(std::distance(values_begin, values_end));
+    ARROW_RETURN_NOT_OK(Reserve(length));
+
+    std::copy(values_begin, values_end, raw_data_ + length_);
+
+    // this updates the length_
+    UnsafeAppendToBitmap(valid_begin, std::next(valid_begin, length));
+    return Status::OK();
+  }
+
+  // Same as above, with a pointer type ValidIter
+  template <typename ValuesIter, typename ValidIter>
+  typename std::enable_if<std::is_pointer<ValidIter>::value, Status>::type AppendValues(
+      ValuesIter values_begin, ValuesIter values_end, ValidIter valid_begin) {
+    int64_t length = static_cast<int64_t>(std::distance(values_begin, values_end));
+    ARROW_RETURN_NOT_OK(Reserve(length));
+
+    std::copy(values_begin, values_end, raw_data_ + length_);
+
+    // this updates the length_
+    if (valid_begin == NULLPTR) {
+      UnsafeSetNotNull(length);
+    } else {
+      UnsafeAppendToBitmap(valid_begin, std::next(valid_begin, length));
+    }
+
+    return Status::OK();
+  }
+
+  Status FinishInternal(std::shared_ptr<ArrayData>* out) override;
+  void Reset() override;
+
+  Status Resize(int64_t capacity) override;
+
+ protected:
+  std::shared_ptr<ResizableBuffer> data_;
+  value_type* raw_data_;
+};
+
+/// Base class for all Builders that emit an Array of a scalar numerical type.
+template <typename T>
+class ARROW_EXPORT NumericBuilder : public PrimitiveBuilder<T> {
+ public:
+  using typename PrimitiveBuilder<T>::value_type;
+  using PrimitiveBuilder<T>::PrimitiveBuilder;
+
+  template <typename T1 = T>
+  explicit NumericBuilder(
+      typename std::enable_if<TypeTraits<T1>::is_parameter_free, MemoryPool*>::type pool
+          ARROW_MEMORY_POOL_DEFAULT)
+      : PrimitiveBuilder<T1>(TypeTraits<T1>::type_singleton(), pool) {}
+
+  using ArrayBuilder::UnsafeAppendNull;
+  using PrimitiveBuilder<T>::AppendValues;
+  using PrimitiveBuilder<T>::Resize;
+  using PrimitiveBuilder<T>::Reserve;
+
+  /// Append a single scalar and increase the size if necessary.
+  Status Append(const value_type val) {
+    ARROW_RETURN_NOT_OK(ArrayBuilder::Reserve(1));
+    UnsafeAppend(val);
+    return Status::OK();
+  }
+
+  /// Append a single scalar under the assumption that the underlying Buffer is
+  /// large enough.
+  ///
+  /// This method does not capacity-check; make sure to call Reserve
+  /// beforehand.
+  void UnsafeAppend(const value_type val) {
+    BitUtil::SetBit(null_bitmap_data_, length_);
+    raw_data_[length_++] = val;
+  }
+
+ protected:
+  using PrimitiveBuilder<T>::length_;
+  using PrimitiveBuilder<T>::null_bitmap_data_;
+  using PrimitiveBuilder<T>::raw_data_;
+};
+
+// Builders
+
+using UInt8Builder = NumericBuilder<UInt8Type>;
+using UInt16Builder = NumericBuilder<UInt16Type>;
+using UInt32Builder = NumericBuilder<UInt32Type>;
+using UInt64Builder = NumericBuilder<UInt64Type>;
+
+using Int8Builder = NumericBuilder<Int8Type>;
+using Int16Builder = NumericBuilder<Int16Type>;
+using Int32Builder = NumericBuilder<Int32Type>;
+using Int64Builder = NumericBuilder<Int64Type>;
+using TimestampBuilder = NumericBuilder<TimestampType>;
+using Time32Builder = NumericBuilder<Time32Type>;
+using Time64Builder = NumericBuilder<Time64Type>;
+using Date32Builder = NumericBuilder<Date32Type>;
+using Date64Builder = NumericBuilder<Date64Type>;
+
+using HalfFloatBuilder = NumericBuilder<HalfFloatType>;
+using FloatBuilder = NumericBuilder<FloatType>;
+using DoubleBuilder = NumericBuilder<DoubleType>;
+
+class ARROW_EXPORT BooleanBuilder : public ArrayBuilder {
+ public:
+  using value_type = bool;
+  explicit BooleanBuilder(MemoryPool* pool ARROW_MEMORY_POOL_DEFAULT);
+
+  explicit BooleanBuilder(const std::shared_ptr<DataType>& type, MemoryPool* pool);
+
+  using ArrayBuilder::Advance;
+  using ArrayBuilder::UnsafeAppendNull;
+
+  /// Write nulls as uint8_t* (0 value indicates null) into pre-allocated memory
+  Status AppendNulls(const uint8_t* valid_bytes, int64_t length) {
+    ARROW_RETURN_NOT_OK(Reserve(length));
+    UnsafeAppendToBitmap(valid_bytes, length);
+
+    return Status::OK();
+  }
+
+  Status AppendNull() {
+    ARROW_RETURN_NOT_OK(Reserve(1));
+    UnsafeAppendToBitmap(false);
+
+    return Status::OK();
+  }
+
+  /// Scalar append
+  Status Append(const bool val) {
+    ARROW_RETURN_NOT_OK(Reserve(1));
+    UnsafeAppend(val);
+    return Status::OK();
+  }
+
+  Status Append(const uint8_t val) { return Append(val != 0); }
+
+  /// Scalar append, without checking for capacity
+  void UnsafeAppend(const bool val) {
+    BitUtil::SetBit(null_bitmap_data_, length_);
+    if (val) {
+      BitUtil::SetBit(raw_data_, length_);
+    } else {
+      BitUtil::ClearBit(raw_data_, length_);
+    }
+    ++length_;
+  }
+
+  void UnsafeAppend(const uint8_t val) { UnsafeAppend(val != 0); }
+
+  /// \brief Append a sequence of elements in one shot
+  /// \param[in] values a contiguous array of bytes (non-zero is 1)
+  /// \param[in] length the number of values to append
+  /// \param[in] valid_bytes an optional sequence of bytes where non-zero
+  /// indicates a valid (non-null) value
+  /// \return Status
+  Status AppendValues(const uint8_t* values, int64_t length,
+                      const uint8_t* valid_bytes = NULLPTR);
+
+  /// \brief Append a sequence of elements in one shot
+  /// \param[in] values a contiguous C array of values
+  /// \param[in] length the number of values to append
+  /// \param[in] is_valid an std::vector<bool> indicating valid (1) or null
+  /// (0). Equal in length to values
+  /// \return Status
+  Status AppendValues(const uint8_t* values, int64_t length,
+                      const std::vector<bool>& is_valid);
+
+  /// \brief Append a sequence of elements in one shot
+  /// \param[in] values a std::vector of bytes
+  /// \param[in] is_valid an std::vector<bool> indicating valid (1) or null
+  /// (0). Equal in length to values
+  /// \return Status
+  Status AppendValues(const std::vector<uint8_t>& values,
+                      const std::vector<bool>& is_valid);
+
+  /// \brief Append a sequence of elements in one shot
+  /// \param[in] values a std::vector of bytes
+  /// \return Status
+  Status AppendValues(const std::vector<uint8_t>& values);
+
+  /// \brief Append a sequence of elements in one shot
+  /// \param[in] values an std::vector<bool> indicating true (1) or false
+  /// \param[in] is_valid an std::vector<bool> indicating valid (1) or null
+  /// (0). Equal in length to values
+  /// \return Status
+  Status AppendValues(const std::vector<bool>& values, const std::vector<bool>& is_valid);
+
+  /// \brief Append a sequence of elements in one shot
+  /// \param[in] values an std::vector<bool> indicating true (1) or false
+  /// \return Status
+  Status AppendValues(const std::vector<bool>& values);
+
+  /// \brief Append a sequence of elements in one shot
+  /// \param[in] values_begin InputIterator to the beginning of the values
+  /// \param[in] values_end InputIterator pointing to the end of the values
+  ///  or null(0) values
+  /// \return Status
+  template <typename ValuesIter>
+  Status AppendValues(ValuesIter values_begin, ValuesIter values_end) {
+    int64_t length = static_cast<int64_t>(std::distance(values_begin, values_end));
+    ARROW_RETURN_NOT_OK(Reserve(length));
+    auto iter = values_begin;
+    internal::GenerateBitsUnrolled(raw_data_, length_, length,
+                                   [&iter]() -> bool { return *(iter++); });
+
+    // this updates length_
+    UnsafeSetNotNull(length);
+    return Status::OK();
+  }
+
+  /// \brief Append a sequence of elements in one shot, with a specified nullmap
+  /// \param[in] values_begin InputIterator to the beginning of the values
+  /// \param[in] values_end InputIterator pointing to the end of the values
+  /// \param[in] valid_begin InputIterator with elements indication valid(1)
+  ///  or null(0) values
+  /// \return Status
+  template <typename ValuesIter, typename ValidIter>
+  typename std::enable_if<!std::is_pointer<ValidIter>::value, Status>::type AppendValues(
+      ValuesIter values_begin, ValuesIter values_end, ValidIter valid_begin) {
+    static_assert(!internal::is_null_pointer<ValidIter>::value,
+                  "Don't pass a NULLPTR directly as valid_begin, use the 2-argument "
+                  "version instead");
+    int64_t length = static_cast<int64_t>(std::distance(values_begin, values_end));
+    ARROW_RETURN_NOT_OK(Reserve(length));
+
+    auto iter = values_begin;
+    internal::GenerateBitsUnrolled(raw_data_, length_, length,
+                                   [&iter]() -> bool { return *(iter++); });
+
+    // this updates length_
+    ArrayBuilder::UnsafeAppendToBitmap(valid_begin, std::next(valid_begin, length));
+    return Status::OK();
+  }
+
+  // Same as above, for a pointer type ValidIter
+  template <typename ValuesIter, typename ValidIter>
+  typename std::enable_if<std::is_pointer<ValidIter>::value, Status>::type AppendValues(
+      ValuesIter values_begin, ValuesIter values_end, ValidIter valid_begin) {
+    int64_t length = static_cast<int64_t>(std::distance(values_begin, values_end));
+    ARROW_RETURN_NOT_OK(Reserve(length));
+
+    auto iter = values_begin;
+    internal::GenerateBitsUnrolled(raw_data_, length_, length,
+                                   [&iter]() -> bool { return *(iter++); });
+
+    // this updates the length_
+    if (valid_begin == NULLPTR) {
+      UnsafeSetNotNull(length);
+    } else {
+      UnsafeAppendToBitmap(valid_begin, std::next(valid_begin, length));
+    }
+
+    return Status::OK();
+  }
+
+  Status FinishInternal(std::shared_ptr<ArrayData>* out) override;
+  void Reset() override;
+  Status Resize(int64_t capacity) override;
+
+ protected:
+  std::shared_ptr<ResizableBuffer> data_;
+  uint8_t* raw_data_;
+};
+
+}  // namespace arrow
diff --git a/cpp/src/arrow/builder-benchmark.cc b/cpp/src/arrow/builder-benchmark.cc
index f96728d..fae9c89 100644
--- a/cpp/src/arrow/builder-benchmark.cc
+++ b/cpp/src/arrow/builder-benchmark.cc
@@ -163,10 +163,11 @@ static void BM_BuildBooleanArrayNoNulls(
 }
 
 static void BM_BuildBinaryArray(benchmark::State& state) {  // NOLINT non-const reference
-  const int64_t iterations = 1 << 20;
-
+  // About 160MB
+  const int64_t iterations = 1 << 24;
   std::string value = "1234567890";
-  while (state.KeepRunning()) {
+
+  for (auto _ : state) {
     BinaryBuilder builder;
     for (int64_t i = 0; i < iterations; i++) {
       ABORT_NOT_OK(builder.Append(value));
@@ -177,6 +178,26 @@ static void BM_BuildBinaryArray(benchmark::State& state) {  // NOLINT non-const
   state.SetBytesProcessed(state.iterations() * iterations * value.size());
 }
 
+static void BM_BuildChunkedBinaryArray(
+    benchmark::State& state) {  // NOLINT non-const reference
+  // About 160MB
+  const int64_t iterations = 1 << 24;
+  std::string value = "1234567890";
+
+  for (auto _ : state) {
+    // 1MB chunks
+    const int32_t chunksize = 1 << 20;
+    internal::ChunkedBinaryBuilder builder(chunksize);
+    for (int64_t i = 0; i < iterations; i++) {
+      ABORT_NOT_OK(builder.Append(reinterpret_cast<const uint8_t*>(value.data()),
+                                  static_cast<int32_t>(value.size())));
+    }
+    ArrayVector out;
+    ABORT_NOT_OK(builder.Finish(&out));
+  }
+  state.SetBytesProcessed(state.iterations() * iterations * value.size());
+}
+
 static void BM_BuildFixedSizeBinaryArray(
     benchmark::State& state) {  // NOLINT non-const reference
   const int64_t iterations = 1 << 20;
@@ -371,7 +392,8 @@ BENCHMARK(BM_BuildAdaptiveUIntNoNullsScalarAppend)
     ->Repetitions(kRepetitions)
     ->Unit(benchmark::kMicrosecond);
 
-BENCHMARK(BM_BuildBinaryArray)->Repetitions(kRepetitions)->Unit(benchmark::kMicrosecond);
+BENCHMARK(BM_BuildBinaryArray)->MinTime(1.0)->Unit(benchmark::kMicrosecond);
+BENCHMARK(BM_BuildChunkedBinaryArray)->MinTime(1.0)->Unit(benchmark::kMicrosecond);
 BENCHMARK(BM_BuildFixedSizeBinaryArray)
     ->Repetitions(kRepetitions)
     ->Unit(benchmark::kMicrosecond);
diff --git a/cpp/src/arrow/builder.cc b/cpp/src/arrow/builder.cc
index aef4df0..ff2b453 100644
--- a/cpp/src/arrow/builder.cc
+++ b/cpp/src/arrow/builder.cc
@@ -15,513 +15,20 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include <algorithm>
-#include <cstddef>
-#include <cstdint>
-#include <cstring>
+#include "arrow/builder.h"
+
 #include <sstream>
+#include <string>
 #include <utility>
 #include <vector>
 
-#include "arrow/array.h"
-#include "arrow/buffer.h"
-#include "arrow/builder.h"
 #include "arrow/status.h"
 #include "arrow/type.h"
-#include "arrow/type_traits.h"
-#include "arrow/util/bit-util.h"
 #include "arrow/util/checked_cast.h"
-#include "arrow/util/int-util.h"
-#include "arrow/util/logging.h"
 
 namespace arrow {
 
-using internal::checked_cast;
-
-Status ArrayBuilder::TrimBuffer(const int64_t bytes_filled, ResizableBuffer* buffer) {
-  if (buffer) {
-    if (bytes_filled < buffer->size()) {
-      // Trim buffer
-      RETURN_NOT_OK(buffer->Resize(bytes_filled));
-    }
-    // zero the padding
-    buffer->ZeroPadding();
-  } else {
-    // Null buffers are allowed in place of 0-byte buffers
-    DCHECK_EQ(bytes_filled, 0);
-  }
-  return Status::OK();
-}
-
-Status ArrayBuilder::AppendToBitmap(bool is_valid) {
-  if (length_ == capacity_) {
-    // If the capacity was not already a multiple of 2, do so here
-    // TODO(emkornfield) doubling isn't great default allocation practice
-    // see https://github.com/facebook/folly/blob/master/folly/docs/FBVector.md
-    // fo discussion
-    RETURN_NOT_OK(Resize(BitUtil::NextPower2(capacity_ + 1)));
-  }
-  UnsafeAppendToBitmap(is_valid);
-  return Status::OK();
-}
-
-Status ArrayBuilder::AppendToBitmap(const uint8_t* valid_bytes, int64_t length) {
-  RETURN_NOT_OK(Reserve(length));
-
-  UnsafeAppendToBitmap(valid_bytes, length);
-  return Status::OK();
-}
-
-Status ArrayBuilder::Resize(int64_t capacity) {
-  // Target size of validity (null) bitmap data
-  const int64_t new_bitmap_size = BitUtil::BytesForBits(capacity);
-  RETURN_NOT_OK(CheckCapacity(capacity, capacity_));
-
-  if (capacity_ == 0) {
-    RETURN_NOT_OK(AllocateResizableBuffer(pool_, new_bitmap_size, &null_bitmap_));
-    null_bitmap_data_ = null_bitmap_->mutable_data();
-
-    // Padding is zeroed by AllocateResizableBuffer
-    memset(null_bitmap_data_, 0, static_cast<size_t>(new_bitmap_size));
-  } else {
-    const int64_t old_bitmap_capacity = null_bitmap_->capacity();
-    RETURN_NOT_OK(null_bitmap_->Resize(new_bitmap_size));
-
-    const int64_t new_bitmap_capacity = null_bitmap_->capacity();
-    null_bitmap_data_ = null_bitmap_->mutable_data();
-
-    // Zero the region between the original capacity and the new capacity,
-    // including padding, which has not been zeroed, unlike
-    // AllocateResizableBuffer
-    if (old_bitmap_capacity < new_bitmap_capacity) {
-      memset(null_bitmap_data_ + old_bitmap_capacity, 0,
-             static_cast<size_t>(new_bitmap_capacity - old_bitmap_capacity));
-    }
-  }
-  capacity_ = capacity;
-  return Status::OK();
-}
-
-Status ArrayBuilder::Advance(int64_t elements) {
-  if (length_ + elements > capacity_) {
-    return Status::Invalid("Builder must be expanded");
-  }
-  length_ += elements;
-  return Status::OK();
-}
-
-Status ArrayBuilder::Finish(std::shared_ptr<Array>* out) {
-  std::shared_ptr<ArrayData> internal_data;
-  RETURN_NOT_OK(FinishInternal(&internal_data));
-  *out = MakeArray(internal_data);
-  return Status::OK();
-}
-
-Status ArrayBuilder::Reserve(int64_t additional_elements) {
-  if (length_ + additional_elements > capacity_) {
-    // TODO(emkornfield) power of 2 growth is potentially suboptimal
-    int64_t new_size = BitUtil::NextPower2(length_ + additional_elements);
-    return Resize(new_size);
-  }
-  return Status::OK();
-}
-
-void ArrayBuilder::Reset() {
-  capacity_ = length_ = null_count_ = 0;
-  null_bitmap_ = nullptr;
-}
-
-Status ArrayBuilder::SetNotNull(int64_t length) {
-  RETURN_NOT_OK(Reserve(length));
-  UnsafeSetNotNull(length);
-  return Status::OK();
-}
-
-void ArrayBuilder::UnsafeAppendToBitmap(const uint8_t* valid_bytes, int64_t length) {
-  if (valid_bytes == nullptr) {
-    UnsafeSetNotNull(length);
-    return;
-  }
-  UnsafeAppendToBitmap(valid_bytes, valid_bytes + length);
-}
-
-void ArrayBuilder::UnsafeAppendToBitmap(const std::vector<bool>& is_valid) {
-  UnsafeAppendToBitmap(is_valid.begin(), is_valid.end());
-}
-
-void ArrayBuilder::UnsafeSetNotNull(int64_t length) {
-  const int64_t new_length = length + length_;
-
-  // Fill up the bytes until we have a byte alignment
-  int64_t pad_to_byte = std::min<int64_t>(8 - (length_ % 8), length);
-
-  if (pad_to_byte == 8) {
-    pad_to_byte = 0;
-  }
-  for (int64_t i = length_; i < length_ + pad_to_byte; ++i) {
-    BitUtil::SetBit(null_bitmap_data_, i);
-  }
-
-  // Fast bitsetting
-  int64_t fast_length = (length - pad_to_byte) / 8;
-  memset(null_bitmap_data_ + ((length_ + pad_to_byte) / 8), 0xFF,
-         static_cast<size_t>(fast_length));
-
-  // Trailing bits
-  for (int64_t i = length_ + pad_to_byte + (fast_length * 8); i < new_length; ++i) {
-    BitUtil::SetBit(null_bitmap_data_, i);
-  }
-
-  length_ = new_length;
-}
-
-// ----------------------------------------------------------------------
-// Null builder
-
-Status NullBuilder::FinishInternal(std::shared_ptr<ArrayData>* out) {
-  *out = ArrayData::Make(null(), length_, {nullptr}, length_);
-  length_ = null_count_ = 0;
-  return Status::OK();
-}
-
-// ----------------------------------------------------------------------
-
-template <typename T>
-void PrimitiveBuilder<T>::Reset() {
-  data_.reset();
-  raw_data_ = nullptr;
-}
-
-template <typename T>
-Status PrimitiveBuilder<T>::Resize(int64_t capacity) {
-  RETURN_NOT_OK(CheckCapacity(capacity, capacity_));
-  capacity = std::max(capacity, kMinBuilderCapacity);
-
-  int64_t nbytes = TypeTraits<T>::bytes_required(capacity);
-  if (capacity_ == 0) {
-    RETURN_NOT_OK(AllocateResizableBuffer(pool_, nbytes, &data_));
-  } else {
-    RETURN_NOT_OK(data_->Resize(nbytes));
-  }
-
-  raw_data_ = reinterpret_cast<value_type*>(data_->mutable_data());
-  return ArrayBuilder::Resize(capacity);
-}
-
-template <typename T>
-Status PrimitiveBuilder<T>::AppendValues(const value_type* values, int64_t length,
-                                         const uint8_t* valid_bytes) {
-  RETURN_NOT_OK(Reserve(length));
-
-  if (length > 0) {
-    std::memcpy(raw_data_ + length_, values,
-                static_cast<std::size_t>(TypeTraits<T>::bytes_required(length)));
-  }
-
-  // length_ is update by these
-  ArrayBuilder::UnsafeAppendToBitmap(valid_bytes, length);
-  return Status::OK();
-}
-
-template <typename T>
-Status PrimitiveBuilder<T>::AppendValues(const value_type* values, int64_t length,
-                                         const std::vector<bool>& is_valid) {
-  RETURN_NOT_OK(Reserve(length));
-  DCHECK_EQ(length, static_cast<int64_t>(is_valid.size()));
-
-  if (length > 0) {
-    std::memcpy(raw_data_ + length_, values,
-                static_cast<std::size_t>(TypeTraits<T>::bytes_required(length)));
-  }
-
-  // length_ is update by these
-  ArrayBuilder::UnsafeAppendToBitmap(is_valid);
-  return Status::OK();
-}
-
-template <typename T>
-Status PrimitiveBuilder<T>::AppendValues(const std::vector<value_type>& values,
-                                         const std::vector<bool>& is_valid) {
-  return AppendValues(values.data(), static_cast<int64_t>(values.size()), is_valid);
-}
-
-template <typename T>
-Status PrimitiveBuilder<T>::AppendValues(const std::vector<value_type>& values) {
-  return AppendValues(values.data(), static_cast<int64_t>(values.size()));
-}
-
-template <typename T>
-Status PrimitiveBuilder<T>::FinishInternal(std::shared_ptr<ArrayData>* out) {
-  RETURN_NOT_OK(TrimBuffer(BitUtil::BytesForBits(length_), null_bitmap_.get()));
-  RETURN_NOT_OK(TrimBuffer(TypeTraits<T>::bytes_required(length_), data_.get()));
-
-  *out = ArrayData::Make(type_, length_, {null_bitmap_, data_}, null_count_);
-
-  data_ = null_bitmap_ = nullptr;
-  capacity_ = length_ = null_count_ = 0;
-
-  return Status::OK();
-}
-
-template class PrimitiveBuilder<UInt8Type>;
-template class PrimitiveBuilder<UInt16Type>;
-template class PrimitiveBuilder<UInt32Type>;
-template class PrimitiveBuilder<UInt64Type>;
-template class PrimitiveBuilder<Int8Type>;
-template class PrimitiveBuilder<Int16Type>;
-template class PrimitiveBuilder<Int32Type>;
-template class PrimitiveBuilder<Int64Type>;
-template class PrimitiveBuilder<Date32Type>;
-template class PrimitiveBuilder<Date64Type>;
-template class PrimitiveBuilder<Time32Type>;
-template class PrimitiveBuilder<Time64Type>;
-template class PrimitiveBuilder<TimestampType>;
-template class PrimitiveBuilder<HalfFloatType>;
-template class PrimitiveBuilder<FloatType>;
-template class PrimitiveBuilder<DoubleType>;
-
-BooleanBuilder::BooleanBuilder(MemoryPool* pool)
-    : ArrayBuilder(boolean(), pool), data_(nullptr), raw_data_(nullptr) {}
-
-BooleanBuilder::BooleanBuilder(const std::shared_ptr<DataType>& type, MemoryPool* pool)
-    : BooleanBuilder(pool) {
-  DCHECK_EQ(Type::BOOL, type->id());
-}
-
-void BooleanBuilder::Reset() {
-  ArrayBuilder::Reset();
-  data_.reset();
-  raw_data_ = nullptr;
-}
-
-Status BooleanBuilder::Resize(int64_t capacity) {
-  RETURN_NOT_OK(CheckCapacity(capacity, capacity_));
-  capacity = std::max(capacity, kMinBuilderCapacity);
-
-  const int64_t new_bitmap_size = BitUtil::BytesForBits(capacity);
-  if (capacity_ == 0) {
-    RETURN_NOT_OK(AllocateResizableBuffer(pool_, new_bitmap_size, &data_));
-    raw_data_ = reinterpret_cast<uint8_t*>(data_->mutable_data());
-
-    // We zero the memory for booleans to keep things simple; for some reason if
-    // we do not, even though we may write every bit (through in-place | or &),
-    // valgrind will still show a warning. If we do not zero the bytes here, we
-    // will have to be careful to zero them in AppendNull and AppendNulls. Also,
-    // zeroing the bits results in deterministic bits when each byte may have a
-    // mix of nulls and not nulls.
-    //
-    // We only zero up to new_bitmap_size because the padding was zeroed by
-    // AllocateResizableBuffer
-    memset(raw_data_, 0, static_cast<size_t>(new_bitmap_size));
-  } else {
-    const int64_t old_bitmap_capacity = data_->capacity();
-    RETURN_NOT_OK(data_->Resize(new_bitmap_size));
-    const int64_t new_bitmap_capacity = data_->capacity();
-    raw_data_ = reinterpret_cast<uint8_t*>(data_->mutable_data());
-
-    // See comment above about why we zero memory for booleans
-    memset(raw_data_ + old_bitmap_capacity, 0,
-           static_cast<size_t>(new_bitmap_capacity - old_bitmap_capacity));
-  }
-
-  return ArrayBuilder::Resize(capacity);
-}
-
-Status BooleanBuilder::FinishInternal(std::shared_ptr<ArrayData>* out) {
-  int64_t bit_offset = length_ % 8;
-  if (bit_offset > 0) {
-    // Adjust last byte
-    data_->mutable_data()[length_ / 8] &= BitUtil::kPrecedingBitmask[bit_offset];
-  }
-
-  RETURN_NOT_OK(TrimBuffer(BitUtil::BytesForBits(length_), null_bitmap_.get()));
-  RETURN_NOT_OK(TrimBuffer(BitUtil::BytesForBits(length_), data_.get()));
-
-  *out = ArrayData::Make(boolean(), length_, {null_bitmap_, data_}, null_count_);
-
-  data_ = null_bitmap_ = nullptr;
-  capacity_ = length_ = null_count_ = 0;
-  return Status::OK();
-}
-
-Status BooleanBuilder::AppendValues(const uint8_t* values, int64_t length,
-                                    const uint8_t* valid_bytes) {
-  RETURN_NOT_OK(Reserve(length));
-
-  int64_t i = 0;
-  internal::GenerateBitsUnrolled(raw_data_, length_, length,
-                                 [values, &i]() -> bool { return values[i++] != 0; });
-
-  // this updates length_
-  ArrayBuilder::UnsafeAppendToBitmap(valid_bytes, length);
-  return Status::OK();
-}
-
-Status BooleanBuilder::AppendValues(const uint8_t* values, int64_t length,
-                                    const std::vector<bool>& is_valid) {
-  RETURN_NOT_OK(Reserve(length));
-  DCHECK_EQ(length, static_cast<int64_t>(is_valid.size()));
-
-  int64_t i = 0;
-  internal::GenerateBitsUnrolled(raw_data_, length_, length,
-                                 [values, &i]() -> bool { return values[i++]; });
-
-  // this updates length_
-  ArrayBuilder::UnsafeAppendToBitmap(is_valid);
-  return Status::OK();
-}
-
-Status BooleanBuilder::AppendValues(const std::vector<uint8_t>& values,
-                                    const std::vector<bool>& is_valid) {
-  return AppendValues(values.data(), static_cast<int64_t>(values.size()), is_valid);
-}
-
-Status BooleanBuilder::AppendValues(const std::vector<uint8_t>& values) {
-  return AppendValues(values.data(), static_cast<int64_t>(values.size()));
-}
-
-Status BooleanBuilder::AppendValues(const std::vector<bool>& values,
-                                    const std::vector<bool>& is_valid) {
-  const int64_t length = static_cast<int64_t>(values.size());
-  RETURN_NOT_OK(Reserve(length));
-  DCHECK_EQ(length, static_cast<int64_t>(is_valid.size()));
-
-  int64_t i = 0;
-  internal::GenerateBitsUnrolled(raw_data_, length_, length,
-                                 [&values, &i]() -> bool { return values[i++]; });
-
-  // this updates length_
-  ArrayBuilder::UnsafeAppendToBitmap(is_valid);
-  return Status::OK();
-}
-
-Status BooleanBuilder::AppendValues(const std::vector<bool>& values) {
-  const int64_t length = static_cast<int64_t>(values.size());
-  RETURN_NOT_OK(Reserve(length));
-
-  int64_t i = 0;
-  internal::GenerateBitsUnrolled(raw_data_, length_, length,
-                                 [&values, &i]() -> bool { return values[i++]; });
-
-  // this updates length_
-  ArrayBuilder::UnsafeSetNotNull(length);
-  return Status::OK();
-}
-
-// ----------------------------------------------------------------------
-// ListBuilder
-
-ListBuilder::ListBuilder(MemoryPool* pool,
-                         std::shared_ptr<ArrayBuilder> const& value_builder,
-                         const std::shared_ptr<DataType>& type)
-    : ArrayBuilder(type ? type
-                        : std::static_pointer_cast<DataType>(
-                              std::make_shared<ListType>(value_builder->type())),
-                   pool),
-      offsets_builder_(pool),
-      value_builder_(value_builder) {}
-
-Status ListBuilder::AppendValues(const int32_t* offsets, int64_t length,
-                                 const uint8_t* valid_bytes) {
-  RETURN_NOT_OK(Reserve(length));
-  UnsafeAppendToBitmap(valid_bytes, length);
-  offsets_builder_.UnsafeAppend(offsets, length);
-  return Status::OK();
-}
-
-Status ListBuilder::AppendNextOffset() {
-  int64_t num_values = value_builder_->length();
-  if (ARROW_PREDICT_FALSE(num_values > kListMaximumElements)) {
-    std::stringstream ss;
-    ss << "ListArray cannot contain more then INT32_MAX - 1 child elements,"
-       << " have " << num_values;
-    return Status::CapacityError(ss.str());
-  }
-  return offsets_builder_.Append(static_cast<int32_t>(num_values));
-}
-
-Status ListBuilder::Append(bool is_valid) {
-  RETURN_NOT_OK(Reserve(1));
-  UnsafeAppendToBitmap(is_valid);
-  return AppendNextOffset();
-}
-
-Status ListBuilder::Resize(int64_t capacity) {
-  DCHECK_LE(capacity, kListMaximumElements);
-  RETURN_NOT_OK(CheckCapacity(capacity, capacity_));
-
-  // one more then requested for offsets
-  RETURN_NOT_OK(offsets_builder_.Resize((capacity + 1) * sizeof(int32_t)));
-  return ArrayBuilder::Resize(capacity);
-}
-
-Status ListBuilder::FinishInternal(std::shared_ptr<ArrayData>* out) {
-  RETURN_NOT_OK(AppendNextOffset());
-
-  // Offset padding zeroed by BufferBuilder
-  std::shared_ptr<Buffer> offsets;
-  RETURN_NOT_OK(offsets_builder_.Finish(&offsets));
-
-  std::shared_ptr<ArrayData> items;
-  if (values_) {
-    items = values_->data();
-  } else {
-    if (value_builder_->length() == 0) {
-      // Try to make sure we get a non-null values buffer (ARROW-2744)
-      RETURN_NOT_OK(value_builder_->Resize(0));
-    }
-    RETURN_NOT_OK(value_builder_->FinishInternal(&items));
-  }
-
-  *out = ArrayData::Make(type_, length_, {null_bitmap_, offsets}, null_count_);
-  (*out)->child_data.emplace_back(std::move(items));
-  Reset();
-  return Status::OK();
-}
-
-void ListBuilder::Reset() {
-  ArrayBuilder::Reset();
-  values_.reset();
-  offsets_builder_.Reset();
-  value_builder_->Reset();
-}
-
-ArrayBuilder* ListBuilder::value_builder() const {
-  DCHECK(!values_) << "Using value builder is pointless when values_ is set";
-  return value_builder_.get();
-}
-
-// ----------------------------------------------------------------------
-// Struct
-
-StructBuilder::StructBuilder(const std::shared_ptr<DataType>& type, MemoryPool* pool,
-                             std::vector<std::shared_ptr<ArrayBuilder>>&& field_builders)
-    : ArrayBuilder(type, pool), field_builders_(std::move(field_builders)) {}
-
-void StructBuilder::Reset() {
-  ArrayBuilder::Reset();
-  for (const auto& field_builder : field_builders_) {
-    field_builder->Reset();
-  }
-}
-
-Status StructBuilder::FinishInternal(std::shared_ptr<ArrayData>* out) {
-  RETURN_NOT_OK(TrimBuffer(BitUtil::BytesForBits(length_), null_bitmap_.get()));
-  *out = ArrayData::Make(type_, length_, {null_bitmap_}, null_count_);
-
-  (*out)->child_data.resize(field_builders_.size());
-  for (size_t i = 0; i < field_builders_.size(); ++i) {
-    if (length_ == 0) {
-      // Try to make sure the child buffers are initialized
-      RETURN_NOT_OK(field_builders_[i]->Resize(0));
-    }
-    RETURN_NOT_OK(field_builders_[i]->FinishInternal(&(*out)->child_data[i]));
-  }
-
-  null_bitmap_ = nullptr;
-  capacity_ = length_ = null_count_ = 0;
-  return Status::OK();
-}
+class MemoryPool;
 
 // ----------------------------------------------------------------------
 // Helper functions
@@ -566,7 +73,7 @@ Status MakeBuilder(MemoryPool* pool, const std::shared_ptr<DataType>& type,
     case Type::LIST: {
       std::unique_ptr<ArrayBuilder> value_builder;
       std::shared_ptr<DataType> value_type =
-          checked_cast<const ListType&>(*type).value_type();
+          internal::checked_cast<const ListType&>(*type).value_type();
       RETURN_NOT_OK(MakeBuilder(pool, value_type, &value_builder));
       out->reset(new ListBuilder(pool, std::move(value_builder)));
       return Status::OK();
diff --git a/cpp/src/arrow/builder.h b/cpp/src/arrow/builder.h
index d001667..a7ab22c 100644
--- a/cpp/src/arrow/builder.h
+++ b/cpp/src/arrow/builder.h
@@ -15,1184 +15,27 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#ifndef ARROW_BUILDER_H
-#define ARROW_BUILDER_H
+#pragma once
 
-#include <algorithm>  // IWYU pragma: keep
-#include <array>
-#include <cstddef>
-#include <cstdint>
-#include <cstring>
-#include <iterator>
-#include <limits>
 #include <memory>
-#include <string>
-#include <type_traits>
-#include <vector>
 
-#include "arrow/buffer.h"
-#include "arrow/memory_pool.h"
+#include "arrow/array/builder_adaptive.h"   // IWYU pragma: export
+#include "arrow/array/builder_base.h"       // IWYU pragma: export
+#include "arrow/array/builder_binary.h"     // IWYU pragma: export
+#include "arrow/array/builder_decimal.h"    // IWYU pragma: export
+#include "arrow/array/builder_dict.h"       // IWYU pragma: export
+#include "arrow/array/builder_nested.h"     // IWYU pragma: export
+#include "arrow/array/builder_primitive.h"  // IWYU pragma: export
 #include "arrow/status.h"
-#include "arrow/type.h"
-#include "arrow/type_traits.h"
-#include "arrow/util/bit-util.h"
-#include "arrow/util/macros.h"
-#include "arrow/util/string_view.h"
-#include "arrow/util/type_traits.h"
 #include "arrow/util/visibility.h"
 
 namespace arrow {
 
-class Array;
-struct ArrayData;
-class Decimal128;
-
-constexpr int64_t kBinaryMemoryLimit = std::numeric_limits<int32_t>::max() - 1;
-constexpr int64_t kListMaximumElements = std::numeric_limits<int32_t>::max() - 1;
-
-constexpr int64_t kMinBuilderCapacity = 1 << 5;
-
-/// Base class for all data array builders.
-///
-/// This class provides a facilities for incrementally building the null bitmap
-/// (see Append methods) and as a side effect the current number of slots and
-/// the null count.
-///
-/// \note Users are expected to use builders as one of the concrete types below.
-/// For example, ArrayBuilder* pointing to BinaryBuilder should be downcast before use.
-class ARROW_EXPORT ArrayBuilder {
- public:
-  explicit ArrayBuilder(const std::shared_ptr<DataType>& type, MemoryPool* pool)
-      : type_(type),
-        pool_(pool),
-        null_bitmap_(NULLPTR),
-        null_count_(0),
-        null_bitmap_data_(NULLPTR),
-        length_(0),
-        capacity_(0) {}
-
-  virtual ~ArrayBuilder() = default;
-
-  /// For nested types. Since the objects are owned by this class instance, we
-  /// skip shared pointers and just return a raw pointer
-  ArrayBuilder* child(int i) { return children_[i].get(); }
-
-  int num_children() const { return static_cast<int>(children_.size()); }
-
-  int64_t length() const { return length_; }
-  int64_t null_count() const { return null_count_; }
-  int64_t capacity() const { return capacity_; }
-
-  /// \brief Ensure that enough memory has been allocated to fit the indicated
-  /// number of total elements in the builder, including any that have already
-  /// been appended. Does not account for reallocations that may be due to
-  /// variable size data, like binary values. To make space for incremental
-  /// appends, use Reserve instead.
-  ///
-  /// \param[in] capacity the minimum number of total array values to
-  ///            accommodate. Must be greater than the current capacity.
-  /// \return Status
-  virtual Status Resize(int64_t capacity);
-
-  /// \brief Ensure that there is enough space allocated to add the indicated
-  /// number of elements without any further calls to Resize. The memory
-  /// allocated is rounded up to the next highest power of 2 similar to memory
-  /// allocations in STL containers like std::vector
-  /// \param[in] additional_capacity the number of additional array values
-  /// \return Status
-  Status Reserve(int64_t additional_capacity);
-
-  /// Reset the builder.
-  virtual void Reset();
-
-  /// For cases where raw data was memcpy'd into the internal buffers, allows us
-  /// to advance the length of the builder. It is your responsibility to use
-  /// this function responsibly.
-  Status Advance(int64_t elements);
-
-  /// \brief Return result of builder as an internal generic ArrayData
-  /// object. Resets builder except for dictionary builder
-  ///
-  /// \param[out] out the finalized ArrayData object
-  /// \return Status
-  virtual Status FinishInternal(std::shared_ptr<ArrayData>* out) = 0;
-
-  /// \brief Return result of builder as an Array object.
-  ///
-  /// The builder is reset except for DictionaryBuilder.
-  ///
-  /// \param[out] out the finalized Array object
-  /// \return Status
-  Status Finish(std::shared_ptr<Array>* out);
-
-  std::shared_ptr<DataType> type() const { return type_; }
-
- protected:
-  ArrayBuilder() {}
-
-  /// Append to null bitmap
-  Status AppendToBitmap(bool is_valid);
-
-  /// Vector append. Treat each zero byte as a null.   If valid_bytes is null
-  /// assume all of length bits are valid.
-  Status AppendToBitmap(const uint8_t* valid_bytes, int64_t length);
-
-  /// Set the next length bits to not null (i.e. valid).
-  Status SetNotNull(int64_t length);
-
-  // Unsafe operations (don't check capacity/don't resize)
-
-  void UnsafeAppendNull() { UnsafeAppendToBitmap(false); }
-
-  // Append to null bitmap, update the length
-  void UnsafeAppendToBitmap(bool is_valid) {
-    if (is_valid) {
-      BitUtil::SetBit(null_bitmap_data_, length_);
-    } else {
-      ++null_count_;
-    }
-    ++length_;
-  }
-
-  template <typename IterType>
-  void UnsafeAppendToBitmap(const IterType& begin, const IterType& end) {
-    int64_t byte_offset = length_ / 8;
-    int64_t bit_offset = length_ % 8;
-    uint8_t bitset = null_bitmap_data_[byte_offset];
-
-    for (auto iter = begin; iter != end; ++iter) {
-      if (bit_offset == 8) {
-        bit_offset = 0;
-        null_bitmap_data_[byte_offset] = bitset;
-        byte_offset++;
-        // TODO: Except for the last byte, this shouldn't be needed
-        bitset = null_bitmap_data_[byte_offset];
-      }
-
-      if (*iter) {
-        bitset |= BitUtil::kBitmask[bit_offset];
-      } else {
-        bitset &= BitUtil::kFlippedBitmask[bit_offset];
-        ++null_count_;
-      }
-
-      bit_offset++;
-    }
-
-    if (bit_offset != 0) {
-      null_bitmap_data_[byte_offset] = bitset;
-    }
-
-    length_ += std::distance(begin, end);
-  }
-
-  // Vector append. Treat each zero byte as a nullzero. If valid_bytes is null
-  // assume all of length bits are valid.
-  void UnsafeAppendToBitmap(const uint8_t* valid_bytes, int64_t length);
-
-  void UnsafeAppendToBitmap(const std::vector<bool>& is_valid);
-
-  // Set the next length bits to not null (i.e. valid).
-  void UnsafeSetNotNull(int64_t length);
-
-  static Status TrimBuffer(const int64_t bytes_filled, ResizableBuffer* buffer);
-
-  static Status CheckCapacity(int64_t new_capacity, int64_t old_capacity) {
-    if (new_capacity < 0) {
-      return Status::Invalid("Resize capacity must be positive");
-    }
-    if (new_capacity < old_capacity) {
-      return Status::Invalid("Resize cannot downsize");
-    }
-    return Status::OK();
-  }
-
-  std::shared_ptr<DataType> type_;
-  MemoryPool* pool_;
-
-  // When null_bitmap are first appended to the builder, the null bitmap is allocated
-  std::shared_ptr<ResizableBuffer> null_bitmap_;
-  int64_t null_count_;
-  uint8_t* null_bitmap_data_;
-
-  // Array length, so far. Also, the index of the next element to be added
-  int64_t length_;
-  int64_t capacity_;
-
-  // Child value array builders. These are owned by this class
-  std::vector<std::unique_ptr<ArrayBuilder>> children_;
-
- private:
-  ARROW_DISALLOW_COPY_AND_ASSIGN(ArrayBuilder);
-};
-
-class ARROW_EXPORT NullBuilder : public ArrayBuilder {
- public:
-  explicit NullBuilder(MemoryPool* pool ARROW_MEMORY_POOL_DEFAULT)
-      : ArrayBuilder(null(), pool) {}
-
-  Status AppendNull() {
-    ++null_count_;
-    ++length_;
-    return Status::OK();
-  }
-
-  Status Append(std::nullptr_t value) { return AppendNull(); }
-
-  Status FinishInternal(std::shared_ptr<ArrayData>* out) override;
-};
-
-template <typename Type>
-class ARROW_EXPORT PrimitiveBuilder : public ArrayBuilder {
- public:
-  using value_type = typename Type::c_type;
-
-  explicit PrimitiveBuilder(const std::shared_ptr<DataType>& type, MemoryPool* pool)
-      : ArrayBuilder(type, pool), data_(NULLPTR), raw_data_(NULLPTR) {}
-
-  using ArrayBuilder::Advance;
-
-  /// Write nulls as uint8_t* (0 value indicates null) into pre-allocated memory
-  /// The memory at the corresponding data slot is set to 0 to prevent uninitialized
-  /// memory access
-  Status AppendNulls(const uint8_t* valid_bytes, int64_t length) {
-    ARROW_RETURN_NOT_OK(Reserve(length));
-    memset(raw_data_ + length_, 0,
-           static_cast<size_t>(TypeTraits<Type>::bytes_required(length)));
-    UnsafeAppendToBitmap(valid_bytes, length);
-    return Status::OK();
-  }
-
-  /// \brief Append a single null element
-  Status AppendNull() {
-    ARROW_RETURN_NOT_OK(Reserve(1));
-    memset(raw_data_ + length_, 0, sizeof(value_type));
-    UnsafeAppendToBitmap(false);
-    return Status::OK();
-  }
-
-  value_type GetValue(int64_t index) const {
-    return reinterpret_cast<const value_type*>(data_->data())[index];
-  }
-
-  /// \brief Append a sequence of elements in one shot
-  /// \param[in] values a contiguous C array of values
-  /// \param[in] length the number of values to append
-  /// \param[in] valid_bytes an optional sequence of bytes where non-zero
-  /// indicates a valid (non-null) value
-  /// \return Status
-  Status AppendValues(const value_type* values, int64_t length,
-                      const uint8_t* valid_bytes = NULLPTR);
-
-  /// \brief Append a sequence of elements in one shot
-  /// \param[in] values a contiguous C array of values
-  /// \param[in] length the number of values to append
-  /// \param[in] is_valid an std::vector<bool> indicating valid (1) or null
-  /// (0). Equal in length to values
-  /// \return Status
-  Status AppendValues(const value_type* values, int64_t length,
-                      const std::vector<bool>& is_valid);
-
-  /// \brief Append a sequence of elements in one shot
-  /// \param[in] values a std::vector of values
-  /// \param[in] is_valid an std::vector<bool> indicating valid (1) or null
-  /// (0). Equal in length to values
-  /// \return Status
-  Status AppendValues(const std::vector<value_type>& values,
-                      const std::vector<bool>& is_valid);
-
-  /// \brief Append a sequence of elements in one shot
-  /// \param[in] values a std::vector of values
-  /// \return Status
-  Status AppendValues(const std::vector<value_type>& values);
-
-  /// \brief Append a sequence of elements in one shot
-  /// \param[in] values_begin InputIterator to the beginning of the values
-  /// \param[in] values_end InputIterator pointing to the end of the values
-  /// \return Status
-
-  template <typename ValuesIter>
-  Status AppendValues(ValuesIter values_begin, ValuesIter values_end) {
-    int64_t length = static_cast<int64_t>(std::distance(values_begin, values_end));
-    ARROW_RETURN_NOT_OK(Reserve(length));
-
-    std::copy(values_begin, values_end, raw_data_ + length_);
-
-    // this updates the length_
-    UnsafeSetNotNull(length);
-    return Status::OK();
-  }
-
-  /// \brief Append a sequence of elements in one shot, with a specified nullmap
-  /// \param[in] values_begin InputIterator to the beginning of the values
-  /// \param[in] values_end InputIterator pointing to the end of the values
-  /// \param[in] valid_begin InputIterator with elements indication valid(1)
-  ///  or null(0) values.
-  /// \return Status
-  template <typename ValuesIter, typename ValidIter>
-  typename std::enable_if<!std::is_pointer<ValidIter>::value, Status>::type AppendValues(
-      ValuesIter values_begin, ValuesIter values_end, ValidIter valid_begin) {
-    static_assert(!internal::is_null_pointer<ValidIter>::value,
-                  "Don't pass a NULLPTR directly as valid_begin, use the 2-argument "
-                  "version instead");
-    int64_t length = static_cast<int64_t>(std::distance(values_begin, values_end));
-    ARROW_RETURN_NOT_OK(Reserve(length));
-
-    std::copy(values_begin, values_end, raw_data_ + length_);
-
-    // this updates the length_
-    UnsafeAppendToBitmap(valid_begin, std::next(valid_begin, length));
-    return Status::OK();
-  }
-
-  // Same as above, with a pointer type ValidIter
-  template <typename ValuesIter, typename ValidIter>
-  typename std::enable_if<std::is_pointer<ValidIter>::value, Status>::type AppendValues(
-      ValuesIter values_begin, ValuesIter values_end, ValidIter valid_begin) {
-    int64_t length = static_cast<int64_t>(std::distance(values_begin, values_end));
-    ARROW_RETURN_NOT_OK(Reserve(length));
-
-    std::copy(values_begin, values_end, raw_data_ + length_);
-
-    // this updates the length_
-    if (valid_begin == NULLPTR) {
-      UnsafeSetNotNull(length);
-    } else {
-      UnsafeAppendToBitmap(valid_begin, std::next(valid_begin, length));
-    }
-
-    return Status::OK();
-  }
-
-  Status FinishInternal(std::shared_ptr<ArrayData>* out) override;
-  void Reset() override;
-
-  Status Resize(int64_t capacity) override;
-
- protected:
-  std::shared_ptr<ResizableBuffer> data_;
-  value_type* raw_data_;
-};
-
-/// Base class for all Builders that emit an Array of a scalar numerical type.
-template <typename T>
-class ARROW_EXPORT NumericBuilder : public PrimitiveBuilder<T> {
- public:
-  using typename PrimitiveBuilder<T>::value_type;
-  using PrimitiveBuilder<T>::PrimitiveBuilder;
-
-  template <typename T1 = T>
-  explicit NumericBuilder(
-      typename std::enable_if<TypeTraits<T1>::is_parameter_free, MemoryPool*>::type pool
-          ARROW_MEMORY_POOL_DEFAULT)
-      : PrimitiveBuilder<T1>(TypeTraits<T1>::type_singleton(), pool) {}
-
-  using ArrayBuilder::UnsafeAppendNull;
-  using PrimitiveBuilder<T>::AppendValues;
-  using PrimitiveBuilder<T>::Resize;
-  using PrimitiveBuilder<T>::Reserve;
-
-  /// Append a single scalar and increase the size if necessary.
-  Status Append(const value_type val) {
-    ARROW_RETURN_NOT_OK(ArrayBuilder::Reserve(1));
-    UnsafeAppend(val);
-    return Status::OK();
-  }
-
-  /// Append a single scalar under the assumption that the underlying Buffer is
-  /// large enough.
-  ///
-  /// This method does not capacity-check; make sure to call Reserve
-  /// beforehand.
-  void UnsafeAppend(const value_type val) {
-    BitUtil::SetBit(null_bitmap_data_, length_);
-    raw_data_[length_++] = val;
-  }
-
- protected:
-  using PrimitiveBuilder<T>::length_;
-  using PrimitiveBuilder<T>::null_bitmap_data_;
-  using PrimitiveBuilder<T>::raw_data_;
-};
-
-// Builders
-
-using UInt8Builder = NumericBuilder<UInt8Type>;
-using UInt16Builder = NumericBuilder<UInt16Type>;
-using UInt32Builder = NumericBuilder<UInt32Type>;
-using UInt64Builder = NumericBuilder<UInt64Type>;
-
-using Int8Builder = NumericBuilder<Int8Type>;
-using Int16Builder = NumericBuilder<Int16Type>;
-using Int32Builder = NumericBuilder<Int32Type>;
-using Int64Builder = NumericBuilder<Int64Type>;
-using TimestampBuilder = NumericBuilder<TimestampType>;
-using Time32Builder = NumericBuilder<Time32Type>;
-using Time64Builder = NumericBuilder<Time64Type>;
-using Date32Builder = NumericBuilder<Date32Type>;
-using Date64Builder = NumericBuilder<Date64Type>;
-
-using HalfFloatBuilder = NumericBuilder<HalfFloatType>;
-using FloatBuilder = NumericBuilder<FloatType>;
-using DoubleBuilder = NumericBuilder<DoubleType>;
-
-namespace internal {
-
-class ARROW_EXPORT AdaptiveIntBuilderBase : public ArrayBuilder {
- public:
-  explicit AdaptiveIntBuilderBase(MemoryPool* pool);
-
-  /// Write nulls as uint8_t* (0 value indicates null) into pre-allocated memory
-  Status AppendNulls(const uint8_t* valid_bytes, int64_t length) {
-    ARROW_RETURN_NOT_OK(CommitPendingData());
-    ARROW_RETURN_NOT_OK(Reserve(length));
-    memset(data_->mutable_data() + length_ * int_size_, 0, int_size_ * length);
-    UnsafeAppendToBitmap(valid_bytes, length);
-    return Status::OK();
-  }
-
-  Status AppendNull() {
-    pending_data_[pending_pos_] = 0;
-    pending_valid_[pending_pos_] = 0;
-    pending_has_nulls_ = true;
-    ++pending_pos_;
-
-    if (ARROW_PREDICT_FALSE(pending_pos_ >= pending_size_)) {
-      return CommitPendingData();
-    }
-    return Status::OK();
-  }
-
-  void Reset() override;
-  Status Resize(int64_t capacity) override;
-
- protected:
-  virtual Status CommitPendingData() = 0;
-
-  std::shared_ptr<ResizableBuffer> data_;
-  uint8_t* raw_data_;
-  uint8_t int_size_;
-
-  static constexpr int32_t pending_size_ = 1024;
-  uint8_t pending_valid_[pending_size_];
-  uint64_t pending_data_[pending_size_];
-  int32_t pending_pos_;
-  bool pending_has_nulls_;
-};
-
-}  // namespace internal
-
-class ARROW_EXPORT AdaptiveUIntBuilder : public internal::AdaptiveIntBuilderBase {
- public:
-  explicit AdaptiveUIntBuilder(MemoryPool* pool ARROW_MEMORY_POOL_DEFAULT);
-
-  using ArrayBuilder::Advance;
-  using internal::AdaptiveIntBuilderBase::Reset;
-
-  /// Scalar append
-  Status Append(const uint64_t val) {
-    pending_data_[pending_pos_] = val;
-    pending_valid_[pending_pos_] = 1;
-    ++pending_pos_;
-
-    if (ARROW_PREDICT_FALSE(pending_pos_ >= pending_size_)) {
-      return CommitPendingData();
-    }
-    return Status::OK();
-  }
-
-  /// \brief Append a sequence of elements in one shot
-  /// \param[in] values a contiguous C array of values
-  /// \param[in] length the number of values to append
-  /// \param[in] valid_bytes an optional sequence of bytes where non-zero
-  /// indicates a valid (non-null) value
-  /// \return Status
-  Status AppendValues(const uint64_t* values, int64_t length,
-                      const uint8_t* valid_bytes = NULLPTR);
-
-  Status FinishInternal(std::shared_ptr<ArrayData>* out) override;
-
- protected:
-  Status CommitPendingData() override;
-  Status ExpandIntSize(uint8_t new_int_size);
-
-  Status AppendValuesInternal(const uint64_t* values, int64_t length,
-                              const uint8_t* valid_bytes);
-
-  template <typename new_type, typename old_type>
-  typename std::enable_if<sizeof(old_type) >= sizeof(new_type), Status>::type
-  ExpandIntSizeInternal();
-#define __LESS(a, b) (a) < (b)
-  template <typename new_type, typename old_type>
-  typename std::enable_if<__LESS(sizeof(old_type), sizeof(new_type)), Status>::type
-  ExpandIntSizeInternal();
-#undef __LESS
-
-  template <typename new_type>
-  Status ExpandIntSizeN();
-};
-
-class ARROW_EXPORT AdaptiveIntBuilder : public internal::AdaptiveIntBuilderBase {
- public:
-  explicit AdaptiveIntBuilder(MemoryPool* pool ARROW_MEMORY_POOL_DEFAULT);
-
-  using ArrayBuilder::Advance;
-  using internal::AdaptiveIntBuilderBase::Reset;
-
-  /// Scalar append
-  Status Append(const int64_t val) {
-    auto v = static_cast<uint64_t>(val);
-
-    pending_data_[pending_pos_] = v;
-    pending_valid_[pending_pos_] = 1;
-    ++pending_pos_;
-
-    if (ARROW_PREDICT_FALSE(pending_pos_ >= pending_size_)) {
-      return CommitPendingData();
-    }
-    return Status::OK();
-  }
-
-  /// \brief Append a sequence of elements in one shot
-  /// \param[in] values a contiguous C array of values
-  /// \param[in] length the number of values to append
-  /// \param[in] valid_bytes an optional sequence of bytes where non-zero
-  /// indicates a valid (non-null) value
-  /// \return Status
-  Status AppendValues(const int64_t* values, int64_t length,
-                      const uint8_t* valid_bytes = NULLPTR);
-
-  Status FinishInternal(std::shared_ptr<ArrayData>* out) override;
-
- protected:
-  Status CommitPendingData() override;
-  Status ExpandIntSize(uint8_t new_int_size);
-
-  Status AppendValuesInternal(const int64_t* values, int64_t length,
-                              const uint8_t* valid_bytes);
-
-  template <typename new_type, typename old_type>
-  typename std::enable_if<sizeof(old_type) >= sizeof(new_type), Status>::type
-  ExpandIntSizeInternal();
-#define __LESS(a, b) (a) < (b)
-  template <typename new_type, typename old_type>
-  typename std::enable_if<__LESS(sizeof(old_type), sizeof(new_type)), Status>::type
-  ExpandIntSizeInternal();
-#undef __LESS
-
-  template <typename new_type>
-  Status ExpandIntSizeN();
-};
-
-class ARROW_EXPORT BooleanBuilder : public ArrayBuilder {
- public:
-  using value_type = bool;
-  explicit BooleanBuilder(MemoryPool* pool ARROW_MEMORY_POOL_DEFAULT);
-
-  explicit BooleanBuilder(const std::shared_ptr<DataType>& type, MemoryPool* pool);
-
-  using ArrayBuilder::Advance;
-  using ArrayBuilder::UnsafeAppendNull;
-
-  /// Write nulls as uint8_t* (0 value indicates null) into pre-allocated memory
-  Status AppendNulls(const uint8_t* valid_bytes, int64_t length) {
-    ARROW_RETURN_NOT_OK(Reserve(length));
-    UnsafeAppendToBitmap(valid_bytes, length);
-
-    return Status::OK();
-  }
-
-  Status AppendNull() {
-    ARROW_RETURN_NOT_OK(Reserve(1));
-    UnsafeAppendToBitmap(false);
-
-    return Status::OK();
-  }
-
-  /// Scalar append
-  Status Append(const bool val) {
-    ARROW_RETURN_NOT_OK(Reserve(1));
-    UnsafeAppend(val);
-    return Status::OK();
-  }
-
-  Status Append(const uint8_t val) { return Append(val != 0); }
-
-  /// Scalar append, without checking for capacity
-  void UnsafeAppend(const bool val) {
-    BitUtil::SetBit(null_bitmap_data_, length_);
-    if (val) {
-      BitUtil::SetBit(raw_data_, length_);
-    } else {
-      BitUtil::ClearBit(raw_data_, length_);
-    }
-    ++length_;
-  }
-
-  void UnsafeAppend(const uint8_t val) { UnsafeAppend(val != 0); }
-
-  /// \brief Append a sequence of elements in one shot
-  /// \param[in] values a contiguous array of bytes (non-zero is 1)
-  /// \param[in] length the number of values to append
-  /// \param[in] valid_bytes an optional sequence of bytes where non-zero
-  /// indicates a valid (non-null) value
-  /// \return Status
-  Status AppendValues(const uint8_t* values, int64_t length,
-                      const uint8_t* valid_bytes = NULLPTR);
-
-  /// \brief Append a sequence of elements in one shot
-  /// \param[in] values a contiguous C array of values
-  /// \param[in] length the number of values to append
-  /// \param[in] is_valid an std::vector<bool> indicating valid (1) or null
-  /// (0). Equal in length to values
-  /// \return Status
-  Status AppendValues(const uint8_t* values, int64_t length,
-                      const std::vector<bool>& is_valid);
-
-  /// \brief Append a sequence of elements in one shot
-  /// \param[in] values a std::vector of bytes
-  /// \param[in] is_valid an std::vector<bool> indicating valid (1) or null
-  /// (0). Equal in length to values
-  /// \return Status
-  Status AppendValues(const std::vector<uint8_t>& values,
-                      const std::vector<bool>& is_valid);
-
-  /// \brief Append a sequence of elements in one shot
-  /// \param[in] values a std::vector of bytes
-  /// \return Status
-  Status AppendValues(const std::vector<uint8_t>& values);
-
-  /// \brief Append a sequence of elements in one shot
-  /// \param[in] values an std::vector<bool> indicating true (1) or false
-  /// \param[in] is_valid an std::vector<bool> indicating valid (1) or null
-  /// (0). Equal in length to values
-  /// \return Status
-  Status AppendValues(const std::vector<bool>& values, const std::vector<bool>& is_valid);
-
-  /// \brief Append a sequence of elements in one shot
-  /// \param[in] values an std::vector<bool> indicating true (1) or false
-  /// \return Status
-  Status AppendValues(const std::vector<bool>& values);
-
-  /// \brief Append a sequence of elements in one shot
-  /// \param[in] values_begin InputIterator to the beginning of the values
-  /// \param[in] values_end InputIterator pointing to the end of the values
-  ///  or null(0) values
-  /// \return Status
-  template <typename ValuesIter>
-  Status AppendValues(ValuesIter values_begin, ValuesIter values_end) {
-    int64_t length = static_cast<int64_t>(std::distance(values_begin, values_end));
-    ARROW_RETURN_NOT_OK(Reserve(length));
-    auto iter = values_begin;
-    internal::GenerateBitsUnrolled(raw_data_, length_, length,
-                                   [&iter]() -> bool { return *(iter++); });
-
-    // this updates length_
-    UnsafeSetNotNull(length);
-    return Status::OK();
-  }
-
-  /// \brief Append a sequence of elements in one shot, with a specified nullmap
-  /// \param[in] values_begin InputIterator to the beginning of the values
-  /// \param[in] values_end InputIterator pointing to the end of the values
-  /// \param[in] valid_begin InputIterator with elements indication valid(1)
-  ///  or null(0) values
-  /// \return Status
-  template <typename ValuesIter, typename ValidIter>
-  typename std::enable_if<!std::is_pointer<ValidIter>::value, Status>::type AppendValues(
-      ValuesIter values_begin, ValuesIter values_end, ValidIter valid_begin) {
-    static_assert(!internal::is_null_pointer<ValidIter>::value,
-                  "Don't pass a NULLPTR directly as valid_begin, use the 2-argument "
-                  "version instead");
-    int64_t length = static_cast<int64_t>(std::distance(values_begin, values_end));
-    ARROW_RETURN_NOT_OK(Reserve(length));
-
-    auto iter = values_begin;
-    internal::GenerateBitsUnrolled(raw_data_, length_, length,
-                                   [&iter]() -> bool { return *(iter++); });
-
-    // this updates length_
-    ArrayBuilder::UnsafeAppendToBitmap(valid_begin, std::next(valid_begin, length));
-    return Status::OK();
-  }
-
-  // Same as above, for a pointer type ValidIter
-  template <typename ValuesIter, typename ValidIter>
-  typename std::enable_if<std::is_pointer<ValidIter>::value, Status>::type AppendValues(
-      ValuesIter values_begin, ValuesIter values_end, ValidIter valid_begin) {
-    int64_t length = static_cast<int64_t>(std::distance(values_begin, values_end));
-    ARROW_RETURN_NOT_OK(Reserve(length));
-
-    auto iter = values_begin;
-    internal::GenerateBitsUnrolled(raw_data_, length_, length,
-                                   [&iter]() -> bool { return *(iter++); });
-
-    // this updates the length_
-    if (valid_begin == NULLPTR) {
-      UnsafeSetNotNull(length);
-    } else {
-      UnsafeAppendToBitmap(valid_begin, std::next(valid_begin, length));
-    }
-
-    return Status::OK();
-  }
-
-  Status FinishInternal(std::shared_ptr<ArrayData>* out) override;
-  void Reset() override;
-  Status Resize(int64_t capacity) override;
-
- protected:
-  std::shared_ptr<ResizableBuffer> data_;
-  uint8_t* raw_data_;
-};
-
-// ----------------------------------------------------------------------
-// List builder
-
-/// \class ListBuilder
-/// \brief Builder class for variable-length list array value types
-///
-/// To use this class, you must append values to the child array builder and use
-/// the Append function to delimit each distinct list value (once the values
-/// have been appended to the child array) or use the bulk API to append
-/// a sequence of offests and null values.
-///
-/// A note on types.  Per arrow/type.h all types in the c++ implementation are
-/// logical so even though this class always builds list array, this can
-/// represent multiple different logical types.  If no logical type is provided
-/// at construction time, the class defaults to List<T> where t is taken from the
-/// value_builder/values that the object is constructed with.
-class ARROW_EXPORT ListBuilder : public ArrayBuilder {
- public:
-  /// Use this constructor to incrementally build the value array along with offsets and
-  /// null bitmap.
-  ListBuilder(MemoryPool* pool, std::shared_ptr<ArrayBuilder> const& value_builder,
-              const std::shared_ptr<DataType>& type = NULLPTR);
-
-  Status Resize(int64_t capacity) override;
-  void Reset() override;
-  Status FinishInternal(std::shared_ptr<ArrayData>* out) override;
-
-  /// \brief Vector append
-  ///
-  /// If passed, valid_bytes is of equal length to values, and any zero byte
-  /// will be considered as a null for that slot
-  Status AppendValues(const int32_t* offsets, int64_t length,
-                      const uint8_t* valid_bytes = NULLPTR);
-
-  /// \brief Start a new variable-length list slot
-  ///
-  /// This function should be called before beginning to append elements to the
-  /// value builder
-  Status Append(bool is_valid = true);
-
-  Status AppendNull() { return Append(false); }
-
-  ArrayBuilder* value_builder() const;
-
- protected:
-  TypedBufferBuilder<int32_t> offsets_builder_;
-  std::shared_ptr<ArrayBuilder> value_builder_;
-  std::shared_ptr<Array> values_;
-
-  Status AppendNextOffset();
-};
-
-// ----------------------------------------------------------------------
-// Binary and String
-
-/// \class BinaryBuilder
-/// \brief Builder class for variable-length binary data
-class ARROW_EXPORT BinaryBuilder : public ArrayBuilder {
- public:
-  explicit BinaryBuilder(MemoryPool* pool ARROW_MEMORY_POOL_DEFAULT);
-
-  BinaryBuilder(const std::shared_ptr<DataType>& type, MemoryPool* pool);
-
-  Status Append(const uint8_t* value, int32_t length);
-
-  Status Append(const char* value, int32_t length) {
-    return Append(reinterpret_cast<const uint8_t*>(value), length);
-  }
-
-  Status Append(util::string_view value) {
-    return Append(value.data(), static_cast<int32_t>(value.size()));
-  }
-
-  Status AppendNull();
-
-  /// \brief Append without checking capacity
-  ///
-  /// Offsets and data should have been presized using Reserve() and
-  /// ReserveData(), respectively.
-  void UnsafeAppend(const uint8_t* value, int32_t length) {
-    UnsafeAppendNextOffset();
-    value_data_builder_.UnsafeAppend(value, length);
-    UnsafeAppendToBitmap(true);
-  }
-
-  void UnsafeAppend(const char* value, int32_t length) {
-    UnsafeAppend(reinterpret_cast<const uint8_t*>(value), length);
-  }
-
-  void UnsafeAppend(const std::string& value) {
-    UnsafeAppend(value.c_str(), static_cast<int32_t>(value.size()));
-  }
-
-  void UnsafeAppendNull() {
-    const int64_t num_bytes = value_data_builder_.length();
-    offsets_builder_.UnsafeAppend(static_cast<int32_t>(num_bytes));
-    UnsafeAppendToBitmap(false);
-  }
-
-  void Reset() override;
-  Status Resize(int64_t capacity) override;
-
-  /// \brief Ensures there is enough allocated capacity to append the indicated
-  /// number of bytes to the value data buffer without additional allocations
-  Status ReserveData(int64_t elements);
-
-  Status FinishInternal(std::shared_ptr<ArrayData>* out) override;
-
-  /// \return size of values buffer so far
-  int64_t value_data_length() const { return value_data_builder_.length(); }
-  /// \return capacity of values buffer
-  int64_t value_data_capacity() const { return value_data_builder_.capacity(); }
-
-  /// Temporary access to a value.
-  ///
-  /// This pointer becomes invalid on the next modifying operation.
-  const uint8_t* GetValue(int64_t i, int32_t* out_length) const;
-
-  /// Temporary access to a value.
-  ///
-  /// This view becomes invalid on the next modifying operation.
-  util::string_view GetView(int64_t i) const;
-
- protected:
-  TypedBufferBuilder<int32_t> offsets_builder_;
-  TypedBufferBuilder<uint8_t> value_data_builder_;
-
-  Status AppendNextOffset();
-
-  void UnsafeAppendNextOffset() {
-    const int64_t num_bytes = value_data_builder_.length();
-    offsets_builder_.UnsafeAppend(static_cast<int32_t>(num_bytes));
-  }
-};
-
-/// \class StringBuilder
-/// \brief Builder class for UTF8 strings
-class ARROW_EXPORT StringBuilder : public BinaryBuilder {
- public:
-  using BinaryBuilder::BinaryBuilder;
-  explicit StringBuilder(MemoryPool* pool ARROW_MEMORY_POOL_DEFAULT);
-
-  using BinaryBuilder::Append;
-  using BinaryBuilder::Reset;
-  using BinaryBuilder::UnsafeAppend;
-
-  /// \brief Append a sequence of strings in one shot.
-  ///
-  /// \param[in] values a vector of strings
-  /// \param[in] valid_bytes an optional sequence of bytes where non-zero
-  /// indicates a valid (non-null) value
-  /// \return Status
-  Status AppendValues(const std::vector<std::string>& values,
-                      const uint8_t* valid_bytes = NULLPTR);
-
-  /// \brief Append a sequence of nul-terminated strings in one shot.
-  ///        If one of the values is NULL, it is processed as a null
-  ///        value even if the corresponding valid_bytes entry is 1.
-  ///
-  /// \param[in] values a contiguous C array of nul-terminated char *
-  /// \param[in] length the number of values to append
-  /// \param[in] valid_bytes an optional sequence of bytes where non-zero
-  /// indicates a valid (non-null) value
-  /// \return Status
-  Status AppendValues(const char** values, int64_t length,
-                      const uint8_t* valid_bytes = NULLPTR);
-};
-
-// ----------------------------------------------------------------------
-// FixedSizeBinaryBuilder
-
-class ARROW_EXPORT FixedSizeBinaryBuilder : public ArrayBuilder {
- public:
-  FixedSizeBinaryBuilder(const std::shared_ptr<DataType>& type,
-                         MemoryPool* pool ARROW_MEMORY_POOL_DEFAULT);
-
-  Status Append(const uint8_t* value) {
-    ARROW_RETURN_NOT_OK(Reserve(1));
-    UnsafeAppendToBitmap(true);
-    return byte_builder_.Append(value, byte_width_);
-  }
-
-  Status Append(const char* value) {
-    return Append(reinterpret_cast<const uint8_t*>(value));
-  }
-
-  Status Append(const util::string_view& view) {
-#ifndef NDEBUG
-    CheckValueSize(static_cast<int64_t>(view.size()));
-#endif
-    return Append(reinterpret_cast<const uint8_t*>(view.data()));
-  }
-
-  Status Append(const std::string& s) {
-#ifndef NDEBUG
-    CheckValueSize(static_cast<int64_t>(s.size()));
-#endif
-    return Append(reinterpret_cast<const uint8_t*>(s.data()));
-  }
-
-  template <size_t NBYTES>
-  Status Append(const std::array<uint8_t, NBYTES>& value) {
-    ARROW_RETURN_NOT_OK(Reserve(1));
-    UnsafeAppendToBitmap(true);
-    return byte_builder_.Append(value);
-  }
-
-  Status AppendValues(const uint8_t* data, int64_t length,
-                      const uint8_t* valid_bytes = NULLPTR);
-  Status AppendNull();
-
-  void Reset() override;
-  Status Resize(int64_t capacity) override;
-  Status FinishInternal(std::shared_ptr<ArrayData>* out) override;
-
-  /// \return size of values buffer so far
-  int64_t value_data_length() const { return byte_builder_.length(); }
-
-  int32_t byte_width() const { return byte_width_; }
-
-  /// Temporary access to a value.
-  ///
-  /// This pointer becomes invalid on the next modifying operation.
-  const uint8_t* GetValue(int64_t i) const;
-
-  /// Temporary access to a value.
-  ///
-  /// This view becomes invalid on the next modifying operation.
-  util::string_view GetView(int64_t i) const;
-
- protected:
-  int32_t byte_width_;
-  BufferBuilder byte_builder_;
-
-#ifndef NDEBUG
-  void CheckValueSize(int64_t size);
-#endif
-};
-
-class ARROW_EXPORT Decimal128Builder : public FixedSizeBinaryBuilder {
- public:
-  explicit Decimal128Builder(const std::shared_ptr<DataType>& type,
-                             MemoryPool* pool ARROW_MEMORY_POOL_DEFAULT);
-
-  using FixedSizeBinaryBuilder::Append;
-  using FixedSizeBinaryBuilder::AppendValues;
-  using FixedSizeBinaryBuilder::Reset;
-
-  Status Append(const Decimal128& val);
-
-  Status FinishInternal(std::shared_ptr<ArrayData>* out) override;
-};
-
-using DecimalBuilder = Decimal128Builder;
-
-// ----------------------------------------------------------------------
-// Struct
-
-// ---------------------------------------------------------------------------------
-// StructArray builder
-/// Append, Resize and Reserve methods are acting on StructBuilder.
-/// Please make sure all these methods of all child-builders' are consistently
-/// called to maintain data-structure consistency.
-class ARROW_EXPORT StructBuilder : public ArrayBuilder {
- public:
-  StructBuilder(const std::shared_ptr<DataType>& type, MemoryPool* pool,
-                std::vector<std::shared_ptr<ArrayBuilder>>&& field_builders);
-
-  Status FinishInternal(std::shared_ptr<ArrayData>* out) override;
-
-  /// Null bitmap is of equal length to every child field, and any zero byte
-  /// will be considered as a null for that field, but users must using app-
-  /// end methods or advance methods of the child builders' independently to
-  /// insert data.
-  Status AppendValues(int64_t length, const uint8_t* valid_bytes) {
-    ARROW_RETURN_NOT_OK(Reserve(length));
-    UnsafeAppendToBitmap(valid_bytes, length);
-    return Status::OK();
-  }
-
-  /// Append an element to the Struct. All child-builders' Append method must
-  /// be called independently to maintain data-structure consistency.
-  Status Append(bool is_valid = true) {
-    ARROW_RETURN_NOT_OK(Reserve(1));
-    UnsafeAppendToBitmap(is_valid);
-    return Status::OK();
-  }
-
-  Status AppendNull() { return Append(false); }
-
-  void Reset() override;
-
-  ArrayBuilder* field_builder(int i) const { return field_builders_[i].get(); }
-
-  int num_fields() const { return static_cast<int>(field_builders_.size()); }
-
- protected:
-  std::vector<std::shared_ptr<ArrayBuilder>> field_builders_;
-};
-
-// ----------------------------------------------------------------------
-// Dictionary builder
-
-namespace internal {
-
-template <typename T>
-struct DictionaryScalar {
-  using type = typename T::c_type;
-};
-
-template <>
-struct DictionaryScalar<BinaryType> {
-  using type = util::string_view;
-};
-
-template <>
-struct DictionaryScalar<StringType> {
-  using type = util::string_view;
-};
-
-template <>
-struct DictionaryScalar<FixedSizeBinaryType> {
-  using type = util::string_view;
-};
-
-}  // namespace internal
-
-/// \brief Array builder for created encoded DictionaryArray from dense array
-///
-/// Unlike other builders, dictionary builder does not completely reset the state
-/// on Finish calls. The arrays built after the initial Finish call will reuse
-/// the previously created encoding and build a delta dictionary when new terms
-/// occur.
-///
-/// data
-template <typename T>
-class ARROW_EXPORT DictionaryBuilder : public ArrayBuilder {
- public:
-  using Scalar = typename internal::DictionaryScalar<T>::type;
-
-  // WARNING: the type given below is the value type, not the DictionaryType.
-  // The DictionaryType is instantiated on the Finish() call.
-  DictionaryBuilder(const std::shared_ptr<DataType>& type, MemoryPool* pool);
-
-  template <typename T1 = T>
-  explicit DictionaryBuilder(
-      typename std::enable_if<TypeTraits<T1>::is_parameter_free, MemoryPool*>::type pool)
-      : DictionaryBuilder<T1>(TypeTraits<T1>::type_singleton(), pool) {}
-
-  ~DictionaryBuilder() override;
-
-  /// \brief Append a scalar value
-  Status Append(const Scalar& value);
-
-  /// \brief Append a fixed-width string (only for FixedSizeBinaryType)
-  template <typename T1 = T>
-  Status Append(typename std::enable_if<std::is_base_of<FixedSizeBinaryType, T1>::value,
-                                        const uint8_t*>::type value) {
-    return Append(util::string_view(reinterpret_cast<const char*>(value), byte_width_));
-  }
-
-  /// \brief Append a fixed-width string (only for FixedSizeBinaryType)
-  template <typename T1 = T>
-  Status Append(typename std::enable_if<std::is_base_of<FixedSizeBinaryType, T1>::value,
-                                        const char*>::type value) {
-    return Append(util::string_view(value, byte_width_));
-  }
-
-  /// \brief Append a scalar null value
-  Status AppendNull();
-
-  /// \brief Append a whole dense array to the builder
-  Status AppendArray(const Array& array);
-
-  void Reset() override;
-  Status Resize(int64_t capacity) override;
-  Status FinishInternal(std::shared_ptr<ArrayData>* out) override;
-
-  /// is the dictionary builder in the delta building mode
-  bool is_building_delta() { return delta_offset_ > 0; }
-
- protected:
-  class MemoTableImpl;
-  std::unique_ptr<MemoTableImpl> memo_table_;
-
-  int32_t delta_offset_;
-  // Only used for FixedSizeBinaryType
-  int32_t byte_width_;
-
-  AdaptiveIntBuilder values_builder_;
-};
-
-template <>
-class ARROW_EXPORT DictionaryBuilder<NullType> : public ArrayBuilder {
- public:
-  DictionaryBuilder(const std::shared_ptr<DataType>& type, MemoryPool* pool);
-  explicit DictionaryBuilder(MemoryPool* pool);
-
-  /// \brief Append a scalar null value
-  Status AppendNull();
-
-  /// \brief Append a whole dense array to the builder
-  Status AppendArray(const Array& array);
-
-  Status Resize(int64_t capacity) override;
-  Status FinishInternal(std::shared_ptr<ArrayData>* out) override;
-
- protected:
-  AdaptiveIntBuilder values_builder_;
-};
-
-class ARROW_EXPORT BinaryDictionaryBuilder : public DictionaryBuilder<BinaryType> {
- public:
-  using DictionaryBuilder::Append;
-  using DictionaryBuilder::DictionaryBuilder;
-
-  Status Append(const uint8_t* value, int32_t length) {
-    return Append(reinterpret_cast<const char*>(value), length);
-  }
-
-  Status Append(const char* value, int32_t length) {
-    return Append(util::string_view(value, length));
-  }
-};
-
-/// \brief Dictionary array builder with convenience methods for strings
-class ARROW_EXPORT StringDictionaryBuilder : public DictionaryBuilder<StringType> {
- public:
-  using DictionaryBuilder::Append;
-  using DictionaryBuilder::DictionaryBuilder;
-
-  Status Append(const uint8_t* value, int32_t length) {
-    return Append(reinterpret_cast<const char*>(value), length);
-  }
-
-  Status Append(const char* value, int32_t length) {
-    return Append(util::string_view(value, length));
-  }
-};
-
-// ----------------------------------------------------------------------
-// Helper functions
+class DataType;
+class MemoryPool;
 
 ARROW_EXPORT
 Status MakeBuilder(MemoryPool* pool, const std::shared_ptr<DataType>& type,
                    std::unique_ptr<ArrayBuilder>* out);
 
 }  // namespace arrow
-
-#endif  // ARROW_BUILDER_H_
diff --git a/cpp/src/arrow/compute/compute-test.cc b/cpp/src/arrow/compute/compute-test.cc
index 52fc588..e34a086 100644
--- a/cpp/src/arrow/compute/compute-test.cc
+++ b/cpp/src/arrow/compute/compute-test.cc
@@ -71,6 +71,27 @@ shared_ptr<Array> _MakeArray(const shared_ptr<DataType>& type, const vector<T>&
 }
 
 // ----------------------------------------------------------------------
+// Datum
+
+template <typename T>
+void CheckImplicitConstructor(enum Datum::type expected_kind) {
+  std::shared_ptr<T> value;
+  Datum datum = value;
+  ASSERT_EQ(expected_kind, datum.kind());
+}
+
+TEST(TestDatum, ImplicitConstructors) {
+  CheckImplicitConstructor<Array>(Datum::ARRAY);
+
+  // Instantiate from array subclass
+  CheckImplicitConstructor<BinaryArray>(Datum::ARRAY);
+
+  CheckImplicitConstructor<ChunkedArray>(Datum::CHUNKED_ARRAY);
+  CheckImplicitConstructor<RecordBatch>(Datum::RECORD_BATCH);
+  CheckImplicitConstructor<Table>(Datum::TABLE);
+}
+
+// ----------------------------------------------------------------------
 // Cast
 
 static void AssertBufferSame(const Array& left, const Array& right, int buffer_index) {
@@ -781,7 +802,7 @@ TEST_F(TestCast, ChunkedArray) {
   CastOptions options;
 
   Datum out;
-  ASSERT_OK(Cast(&this->ctx_, Datum(carr), out_type, options, &out));
+  ASSERT_OK(Cast(&this->ctx_, carr, out_type, options, &out));
   ASSERT_EQ(Datum::CHUNKED_ARRAY, out.kind());
 
   auto out_carr = out.chunked_array();
@@ -869,7 +890,7 @@ TEST_F(TestCast, PreallocatedMemory) {
   out_data->buffers.push_back(out_values);
 
   Datum out(out_data);
-  ASSERT_OK(kernel->Call(&this->ctx_, Datum(arr), &out));
+  ASSERT_OK(kernel->Call(&this->ctx_, arr, &out));
 
   // Buffer address unchanged
   ASSERT_EQ(out_values.get(), out_data->buffers[1].get());
@@ -912,8 +933,8 @@ void CheckOffsetOutputCase(FunctionContext* ctx, const std::shared_ptr<DataType>
   Datum out_second(out_second_data);
 
   // Cast each bit
-  ASSERT_OK(kernel->Call(ctx, Datum(arr->Slice(0, first_half)), &out_first));
-  ASSERT_OK(kernel->Call(ctx, Datum(arr->Slice(first_half)), &out_second));
+  ASSERT_OK(kernel->Call(ctx, arr->Slice(0, first_half), &out_first));
+  ASSERT_OK(kernel->Call(ctx, arr->Slice(first_half), &out_second));
 
   shared_ptr<Array> result = MakeArray(out_data);
 
@@ -1105,7 +1126,7 @@ TYPED_TEST(TestDictionaryCast, Basic) {
       TestBase::MakeRandomArray<typename TypeTraits<TypeParam>::ArrayType>(10, 2);
 
   Datum out;
-  ASSERT_OK(DictionaryEncode(&this->ctx_, Datum(plain_array->data()), &out));
+  ASSERT_OK(DictionaryEncode(&this->ctx_, plain_array->data(), &out));
 
   this->CheckPass(*MakeArray(out.array()), *plain_array, plain_array->type(), options);
 }
@@ -1201,7 +1222,7 @@ void CheckUnique(FunctionContext* ctx, const shared_ptr<DataType>& type,
   shared_ptr<Array> expected = _MakeArray<Type, T>(type, out_values, out_is_valid);
 
   shared_ptr<Array> result;
-  ASSERT_OK(Unique(ctx, Datum(input), &result));
+  ASSERT_OK(Unique(ctx, input, &result));
   ASSERT_ARRAYS_EQUAL(*expected, *result);
 }
 
@@ -1218,7 +1239,7 @@ void CheckDictEncode(FunctionContext* ctx, const shared_ptr<DataType>& type,
   DictionaryArray expected(dictionary(int32(), ex_dict), ex_indices);
 
   Datum datum_out;
-  ASSERT_OK(DictionaryEncode(ctx, Datum(input), &datum_out));
+  ASSERT_OK(DictionaryEncode(ctx, input, &datum_out));
   shared_ptr<Array> result = MakeArray(datum_out.array());
 
   ASSERT_ARRAYS_EQUAL(expected, *result);
@@ -1461,7 +1482,7 @@ TEST_F(TestHashKernel, ChunkedArrayInvoke) {
 
   // Unique
   shared_ptr<Array> result;
-  ASSERT_OK(Unique(&this->ctx_, Datum(carr), &result));
+  ASSERT_OK(Unique(&this->ctx_, carr, &result));
   ASSERT_ARRAYS_EQUAL(*ex_dict, *result);
 
   // Dictionary encode
@@ -1475,7 +1496,7 @@ TEST_F(TestHashKernel, ChunkedArrayInvoke) {
   auto dict_carr = std::make_shared<ChunkedArray>(dict_arrays);
 
   Datum encoded_out;
-  ASSERT_OK(DictionaryEncode(&this->ctx_, Datum(carr), &encoded_out));
+  ASSERT_OK(DictionaryEncode(&this->ctx_, carr, &encoded_out));
   ASSERT_EQ(Datum::CHUNKED_ARRAY, encoded_out.kind());
 
   AssertChunkedEqual(*dict_carr, *encoded_out.chunked_array());
@@ -1490,7 +1511,7 @@ class TestBooleanKernel : public ComputeFixture, public TestBase {
                        const std::shared_ptr<Array>& right,
                        const std::shared_ptr<Array>& expected) {
     Datum result;
-    ASSERT_OK(kernel(&this->ctx_, Datum(left), Datum(right), &result));
+    ASSERT_OK(kernel(&this->ctx_, left, right, &result));
     ASSERT_EQ(Datum::ARRAY, result.kind());
     std::shared_ptr<Array> result_array = result.make_array();
     ASSERT_TRUE(result_array->Equals(expected));
@@ -1502,7 +1523,7 @@ class TestBooleanKernel : public ComputeFixture, public TestBase {
                               const std::shared_ptr<ChunkedArray>& expected) {
     Datum result;
     std::shared_ptr<Array> result_array;
-    ASSERT_OK(kernel(&this->ctx_, Datum(left), Datum(right), &result));
+    ASSERT_OK(kernel(&this->ctx_, left, right, &result));
     ASSERT_EQ(Datum::CHUNKED_ARRAY, result.kind());
     std::shared_ptr<ChunkedArray> result_ca = result.chunked_array();
     ASSERT_TRUE(result_ca->Equals(expected));
@@ -1552,13 +1573,13 @@ TEST_F(TestBooleanKernel, Invert) {
 
   // Plain array
   Datum result;
-  ASSERT_OK(Invert(&this->ctx_, Datum(a1), &result));
+  ASSERT_OK(Invert(&this->ctx_, a1, &result));
   ASSERT_EQ(Datum::ARRAY, result.kind());
   std::shared_ptr<Array> result_array = result.make_array();
   ASSERT_TRUE(result_array->Equals(a2));
 
   // Array with offset
-  ASSERT_OK(Invert(&this->ctx_, Datum(a1->Slice(1)), &result));
+  ASSERT_OK(Invert(&this->ctx_, a1->Slice(1), &result));
   ASSERT_EQ(Datum::ARRAY, result.kind());
   result_array = result.make_array();
   ASSERT_TRUE(result_array->Equals(a2->Slice(1)));
@@ -1568,7 +1589,7 @@ TEST_F(TestBooleanKernel, Invert) {
   auto ca1 = std::make_shared<ChunkedArray>(ca1_arrs);
   std::vector<std::shared_ptr<Array>> ca2_arrs = {a2, a2->Slice(1)};
   auto ca2 = std::make_shared<ChunkedArray>(ca2_arrs);
-  ASSERT_OK(Invert(&this->ctx_, Datum(ca1), &result));
+  ASSERT_OK(Invert(&this->ctx_, ca1, &result));
   ASSERT_EQ(Datum::CHUNKED_ARRAY, result.kind());
   std::shared_ptr<ChunkedArray> result_ca = result.chunked_array();
   ASSERT_TRUE(result_ca->Equals(ca2));
@@ -1618,14 +1639,14 @@ TEST_F(TestInvokeBinaryKernel, Exceptions) {
   auto a2 = _MakeArray<BooleanType, bool>(type, values2, {});
 
   // Left is not an array-like
-  ASSERT_RAISES(Invalid, detail::InvokeBinaryArrayKernel(
-                             &this->ctx_, &kernel, Datum(table), Datum(a2), &outputs));
+  ASSERT_RAISES(Invalid, detail::InvokeBinaryArrayKernel(&this->ctx_, &kernel, table, a2,
+                                                         &outputs));
   // Right is not an array-like
-  ASSERT_RAISES(Invalid, detail::InvokeBinaryArrayKernel(&this->ctx_, &kernel, Datum(a1),
-                                                         Datum(table), &outputs));
+  ASSERT_RAISES(Invalid, detail::InvokeBinaryArrayKernel(&this->ctx_, &kernel, a1, table,
+                                                         &outputs));
   // Different sized inputs
-  ASSERT_RAISES(Invalid, detail::InvokeBinaryArrayKernel(&this->ctx_, &kernel, Datum(a1),
-                                                         Datum(a1->Slice(1)), &outputs));
+  ASSERT_RAISES(Invalid, detail::InvokeBinaryArrayKernel(&this->ctx_, &kernel, a1,
+                                                         a1->Slice(1), &outputs));
 }
 
 }  // namespace compute
diff --git a/cpp/src/arrow/compute/kernel.h b/cpp/src/arrow/compute/kernel.h
index bef2b9a..87080b1 100644
--- a/cpp/src/arrow/compute/kernel.h
+++ b/cpp/src/arrow/compute/kernel.h
@@ -61,19 +61,28 @@ struct ARROW_EXPORT Datum {
   /// \brief Empty datum, to be populated elsewhere
   Datum() : value(NULLPTR) {}
 
-  explicit Datum(const std::shared_ptr<Scalar>& value) : value(value) {}
-
-  explicit Datum(const std::shared_ptr<ArrayData>& value) : value(value) {}
-
-  explicit Datum(const std::shared_ptr<Array>& value) : Datum(value->data()) {}
-
-  explicit Datum(const std::shared_ptr<ChunkedArray>& value) : value(value) {}
-
-  explicit Datum(const std::shared_ptr<RecordBatch>& value) : value(value) {}
-
-  explicit Datum(const std::shared_ptr<Table>& value) : value(value) {}
-
-  explicit Datum(const std::vector<Datum>& value) : value(value) {}
+  Datum(const std::shared_ptr<Scalar>& value)  // NOLINT implicit conversion
+      : value(value) {}
+  Datum(const std::shared_ptr<ArrayData>& value)  // NOLINT implicit conversion
+      : value(value) {}
+
+  Datum(const std::shared_ptr<Array>& value)  // NOLINT implicit conversion
+      : Datum(value ? value->data() : NULLPTR) {}
+
+  Datum(const std::shared_ptr<ChunkedArray>& value)  // NOLINT implicit conversion
+      : value(value) {}
+  Datum(const std::shared_ptr<RecordBatch>& value)  // NOLINT implicit conversion
+      : value(value) {}
+  Datum(const std::shared_ptr<Table>& value)  // NOLINT implicit conversion
+      : value(value) {}
+  Datum(const std::vector<Datum>& value)  // NOLINT implicit conversion
+      : value(value) {}
+
+  // Cast from subtypes of Array to Datum
+  template <typename T,
+            typename = typename std::enable_if<std::is_base_of<Array, T>::value>::type>
+  Datum(const std::shared_ptr<T>& value)  // NOLINT implicit conversion
+      : Datum(std::shared_ptr<Array>(value)) {}
 
   ~Datum() {}
 
diff --git a/cpp/src/arrow/csv/column-builder.h b/cpp/src/arrow/csv/column-builder.h
index b21cff7..054a642 100644
--- a/cpp/src/arrow/csv/column-builder.h
+++ b/cpp/src/arrow/csv/column-builder.h
@@ -18,22 +18,29 @@
 #ifndef ARROW_CSV_COLUMN_BUILDER_H
 #define ARROW_CSV_COLUMN_BUILDER_H
 
+#include <cstdint>
 #include <memory>
-#include <vector>
 
 #include "arrow/array.h"
-#include "arrow/csv/converter.h"
-#include "arrow/csv/options.h"
-#include "arrow/memory_pool.h"
 #include "arrow/status.h"
-#include "arrow/table.h"
-#include "arrow/type.h"
-#include "arrow/util/task-group.h"
 #include "arrow/util/visibility.h"
 
 namespace arrow {
+
+class ChunkedArray;
+class DataType;
+
+namespace internal {
+
+class TaskGroup;
+
+}  // namespace internal
+
 namespace csv {
 
+class BlockParser;
+struct ConvertOptions;
+
 class ARROW_EXPORT ColumnBuilder {
  public:
   virtual ~ColumnBuilder() = default;
diff --git a/cpp/src/arrow/csv/converter.cc b/cpp/src/arrow/csv/converter.cc
index 7d8bff8..8a249a6 100644
--- a/cpp/src/arrow/csv/converter.cc
+++ b/cpp/src/arrow/csv/converter.cc
@@ -20,6 +20,7 @@
 #include <cstring>
 #include <sstream>
 #include <string>
+#include <type_traits>
 
 #include "arrow/builder.h"
 #include "arrow/csv/parser.h"
diff --git a/cpp/src/arrow/csv/parser.h b/cpp/src/arrow/csv/parser.h
index 8a51574..fdddc37 100644
--- a/cpp/src/arrow/csv/parser.h
+++ b/cpp/src/arrow/csv/parser.h
@@ -18,6 +18,7 @@
 #ifndef ARROW_CSV_PARSER_H
 #define ARROW_CSV_PARSER_H
 
+#include <cstddef>
 #include <cstdint>
 #include <memory>
 #include <vector>
diff --git a/cpp/src/arrow/csv/reader.cc b/cpp/src/arrow/csv/reader.cc
index 8cf74d6..b2a6b7b 100644
--- a/cpp/src/arrow/csv/reader.cc
+++ b/cpp/src/arrow/csv/reader.cc
@@ -23,6 +23,8 @@
 #include <memory>
 #include <sstream>
 #include <string>
+#include <unordered_map>
+#include <utility>
 #include <vector>
 
 #include "arrow/buffer.h"
diff --git a/cpp/src/arrow/io/buffered.cc b/cpp/src/arrow/io/buffered.cc
index 0c04ac2..f3eae39 100644
--- a/cpp/src/arrow/io/buffered.cc
+++ b/cpp/src/arrow/io/buffered.cc
@@ -21,10 +21,10 @@
 #include <cstring>
 #include <memory>
 #include <mutex>
-#include <string>
 #include <utility>
 
 #include "arrow/buffer.h"
+#include "arrow/memory_pool.h"
 #include "arrow/status.h"
 #include "arrow/util/logging.h"
 #include "arrow/util/string_view.h"
diff --git a/cpp/src/arrow/io/buffered.h b/cpp/src/arrow/io/buffered.h
index e4374ba..d507955 100644
--- a/cpp/src/arrow/io/buffered.h
+++ b/cpp/src/arrow/io/buffered.h
@@ -29,6 +29,7 @@
 
 namespace arrow {
 
+class Buffer;
 class MemoryPool;
 class Status;
 
diff --git a/cpp/src/arrow/ipc/feather-test.cc b/cpp/src/arrow/ipc/feather-test.cc
index b0be289..8139c47 100644
--- a/cpp/src/arrow/ipc/feather-test.cc
+++ b/cpp/src/arrow/ipc/feather-test.cc
@@ -30,6 +30,7 @@
 #include "arrow/pretty_print.h"
 #include "arrow/record_batch.h"
 #include "arrow/status.h"
+#include "arrow/table.h"
 #include "arrow/test-util.h"
 #include "arrow/type.h"
 #include "arrow/util/checked_cast.h"
diff --git a/cpp/src/arrow/ipc/json-simple-test.cc b/cpp/src/arrow/ipc/json-simple-test.cc
index 4552521..84a2210 100644
--- a/cpp/src/arrow/ipc/json-simple-test.cc
+++ b/cpp/src/arrow/ipc/json-simple-test.cc
@@ -34,6 +34,7 @@
 #include "arrow/type.h"
 #include "arrow/type_traits.h"
 #include "arrow/util/checked_cast.h"
+#include "arrow/util/decimal.h"
 
 #if defined(_MSC_VER)
 // "warning C4307: '+': integral constant overflow"
diff --git a/cpp/src/arrow/memory_pool-test.h b/cpp/src/arrow/memory_pool-test.h
index 34523a1..fc86d94 100644
--- a/cpp/src/arrow/memory_pool-test.h
+++ b/cpp/src/arrow/memory_pool-test.h
@@ -16,6 +16,7 @@
 // under the License.
 
 #include <algorithm>
+#include <cstddef>
 #include <cstdint>
 #include <limits>
 
diff --git a/cpp/src/arrow/memory_pool.cc b/cpp/src/arrow/memory_pool.cc
index 0a27141..d62db32 100644
--- a/cpp/src/arrow/memory_pool.cc
+++ b/cpp/src/arrow/memory_pool.cc
@@ -17,18 +17,16 @@
 
 #include "arrow/memory_pool.h"
 
-#include <algorithm>
-#include <atomic>
-#include <cerrno>
-#include <cstdlib>
-#include <cstring>
-#include <iostream>
+#include <algorithm>  // IWYU pragma: keep
+#include <cstdlib>    // IWYU pragma: keep
+#include <cstring>    // IWYU pragma: keep
+#include <iostream>   // IWYU pragma: keep
 #include <limits>
 #include <memory>
 #include <sstream>  // IWYU pragma: keep
 
 #include "arrow/status.h"
-#include "arrow/util/logging.h"
+#include "arrow/util/logging.h"  // IWYU pragma: keep
 
 #ifdef ARROW_JEMALLOC
 // Needed to support jemalloc 3 and 4
diff --git a/cpp/src/arrow/pretty_print-test.cc b/cpp/src/arrow/pretty_print-test.cc
index 8434e59..a1acfb8 100644
--- a/cpp/src/arrow/pretty_print-test.cc
+++ b/cpp/src/arrow/pretty_print-test.cc
@@ -26,12 +26,10 @@
 
 #include "arrow/array.h"
 #include "arrow/builder.h"
-#include "arrow/memory_pool.h"
 #include "arrow/pretty_print.h"
 #include "arrow/table.h"
 #include "arrow/test-util.h"
 #include "arrow/type.h"
-#include "arrow/util/decimal.h"
 
 namespace arrow {
 
@@ -342,7 +340,7 @@ TEST_F(TestPrettyPrint, DictionaryType) {
 
 TEST_F(TestPrettyPrint, ChunkedArrayPrimitiveType) {
   auto array = ArrayFromJSON(int32(), "[0, 1, null, 3, null]");
-  ChunkedArray chunked_array({array});
+  ChunkedArray chunked_array(array);
 
   static const char* expected = R"expected([
   [
diff --git a/cpp/src/arrow/pretty_print.cc b/cpp/src/arrow/pretty_print.cc
index ec23bfb..c524039 100644
--- a/cpp/src/arrow/pretty_print.cc
+++ b/cpp/src/arrow/pretty_print.cc
@@ -19,7 +19,7 @@
 #include <cstdint>
 #include <iostream>
 #include <memory>
-#include <sstream>
+#include <sstream>  // IWYU pragma: keep
 #include <string>
 #include <type_traits>
 #include <vector>
diff --git a/cpp/src/arrow/pretty_print.h b/cpp/src/arrow/pretty_print.h
index fde6c29..ca50bc0 100644
--- a/cpp/src/arrow/pretty_print.h
+++ b/cpp/src/arrow/pretty_print.h
@@ -21,14 +21,17 @@
 #include <ostream>
 #include <string>
 
-#include "arrow/type_fwd.h"
 #include "arrow/util/visibility.h"
 
 namespace arrow {
 
 class Array;
+class Column;
 class ChunkedArray;
+class RecordBatch;
+class Schema;
 class Status;
+class Table;
 
 struct PrettyPrintOptions {
   PrettyPrintOptions(int indent_arg, int window_arg = 10, int indent_size_arg = 2,
diff --git a/cpp/src/arrow/python/numpy_to_arrow.cc b/cpp/src/arrow/python/numpy_to_arrow.cc
index f9a5ea1..da288d3 100644
--- a/cpp/src/arrow/python/numpy_to_arrow.cc
+++ b/cpp/src/arrow/python/numpy_to_arrow.cc
@@ -25,6 +25,7 @@
 #include <algorithm>
 #include <cmath>
 #include <cstdint>
+#include <cstring>
 #include <limits>
 #include <memory>
 #include <sstream>
@@ -539,33 +540,27 @@ Status NumPyConverter::Visit(const BinaryType& type) {
 
   auto data = reinterpret_cast<const uint8_t*>(PyArray_DATA(arr_));
 
-  int item_length = 0;
+  auto AppendNotNull = [&builder, this](const uint8_t* data) {
+    // This is annoying. NumPy allows strings to have nul-terminators, so
+    // we must check for them here
+    const size_t item_size =
+        strnlen(reinterpret_cast<const char*>(data), static_cast<size_t>(itemsize_));
+    return builder.Append(data, static_cast<int32_t>(item_size));
+  };
+
   if (mask_ != nullptr) {
     Ndarray1DIndexer<uint8_t> mask_values(mask_);
     for (int64_t i = 0; i < length_; ++i) {
       if (mask_values[i]) {
         RETURN_NOT_OK(builder.AppendNull());
       } else {
-        // This is annoying. NumPy allows strings to have nul-terminators, so
-        // we must check for them here
-        for (item_length = 0; item_length < itemsize_; ++item_length) {
-          if (data[item_length] == 0) {
-            break;
-          }
-        }
-        RETURN_NOT_OK(builder.Append(data, item_length));
+        RETURN_NOT_OK(AppendNotNull(data));
       }
       data += stride_;
     }
   } else {
     for (int64_t i = 0; i < length_; ++i) {
-      for (item_length = 0; item_length < itemsize_; ++item_length) {
-        // Look for nul-terminator
-        if (data[item_length] == 0) {
-          break;
-        }
-      }
-      RETURN_NOT_OK(builder.Append(data, item_length));
+      RETURN_NOT_OK(AppendNotNull(data));
       data += stride_;
     }
   }
diff --git a/cpp/src/arrow/python/python-test.cc b/cpp/src/arrow/python/python-test.cc
index 2d15ce4..7443c54 100644
--- a/cpp/src/arrow/python/python-test.cc
+++ b/cpp/src/arrow/python/python-test.cc
@@ -25,6 +25,7 @@
 #include "arrow/builder.h"
 #include "arrow/table.h"
 #include "arrow/test-util.h"
+#include "arrow/util/decimal.h"
 
 #include "arrow/python/arrow_to_pandas.h"
 #include "arrow/python/decimal.h"
diff --git a/cpp/src/arrow/record_batch.h b/cpp/src/arrow/record_batch.h
index 674b68b..ceb6885 100644
--- a/cpp/src/arrow/record_batch.h
+++ b/cpp/src/arrow/record_batch.h
@@ -32,6 +32,7 @@ namespace arrow {
 class Array;
 struct ArrayData;
 class Status;
+class Table;
 
 /// \class RecordBatch
 /// \brief Collection of equal-length arrays matching a particular Schema
diff --git a/cpp/src/arrow/table.h b/cpp/src/arrow/table.h
index 6b57332..2ac34b4 100644
--- a/cpp/src/arrow/table.h
+++ b/cpp/src/arrow/table.h
@@ -44,6 +44,11 @@ class ARROW_EXPORT ChunkedArray {
   /// The vector should be non-empty and all its elements should have the same
   /// data type.
   explicit ChunkedArray(const ArrayVector& chunks);
+
+  /// \brief Construct a chunked array from a single Array
+  explicit ChunkedArray(const std::shared_ptr<Array>& chunk)
+      : ChunkedArray(ArrayVector({chunk})) {}
+
   /// \brief Construct a chunked array from a vector of arrays and a data type
   ///
   /// As the data type is passed explicitly, the vector may be empty.
diff --git a/cpp/src/arrow/tensor.cc b/cpp/src/arrow/tensor.cc
index 589ee99..792945b 100644
--- a/cpp/src/arrow/tensor.cc
+++ b/cpp/src/arrow/tensor.cc
@@ -17,6 +17,7 @@
 
 #include "arrow/tensor.h"
 
+#include <cstddef>
 #include <cstdint>
 #include <functional>
 #include <memory>
diff --git a/cpp/src/arrow/test-util.cc b/cpp/src/arrow/test-util.cc
index 38e07dd..8c5f364 100644
--- a/cpp/src/arrow/test-util.cc
+++ b/cpp/src/arrow/test-util.cc
@@ -18,13 +18,12 @@
 #include "arrow/test-util.h"
 
 #ifndef _WIN32
-#include <sys/stat.h>
-#include <sys/wait.h>
-#include <unistd.h>
+#include <sys/stat.h>  // IWYU pragma: keep
+#include <sys/wait.h>  // IWYU pragma: keep
+#include <unistd.h>    // IWYU pragma: keep
 #endif
 
 #include <algorithm>
-#include <chrono>
 #include <cstdint>
 #include <cstdlib>
 #include <iostream>
@@ -33,23 +32,17 @@
 #include <random>
 #include <sstream>
 #include <string>
-#include <thread>
 #include <vector>
 
 #include <gtest/gtest.h>
 
 #include "arrow/array.h"
 #include "arrow/buffer.h"
-#include "arrow/builder.h"
 #include "arrow/ipc/json-simple.h"
-#include "arrow/memory_pool.h"
 #include "arrow/pretty_print.h"
 #include "arrow/status.h"
 #include "arrow/table.h"
 #include "arrow/type.h"
-#include "arrow/type_traits.h"
-#include "arrow/util/bit-util.h"
-#include "arrow/util/decimal.h"
 #include "arrow/util/logging.h"
 
 namespace arrow {
diff --git a/cpp/src/arrow/test-util.h b/cpp/src/arrow/test-util.h
index 7829ac2..7fe7685 100644
--- a/cpp/src/arrow/test-util.h
+++ b/cpp/src/arrow/test-util.h
@@ -17,23 +17,17 @@
 
 #pragma once
 
-#ifndef _WIN32
-#include <sys/stat.h>
-#include <sys/wait.h>
-#include <unistd.h>
-#endif
-
 #include <algorithm>
-#include <chrono>
 #include <cstdint>
 #include <cstdlib>
+#include <cstring>
 #include <iostream>
 #include <limits>
 #include <memory>
 #include <random>
 #include <sstream>
 #include <string>
-#include <thread>
+#include <type_traits>
 #include <vector>
 
 #include <gtest/gtest.h>
@@ -43,13 +37,13 @@
 #include "arrow/builder.h"
 #include "arrow/memory_pool.h"
 #include "arrow/pretty_print.h"
+#include "arrow/record_batch.h"
 #include "arrow/status.h"
-#include "arrow/table.h"
 #include "arrow/type.h"
 #include "arrow/type_traits.h"
 #include "arrow/util/bit-util.h"
-#include "arrow/util/decimal.h"
 #include "arrow/util/logging.h"
+#include "arrow/util/macros.h"
 #include "arrow/util/visibility.h"
 
 #define STRINGIFY(x) #x
@@ -102,6 +96,10 @@
 
 namespace arrow {
 
+class ChunkedArray;
+class Column;
+class Table;
+
 using ArrayVector = std::vector<std::shared_ptr<Array>>;
 
 #define ASSERT_ARRAYS_EQUAL(LEFT, RIGHT)                                               \
diff --git a/cpp/src/arrow/util/compression_lz4.cc b/cpp/src/arrow/util/compression_lz4.cc
index 0acd54d..97fd46a 100644
--- a/cpp/src/arrow/util/compression_lz4.cc
+++ b/cpp/src/arrow/util/compression_lz4.cc
@@ -18,6 +18,7 @@
 #include "arrow/util/compression_lz4.h"
 
 #include <cstdint>
+#include <cstring>
 #include <sstream>
 
 #include <lz4.h>
diff --git a/cpp/src/arrow/util/int-util-test.cc b/cpp/src/arrow/util/int-util-test.cc
index 51fd96e..018eeda 100644
--- a/cpp/src/arrow/util/int-util-test.cc
+++ b/cpp/src/arrow/util/int-util-test.cc
@@ -17,14 +17,12 @@
 
 #include <algorithm>
 #include <cstdint>
-#include <memory>
 #include <random>
 #include <utility>
 #include <vector>
 
 #include <gtest/gtest.h>
 
-#include "arrow/test-util.h"
 #include "arrow/util/int-util.h"
 
 namespace arrow {
diff --git a/cpp/src/arrow/util/string_view.h b/cpp/src/arrow/util/string_view.h
index 2ee594a..0f35483 100644
--- a/cpp/src/arrow/util/string_view.h
+++ b/cpp/src/arrow/util/string_view.h
@@ -18,7 +18,7 @@
 #ifndef ARROW_UTIL_STRING_VIEW_H
 #define ARROW_UTIL_STRING_VIEW_H
 
-#include "arrow/util/string_view/string_view.hpp"
+#include "arrow/util/string_view/string_view.hpp"  // IWYU pragma: export
 
 namespace arrow {
 namespace util {
diff --git a/cpp/src/parquet/arrow/CMakeLists.txt b/cpp/src/parquet/arrow/CMakeLists.txt
index 9372c31..89afc39 100644
--- a/cpp/src/parquet/arrow/CMakeLists.txt
+++ b/cpp/src/parquet/arrow/CMakeLists.txt
@@ -18,8 +18,11 @@
 ADD_PARQUET_TEST(arrow-schema-test)
 ADD_PARQUET_TEST(arrow-reader-writer-test)
 
-ADD_ARROW_BENCHMARK(reader-writer-benchmark
+ADD_BENCHMARK(reader-writer-benchmark
   PREFIX "parquet-arrow"
   EXTRA_LINK_LIBS ${PARQUET_BENCHMARK_LINK_LIBRARIES})
+if (TARGET parquet-arrow-reader-writer-benchmark)
+  add_dependencies(parquet parquet-arrow-reader-writer-benchmark)
+endif()
 
 ARROW_INSTALL_ALL_HEADERS("parquet/arrow")
diff --git a/cpp/src/parquet/arrow/arrow-reader-writer-test.cc b/cpp/src/parquet/arrow/arrow-reader-writer-test.cc
index 24ec0dd..07124eb 100644
--- a/cpp/src/parquet/arrow/arrow-reader-writer-test.cc
+++ b/cpp/src/parquet/arrow/arrow-reader-writer-test.cc
@@ -464,7 +464,11 @@ class TestParquetIO : public ::testing::Test {
     ASSERT_OK_NO_THROW(file_reader->GetColumn(0, &column_reader));
     ASSERT_NE(nullptr, column_reader.get());
 
-    ASSERT_OK(column_reader->NextBatch(SMALL_SIZE, out));
+    std::shared_ptr<ChunkedArray> chunked_out;
+    ASSERT_OK(column_reader->NextBatch(SMALL_SIZE, &chunked_out));
+
+    ASSERT_EQ(1, chunked_out->num_chunks());
+    *out = chunked_out->chunk(0);
     ASSERT_NE(nullptr, out->get());
   }
 
@@ -1745,10 +1749,11 @@ TEST(TestArrowReadWrite, ListLargeRecords) {
 
   std::vector<std::shared_ptr<Array>> pieces;
   for (int i = 0; i < num_rows; ++i) {
-    std::shared_ptr<Array> piece;
-    ASSERT_OK(col_reader->NextBatch(1, &piece));
-    ASSERT_EQ(1, piece->length());
-    pieces.push_back(piece);
+    std::shared_ptr<ChunkedArray> chunked_piece;
+    ASSERT_OK(col_reader->NextBatch(1, &chunked_piece));
+    ASSERT_EQ(1, chunked_piece->length());
+    ASSERT_EQ(1, chunked_piece->num_chunks());
+    pieces.push_back(chunked_piece->chunk(0));
   }
   auto chunked = std::make_shared<::arrow::ChunkedArray>(pieces);
 
diff --git a/cpp/src/parquet/arrow/reader.cc b/cpp/src/parquet/arrow/reader.cc
index 6273fda..2a7730d 100644
--- a/cpp/src/parquet/arrow/reader.cc
+++ b/cpp/src/parquet/arrow/reader.cc
@@ -32,6 +32,9 @@
 #include "arrow/util/logging.h"
 #include "arrow/util/thread-pool.h"
 
+// For arrow::compute::Datum. This should perhaps be promoted. See ARROW-4022
+#include "arrow/compute/kernel.h"
+
 #include "parquet/arrow/record_reader.h"
 #include "parquet/arrow/schema.h"
 #include "parquet/column_reader.h"
@@ -46,6 +49,7 @@
 
 using arrow::Array;
 using arrow::BooleanArray;
+using arrow::ChunkedArray;
 using arrow::Column;
 using arrow::Field;
 using arrow::Int32Array;
@@ -57,6 +61,9 @@ using arrow::StructArray;
 using arrow::Table;
 using arrow::TimestampArray;
 
+// For Array/ChunkedArray variant
+using arrow::compute::Datum;
+
 using parquet::schema::Node;
 
 // Help reduce verbosity
@@ -85,6 +92,19 @@ static inline int64_t impala_timestamp_to_nanoseconds(const Int96& impala_timest
 template <typename ArrowType>
 using ArrayType = typename ::arrow::TypeTraits<ArrowType>::ArrayType;
 
+namespace {
+
+Status GetSingleChunk(const ChunkedArray& chunked, std::shared_ptr<Array>* out) {
+  DCHECK_GT(chunked.num_chunks(), 0);
+  if (chunked.num_chunks() > 1) {
+    return Status::Invalid("Function call returned a chunked array");
+  }
+  *out = chunked.chunk(0);
+  return Status::OK();
+}
+
+}  // namespace
+
 // ----------------------------------------------------------------------
 // Iteration utilities
 
@@ -223,15 +243,18 @@ class FileReader::Impl {
   virtual ~Impl() {}
 
   Status GetColumn(int i, std::unique_ptr<ColumnReader>* out);
-  Status ReadSchemaField(int i, std::shared_ptr<Array>* out);
+
+  Status ReadSchemaField(int i, std::shared_ptr<ChunkedArray>* out);
   Status ReadSchemaField(int i, const std::vector<int>& indices,
-                         std::shared_ptr<Array>* out);
+                         std::shared_ptr<ChunkedArray>* out);
+  Status ReadColumn(int i, std::shared_ptr<ChunkedArray>* out);
+  Status ReadColumnChunk(int column_index, int row_group_index,
+                         std::shared_ptr<ChunkedArray>* out);
+
   Status GetReaderForNode(int index, const Node* node, const std::vector<int>& indices,
                           int16_t def_level,
                           std::unique_ptr<ColumnReader::ColumnReaderImpl>* out);
-  Status ReadColumn(int i, std::shared_ptr<Array>* out);
-  Status ReadColumnChunk(int column_index, int row_group_index,
-                         std::shared_ptr<Array>* out);
+
   Status GetSchema(std::shared_ptr<::arrow::Schema>* out);
   Status GetSchema(const std::vector<int>& indices,
                    std::shared_ptr<::arrow::Schema>* out);
@@ -267,7 +290,8 @@ class FileReader::Impl {
 class ColumnReader::ColumnReaderImpl {
  public:
   virtual ~ColumnReaderImpl() {}
-  virtual Status NextBatch(int64_t records_to_read, std::shared_ptr<Array>* out) = 0;
+  virtual Status NextBatch(int64_t records_to_read,
+                           std::shared_ptr<ChunkedArray>* out) = 0;
   virtual Status GetDefLevels(const int16_t** data, size_t* length) = 0;
   virtual Status GetRepLevels(const int16_t** data, size_t* length) = 0;
   virtual const std::shared_ptr<Field> field() = 0;
@@ -283,10 +307,10 @@ class PARQUET_NO_EXPORT PrimitiveImpl : public ColumnReader::ColumnReaderImpl {
     NextRowGroup();
   }
 
-  Status NextBatch(int64_t records_to_read, std::shared_ptr<Array>* out) override;
+  Status NextBatch(int64_t records_to_read, std::shared_ptr<ChunkedArray>* out) override;
 
   template <typename ParquetType>
-  Status WrapIntoListArray(std::shared_ptr<Array>* array);
+  Status WrapIntoListArray(Datum* inout_array);
 
   Status GetDefLevels(const int16_t** data, size_t* length) override;
   Status GetRepLevels(const int16_t** data, size_t* length) override;
@@ -314,7 +338,7 @@ class PARQUET_NO_EXPORT StructImpl : public ColumnReader::ColumnReaderImpl {
     InitField(node, children);
   }
 
-  Status NextBatch(int64_t records_to_read, std::shared_ptr<Array>* out) override;
+  Status NextBatch(int64_t records_to_read, std::shared_ptr<ChunkedArray>* out) override;
   Status GetDefLevels(const int16_t** data, size_t* length) override;
   Status GetRepLevels(const int16_t** data, size_t* length) override;
   const std::shared_ptr<Field> field() override { return field_; }
@@ -395,7 +419,7 @@ Status FileReader::Impl::GetReaderForNode(
   return Status::OK();
 }
 
-Status FileReader::Impl::ReadSchemaField(int i, std::shared_ptr<Array>* out) {
+Status FileReader::Impl::ReadSchemaField(int i, std::shared_ptr<ChunkedArray>* out) {
   std::vector<int> indices(reader_->metadata()->num_columns());
 
   for (size_t j = 0; j < indices.size(); ++j) {
@@ -406,7 +430,7 @@ Status FileReader::Impl::ReadSchemaField(int i, std::shared_ptr<Array>* out) {
 }
 
 Status FileReader::Impl::ReadSchemaField(int i, const std::vector<int>& indices,
-                                         std::shared_ptr<Array>* out) {
+                                         std::shared_ptr<ChunkedArray>* out) {
   auto parquet_schema = reader_->metadata()->schema();
 
   auto node = parquet_schema->group_node()->field(i).get();
@@ -432,7 +456,7 @@ Status FileReader::Impl::ReadSchemaField(int i, const std::vector<int>& indices,
   return reader->NextBatch(records_to_read, out);
 }
 
-Status FileReader::Impl::ReadColumn(int i, std::shared_ptr<Array>* out) {
+Status FileReader::Impl::ReadColumn(int i, std::shared_ptr<ChunkedArray>* out) {
   std::unique_ptr<ColumnReader> flat_column_reader;
   RETURN_NOT_OK(GetColumn(i, &flat_column_reader));
 
@@ -452,7 +476,7 @@ Status FileReader::Impl::GetSchema(const std::vector<int>& indices,
 }
 
 Status FileReader::Impl::ReadColumnChunk(int column_index, int row_group_index,
-                                         std::shared_ptr<Array>* out) {
+                                         std::shared_ptr<ChunkedArray>* out) {
   auto rg_metadata = reader_->metadata()->RowGroup(row_group_index);
   int64_t records_to_read = rg_metadata->ColumnChunk(column_index)->num_values();
 
@@ -463,10 +487,7 @@ Status FileReader::Impl::ReadColumnChunk(int column_index, int row_group_index,
       new PrimitiveImpl(pool_, std::move(input)));
   ColumnReader flat_column_reader(std::move(impl));
 
-  std::shared_ptr<Array> array;
-  RETURN_NOT_OK(flat_column_reader.NextBatch(records_to_read, &array));
-  *out = array;
-  return Status::OK();
+  return flat_column_reader.NextBatch(records_to_read, out);
 }
 
 Status FileReader::Impl::ReadRowGroup(int row_group_index,
@@ -485,7 +506,7 @@ Status FileReader::Impl::ReadRowGroup(int row_group_index,
   auto ReadColumnFunc = [&indices, &row_group_index, &schema, &columns, this](int i) {
     int column_index = indices[i];
 
-    std::shared_ptr<Array> array;
+    std::shared_ptr<ChunkedArray> array;
     RETURN_NOT_OK(ReadColumnChunk(column_index, row_group_index, &array));
     columns[i] = std::make_shared<Column>(schema->field(i), array);
     return Status::OK();
@@ -532,7 +553,7 @@ Status FileReader::Impl::ReadTable(const std::vector<int>& indices,
   std::vector<std::shared_ptr<Column>> columns(num_fields);
 
   auto ReadColumnFunc = [&indices, &field_indices, &schema, &columns, this](int i) {
-    std::shared_ptr<Array> array;
+    std::shared_ptr<ChunkedArray> array;
     RETURN_NOT_OK(ReadSchemaField(field_indices[i], indices, &array));
     columns[i] = std::make_shared<Column>(schema->field(i), array);
     return Status::OK();
@@ -576,8 +597,6 @@ Status FileReader::Impl::ReadTable(std::shared_ptr<Table>* table) {
 Status FileReader::Impl::ReadRowGroups(const std::vector<int>& row_groups,
                                        const std::vector<int>& indices,
                                        std::shared_ptr<Table>* table) {
-  // TODO(PARQUET-1393): Modify the record readers to already read this into a single,
-  // continuous array.
   std::vector<std::shared_ptr<Table>> tables(row_groups.size(), nullptr);
 
   for (size_t i = 0; i < row_groups.size(); ++i) {
@@ -633,7 +652,7 @@ Status FileReader::GetSchema(const std::vector<int>& indices,
   return impl_->GetSchema(indices, out);
 }
 
-Status FileReader::ReadColumn(int i, std::shared_ptr<Array>* out) {
+Status FileReader::ReadColumn(int i, std::shared_ptr<ChunkedArray>* out) {
   try {
     return impl_->ReadColumn(i, out);
   } catch (const ::parquet::ParquetException& e) {
@@ -641,7 +660,7 @@ Status FileReader::ReadColumn(int i, std::shared_ptr<Array>* out) {
   }
 }
 
-Status FileReader::ReadSchemaField(int i, std::shared_ptr<Array>* out) {
+Status FileReader::ReadSchemaField(int i, std::shared_ptr<ChunkedArray>* out) {
   try {
     return impl_->ReadSchemaField(i, out);
   } catch (const ::parquet::ParquetException& e) {
@@ -649,6 +668,18 @@ Status FileReader::ReadSchemaField(int i, std::shared_ptr<Array>* out) {
   }
 }
 
+Status FileReader::ReadColumn(int i, std::shared_ptr<Array>* out) {
+  std::shared_ptr<ChunkedArray> chunked_out;
+  RETURN_NOT_OK(ReadColumn(i, &chunked_out));
+  return GetSingleChunk(*chunked_out, out);
+}
+
+Status FileReader::ReadSchemaField(int i, std::shared_ptr<Array>* out) {
+  std::shared_ptr<ChunkedArray> chunked_out;
+  RETURN_NOT_OK(ReadSchemaField(i, &chunked_out));
+  return GetSingleChunk(*chunked_out, out);
+}
+
 Status FileReader::GetRecordBatchReader(const std::vector<int>& row_group_indices,
                                         std::shared_ptr<RecordBatchReader>* out) {
   std::vector<int> indices(impl_->num_columns());
@@ -764,7 +795,28 @@ const ParquetFileReader* FileReader::parquet_reader() const {
 }
 
 template <typename ParquetType>
-Status PrimitiveImpl::WrapIntoListArray(std::shared_ptr<Array>* array) {
+Status PrimitiveImpl::WrapIntoListArray(Datum* inout_array) {
+  if (descr_->max_repetition_level() == 0) {
+    // Flat, no action
+    return Status::OK();
+  }
+
+  std::shared_ptr<Array> flat_array;
+
+  // ARROW-3762(wesm): If inout_array is a chunked array, we reject as this is
+  // not yet implemented
+  if (inout_array->kind() == Datum::CHUNKED_ARRAY) {
+    if (inout_array->chunked_array()->num_chunks() > 1) {
+      return Status::NotImplemented(
+          "Nested data conversions not implemented for "
+          "chunked array outputs");
+    }
+    flat_array = inout_array->chunked_array()->chunk(0);
+  } else {
+    DCHECK_EQ(Datum::ARRAY, inout_array->kind());
+    flat_array = inout_array->make_array();
+  }
+
   const int16_t* def_levels = record_reader_->def_levels();
   const int16_t* rep_levels = record_reader_->rep_levels();
   const int64_t total_levels_read = record_reader_->levels_position();
@@ -775,110 +827,106 @@ Status PrimitiveImpl::WrapIntoListArray(std::shared_ptr<Array>* array) {
                                   &arrow_schema));
   std::shared_ptr<Field> current_field = arrow_schema->field(0);
 
-  if (descr_->max_repetition_level() > 0) {
-    // Walk downwards to extract nullability
-    std::vector<bool> nullable;
-    std::vector<std::shared_ptr<::arrow::Int32Builder>> offset_builders;
-    std::vector<std::shared_ptr<::arrow::BooleanBuilder>> valid_bits_builders;
-    nullable.push_back(current_field->nullable());
-    while (current_field->type()->num_children() > 0) {
-      if (current_field->type()->num_children() > 1) {
-        return Status::NotImplemented(
-            "Fields with more than one child are not supported.");
-      } else {
-        if (current_field->type()->id() != ::arrow::Type::LIST) {
-          return Status::NotImplemented(
-              "Currently only nesting with Lists is supported.");
-        }
-        current_field = current_field->type()->child(0);
+  // Walk downwards to extract nullability
+  std::vector<bool> nullable;
+  std::vector<std::shared_ptr<::arrow::Int32Builder>> offset_builders;
+  std::vector<std::shared_ptr<::arrow::BooleanBuilder>> valid_bits_builders;
+  nullable.push_back(current_field->nullable());
+  while (current_field->type()->num_children() > 0) {
+    if (current_field->type()->num_children() > 1) {
+      return Status::NotImplemented("Fields with more than one child are not supported.");
+    } else {
+      if (current_field->type()->id() != ::arrow::Type::LIST) {
+        return Status::NotImplemented("Currently only nesting with Lists is supported.");
       }
-      offset_builders.emplace_back(
-          std::make_shared<::arrow::Int32Builder>(::arrow::int32(), pool_));
-      valid_bits_builders.emplace_back(
-          std::make_shared<::arrow::BooleanBuilder>(::arrow::boolean(), pool_));
-      nullable.push_back(current_field->nullable());
+      current_field = current_field->type()->child(0);
     }
+    offset_builders.emplace_back(
+        std::make_shared<::arrow::Int32Builder>(::arrow::int32(), pool_));
+    valid_bits_builders.emplace_back(
+        std::make_shared<::arrow::BooleanBuilder>(::arrow::boolean(), pool_));
+    nullable.push_back(current_field->nullable());
+  }
 
-    int64_t list_depth = offset_builders.size();
-    // This describes the minimal definition that describes a level that
-    // reflects a value in the primitive values array.
-    int16_t values_def_level = descr_->max_definition_level();
-    if (nullable[nullable.size() - 1]) {
-      values_def_level--;
-    }
+  int64_t list_depth = offset_builders.size();
+  // This describes the minimal definition that describes a level that
+  // reflects a value in the primitive values array.
+  int16_t values_def_level = descr_->max_definition_level();
+  if (nullable[nullable.size() - 1]) {
+    values_def_level--;
+  }
 
-    // The definition levels that are needed so that a list is declared
-    // as empty and not null.
-    std::vector<int16_t> empty_def_level(list_depth);
-    int def_level = 0;
-    for (int i = 0; i < list_depth; i++) {
-      if (nullable[i]) {
-        def_level++;
-      }
-      empty_def_level[i] = static_cast<int16_t>(def_level);
+  // The definition levels that are needed so that a list is declared
+  // as empty and not null.
+  std::vector<int16_t> empty_def_level(list_depth);
+  int def_level = 0;
+  for (int i = 0; i < list_depth; i++) {
+    if (nullable[i]) {
       def_level++;
     }
+    empty_def_level[i] = static_cast<int16_t>(def_level);
+    def_level++;
+  }
 
-    int32_t values_offset = 0;
-    std::vector<int64_t> null_counts(list_depth, 0);
-    for (int64_t i = 0; i < total_levels_read; i++) {
-      int16_t rep_level = rep_levels[i];
-      if (rep_level < descr_->max_repetition_level()) {
-        for (int64_t j = rep_level; j < list_depth; j++) {
-          if (j == (list_depth - 1)) {
-            RETURN_NOT_OK(offset_builders[j]->Append(values_offset));
-          } else {
-            RETURN_NOT_OK(offset_builders[j]->Append(
-                static_cast<int32_t>(offset_builders[j + 1]->length())));
-          }
+  int32_t values_offset = 0;
+  std::vector<int64_t> null_counts(list_depth, 0);
+  for (int64_t i = 0; i < total_levels_read; i++) {
+    int16_t rep_level = rep_levels[i];
+    if (rep_level < descr_->max_repetition_level()) {
+      for (int64_t j = rep_level; j < list_depth; j++) {
+        if (j == (list_depth - 1)) {
+          RETURN_NOT_OK(offset_builders[j]->Append(values_offset));
+        } else {
+          RETURN_NOT_OK(offset_builders[j]->Append(
+              static_cast<int32_t>(offset_builders[j + 1]->length())));
+        }
 
-          if (((empty_def_level[j] - 1) == def_levels[i]) && (nullable[j])) {
-            RETURN_NOT_OK(valid_bits_builders[j]->Append(false));
-            null_counts[j]++;
+        if (((empty_def_level[j] - 1) == def_levels[i]) && (nullable[j])) {
+          RETURN_NOT_OK(valid_bits_builders[j]->Append(false));
+          null_counts[j]++;
+          break;
+        } else {
+          RETURN_NOT_OK(valid_bits_builders[j]->Append(true));
+          if (empty_def_level[j] == def_levels[i]) {
             break;
-          } else {
-            RETURN_NOT_OK(valid_bits_builders[j]->Append(true));
-            if (empty_def_level[j] == def_levels[i]) {
-              break;
-            }
           }
         }
       }
-      if (def_levels[i] >= values_def_level) {
-        values_offset++;
-      }
     }
-    // Add the final offset to all lists
-    for (int64_t j = 0; j < list_depth; j++) {
-      if (j == (list_depth - 1)) {
-        RETURN_NOT_OK(offset_builders[j]->Append(values_offset));
-      } else {
-        RETURN_NOT_OK(offset_builders[j]->Append(
-            static_cast<int32_t>(offset_builders[j + 1]->length())));
-      }
+    if (def_levels[i] >= values_def_level) {
+      values_offset++;
     }
-
-    std::vector<std::shared_ptr<Buffer>> offsets;
-    std::vector<std::shared_ptr<Buffer>> valid_bits;
-    std::vector<int64_t> list_lengths;
-    for (int64_t j = 0; j < list_depth; j++) {
-      list_lengths.push_back(offset_builders[j]->length() - 1);
-      std::shared_ptr<Array> array;
-      RETURN_NOT_OK(offset_builders[j]->Finish(&array));
-      offsets.emplace_back(std::static_pointer_cast<Int32Array>(array)->values());
-      RETURN_NOT_OK(valid_bits_builders[j]->Finish(&array));
-      valid_bits.emplace_back(std::static_pointer_cast<BooleanArray>(array)->values());
+  }
+  // Add the final offset to all lists
+  for (int64_t j = 0; j < list_depth; j++) {
+    if (j == (list_depth - 1)) {
+      RETURN_NOT_OK(offset_builders[j]->Append(values_offset));
+    } else {
+      RETURN_NOT_OK(offset_builders[j]->Append(
+          static_cast<int32_t>(offset_builders[j + 1]->length())));
     }
+  }
 
-    std::shared_ptr<Array> output(*array);
-    for (int64_t j = list_depth - 1; j >= 0; j--) {
-      auto list_type =
-          ::arrow::list(::arrow::field("item", output->type(), nullable[j + 1]));
-      output = std::make_shared<::arrow::ListArray>(
-          list_type, list_lengths[j], offsets[j], output, valid_bits[j], null_counts[j]);
-    }
-    *array = output;
+  std::vector<std::shared_ptr<Buffer>> offsets;
+  std::vector<std::shared_ptr<Buffer>> valid_bits;
+  std::vector<int64_t> list_lengths;
+  for (int64_t j = 0; j < list_depth; j++) {
+    list_lengths.push_back(offset_builders[j]->length() - 1);
+    std::shared_ptr<Array> array;
+    RETURN_NOT_OK(offset_builders[j]->Finish(&array));
+    offsets.emplace_back(std::static_pointer_cast<Int32Array>(array)->values());
+    RETURN_NOT_OK(valid_bits_builders[j]->Finish(&array));
+    valid_bits.emplace_back(std::static_pointer_cast<BooleanArray>(array)->values());
+  }
+
+  std::shared_ptr<Array> output = flat_array;
+  for (int64_t j = list_depth - 1; j >= 0; j--) {
+    auto list_type =
+        ::arrow::list(::arrow::field("item", output->type(), nullable[j + 1]));
+    output = std::make_shared<::arrow::ListArray>(list_type, list_lengths[j], offsets[j],
+                                                  output, valid_bits[j], null_counts[j]);
   }
+  *inout_array = output;
   return Status::OK();
 }
 
@@ -909,8 +957,7 @@ struct TransferFunctor {
   using ParquetCType = typename ParquetType::c_type;
 
   Status operator()(RecordReader* reader, MemoryPool* pool,
-                    const std::shared_ptr<::arrow::DataType>& type,
-                    std::shared_ptr<Array>* out) {
+                    const std::shared_ptr<::arrow::DataType>& type, Datum* out) {
     static_assert(!std::is_same<ArrowType, ::arrow::Int32Type>::value,
                   "The fast path transfer functor should be used "
                   "for primitive values");
@@ -938,8 +985,7 @@ template <typename ArrowType, typename ParquetType>
 struct TransferFunctor<ArrowType, ParquetType,
                        supports_fast_path<ArrowType, ParquetType>> {
   Status operator()(RecordReader* reader, MemoryPool* pool,
-                    const std::shared_ptr<::arrow::DataType>& type,
-                    std::shared_ptr<Array>* out) {
+                    const std::shared_ptr<::arrow::DataType>& type, Datum* out) {
     int64_t length = reader->values_written();
     std::shared_ptr<ResizableBuffer> values = reader->ReleaseValues();
 
@@ -957,8 +1003,7 @@ struct TransferFunctor<ArrowType, ParquetType,
 template <>
 struct TransferFunctor<::arrow::BooleanType, BooleanType> {
   Status operator()(RecordReader* reader, MemoryPool* pool,
-                    const std::shared_ptr<::arrow::DataType>& type,
-                    std::shared_ptr<Array>* out) {
+                    const std::shared_ptr<::arrow::DataType>& type, Datum* out) {
     int64_t length = reader->values_written();
     std::shared_ptr<Buffer> data;
 
@@ -991,8 +1036,7 @@ struct TransferFunctor<::arrow::BooleanType, BooleanType> {
 template <>
 struct TransferFunctor<::arrow::TimestampType, Int96Type> {
   Status operator()(RecordReader* reader, MemoryPool* pool,
-                    const std::shared_ptr<::arrow::DataType>& type,
-                    std::shared_ptr<Array>* out) {
+                    const std::shared_ptr<::arrow::DataType>& type, Datum* out) {
     int64_t length = reader->values_written();
     auto values = reinterpret_cast<const Int96*>(reader->values());
 
@@ -1019,8 +1063,7 @@ struct TransferFunctor<::arrow::TimestampType, Int96Type> {
 template <>
 struct TransferFunctor<::arrow::Date64Type, Int32Type> {
   Status operator()(RecordReader* reader, MemoryPool* pool,
-                    const std::shared_ptr<::arrow::DataType>& type,
-                    std::shared_ptr<Array>* out) {
+                    const std::shared_ptr<::arrow::DataType>& type, Datum* out) {
     int64_t length = reader->values_written();
     auto values = reinterpret_cast<const int32_t*>(reader->values());
 
@@ -1046,19 +1089,24 @@ struct TransferFunctor<::arrow::Date64Type, Int32Type> {
 template <typename ArrowType, typename ParquetType>
 struct TransferFunctor<
     ArrowType, ParquetType,
-    typename std::enable_if<std::is_same<ParquetType, ByteArrayType>::value ||
-                            std::is_same<ParquetType, FLBAType>::value>::type> {
+    typename std::enable_if<
+        (std::is_base_of<::arrow::BinaryType, ArrowType>::value ||
+         std::is_same<::arrow::FixedSizeBinaryType, ArrowType>::value) &&
+        (std::is_same<ParquetType, ByteArrayType>::value ||
+         std::is_same<ParquetType, FLBAType>::value)>::type> {
   Status operator()(RecordReader* reader, MemoryPool* pool,
-                    const std::shared_ptr<::arrow::DataType>& type,
-                    std::shared_ptr<Array>* out) {
-    RETURN_NOT_OK(reader->builder()->Finish(out));
+                    const std::shared_ptr<::arrow::DataType>& type, Datum* out) {
+    std::vector<std::shared_ptr<Array>> chunks = reader->GetBuilderChunks();
 
     if (type->id() == ::arrow::Type::STRING) {
       // Convert from BINARY type to STRING
-      auto new_data = (*out)->data()->Copy();
-      new_data->type = type;
-      *out = ::arrow::MakeArray(new_data);
+      for (size_t i = 0; i < chunks.size(); ++i) {
+        auto new_data = chunks[i]->data()->Copy();
+        new_data->type = type;
+        chunks[i] = ::arrow::MakeArray(new_data);
+      }
     }
+    *out = std::make_shared<ChunkedArray>(chunks);
     return Status::OK();
   }
 };
@@ -1166,121 +1214,133 @@ static inline void RawBytesToDecimalBytes(const uint8_t* value, int32_t byte_wid
   BytesToIntegerPair(value, byte_width, high, low);
 }
 
-/// \brief Convert an array of FixedLenByteArrays to an arrow::Decimal128Array
-/// We do this by:
-/// 1. Creating a arrow::FixedSizeBinaryArray from the RecordReader's builder
-/// 2. Allocating a buffer for the arrow::Decimal128Array
-/// 3. Converting the big-endian bytes in the FixedSizeBinaryArray to two integers
-///    representing the high and low bits of each decimal value.
+// ----------------------------------------------------------------------
+// BYTE_ARRAY / FIXED_LEN_BYTE_ARRAY -> Decimal128
+
+template <typename T>
+Status ConvertToDecimal128(const Array& array, const std::shared_ptr<::arrow::DataType>&,
+                           MemoryPool* pool, std::shared_ptr<Array>*) {
+  return Status::NotImplemented("not implemented");
+}
+
 template <>
-struct TransferFunctor<::arrow::Decimal128Type, FLBAType> {
-  Status operator()(RecordReader* reader, MemoryPool* pool,
-                    const std::shared_ptr<::arrow::DataType>& type,
-                    std::shared_ptr<Array>* out) {
-    DCHECK_EQ(type->id(), ::arrow::Type::DECIMAL);
+Status ConvertToDecimal128<FLBAType>(const Array& array,
+                                     const std::shared_ptr<::arrow::DataType>& type,
+                                     MemoryPool* pool, std::shared_ptr<Array>* out) {
+  const auto& fixed_size_binary_array =
+      static_cast<const ::arrow::FixedSizeBinaryArray&>(array);
 
-    // Finish the built data into a temporary array
-    std::shared_ptr<Array> array;
-    RETURN_NOT_OK(reader->builder()->Finish(&array));
-    const auto& fixed_size_binary_array =
-        static_cast<const ::arrow::FixedSizeBinaryArray&>(*array);
+  // The byte width of each decimal value
+  const int32_t type_length =
+      static_cast<const ::arrow::Decimal128Type&>(*type).byte_width();
 
-    // Get the byte width of the values in the FixedSizeBinaryArray. Most of the time
-    // this will be different from the decimal array width because we write the minimum
-    // number of bytes necessary to represent a given precision
-    const int32_t byte_width =
-        static_cast<const ::arrow::FixedSizeBinaryType&>(*fixed_size_binary_array.type())
-            .byte_width();
+  // number of elements in the entire array
+  const int64_t length = fixed_size_binary_array.length();
 
-    // The byte width of each decimal value
-    const int32_t type_length =
-        static_cast<const ::arrow::Decimal128Type&>(*type).byte_width();
+  // Get the byte width of the values in the FixedSizeBinaryArray. Most of the time
+  // this will be different from the decimal array width because we write the minimum
+  // number of bytes necessary to represent a given precision
+  const int32_t byte_width =
+      static_cast<const ::arrow::FixedSizeBinaryType&>(*fixed_size_binary_array.type())
+          .byte_width();
 
-    // number of elements in the entire array
-    const int64_t length = fixed_size_binary_array.length();
+  // allocate memory for the decimal array
+  std::shared_ptr<Buffer> data;
+  RETURN_NOT_OK(::arrow::AllocateBuffer(pool, length * type_length, &data));
 
-    // allocate memory for the decimal array
-    std::shared_ptr<Buffer> data;
-    RETURN_NOT_OK(::arrow::AllocateBuffer(pool, length * type_length, &data));
-
-    // raw bytes that we can write to
-    uint8_t* out_ptr = data->mutable_data();
-
-    // convert each FixedSizeBinary value to valid decimal bytes
-    const int64_t null_count = fixed_size_binary_array.null_count();
-    if (null_count > 0) {
-      for (int64_t i = 0; i < length; ++i, out_ptr += type_length) {
-        if (!fixed_size_binary_array.IsNull(i)) {
-          RawBytesToDecimalBytes(fixed_size_binary_array.GetValue(i), byte_width,
-                                 out_ptr);
-        }
-      }
-    } else {
-      for (int64_t i = 0; i < length; ++i, out_ptr += type_length) {
+  // raw bytes that we can write to
+  uint8_t* out_ptr = data->mutable_data();
+
+  // convert each FixedSizeBinary value to valid decimal bytes
+  const int64_t null_count = fixed_size_binary_array.null_count();
+  if (null_count > 0) {
+    for (int64_t i = 0; i < length; ++i, out_ptr += type_length) {
+      if (!fixed_size_binary_array.IsNull(i)) {
         RawBytesToDecimalBytes(fixed_size_binary_array.GetValue(i), byte_width, out_ptr);
       }
     }
-
-    *out = std::make_shared<::arrow::Decimal128Array>(
-        type, length, data, fixed_size_binary_array.null_bitmap(), null_count);
-    return Status::OK();
+  } else {
+    for (int64_t i = 0; i < length; ++i, out_ptr += type_length) {
+      RawBytesToDecimalBytes(fixed_size_binary_array.GetValue(i), byte_width, out_ptr);
+    }
   }
-};
 
-/// \brief Convert an arrow::BinaryArray to an arrow::Decimal128Array
-/// We do this by:
-/// 1. Creating an arrow::BinaryArray from the RecordReader's builder
-/// 2. Allocating a buffer for the arrow::Decimal128Array
-/// 3. Converting the big-endian bytes in each BinaryArray entry to two integers
-///    representing the high and low bits of each decimal value.
-template <>
-struct TransferFunctor<::arrow::Decimal128Type, ByteArrayType> {
-  Status operator()(RecordReader* reader, MemoryPool* pool,
-                    const std::shared_ptr<::arrow::DataType>& type,
-                    std::shared_ptr<Array>* out) {
-    DCHECK_EQ(type->id(), ::arrow::Type::DECIMAL);
+  *out = std::make_shared<::arrow::Decimal128Array>(
+      type, length, data, fixed_size_binary_array.null_bitmap(), null_count);
 
-    // Finish the built data into a temporary array
-    std::shared_ptr<Array> array;
-    RETURN_NOT_OK(reader->builder()->Finish(&array));
-    const auto& binary_array = static_cast<const ::arrow::BinaryArray&>(*array);
+  return Status::OK();
+}
 
-    const int64_t length = binary_array.length();
+template <>
+Status ConvertToDecimal128<ByteArrayType>(const Array& array,
+                                          const std::shared_ptr<::arrow::DataType>& type,
+                                          MemoryPool* pool, std::shared_ptr<Array>* out) {
+  const auto& binary_array = static_cast<const ::arrow::BinaryArray&>(array);
+  const int64_t length = binary_array.length();
 
-    const auto& decimal_type = static_cast<const ::arrow::Decimal128Type&>(*type);
-    const int64_t type_length = decimal_type.byte_width();
+  const auto& decimal_type = static_cast<const ::arrow::Decimal128Type&>(*type);
+  const int64_t type_length = decimal_type.byte_width();
 
-    std::shared_ptr<Buffer> data;
-    RETURN_NOT_OK(::arrow::AllocateBuffer(pool, length * type_length, &data));
+  std::shared_ptr<Buffer> data;
+  RETURN_NOT_OK(::arrow::AllocateBuffer(pool, length * type_length, &data));
 
-    // raw bytes that we can write to
-    uint8_t* out_ptr = data->mutable_data();
+  // raw bytes that we can write to
+  uint8_t* out_ptr = data->mutable_data();
 
-    const int64_t null_count = binary_array.null_count();
+  const int64_t null_count = binary_array.null_count();
 
-    // convert each BinaryArray value to valid decimal bytes
-    for (int64_t i = 0; i < length; i++, out_ptr += type_length) {
-      int32_t record_len = 0;
-      const uint8_t* record_loc = binary_array.GetValue(i, &record_len);
+  // convert each BinaryArray value to valid decimal bytes
+  for (int64_t i = 0; i < length; i++, out_ptr += type_length) {
+    int32_t record_len = 0;
+    const uint8_t* record_loc = binary_array.GetValue(i, &record_len);
 
-      if ((record_len < 0) || (record_len > type_length)) {
-        return Status::Invalid("Invalid BYTE_ARRAY size");
-      }
+    if ((record_len < 0) || (record_len > type_length)) {
+      return Status::Invalid("Invalid BYTE_ARRAY size");
+    }
 
-      auto out_ptr_view = reinterpret_cast<uint64_t*>(out_ptr);
-      out_ptr_view[0] = 0;
-      out_ptr_view[1] = 0;
+    auto out_ptr_view = reinterpret_cast<uint64_t*>(out_ptr);
+    out_ptr_view[0] = 0;
+    out_ptr_view[1] = 0;
 
-      // only convert rows that are not null if there are nulls, or
-      // all rows, if there are not
-      if (((null_count > 0) && !binary_array.IsNull(i)) || (null_count <= 0)) {
-        RawBytesToDecimalBytes(record_loc, record_len, out_ptr);
-      }
+    // only convert rows that are not null if there are nulls, or
+    // all rows, if there are not
+    if (((null_count > 0) && !binary_array.IsNull(i)) || (null_count <= 0)) {
+      RawBytesToDecimalBytes(record_loc, record_len, out_ptr);
     }
+  }
+
+  *out = std::make_shared<::arrow::Decimal128Array>(
+      type, length, data, binary_array.null_bitmap(), null_count);
+  return Status::OK();
+}
+
+/// \brief Convert an arrow::BinaryArray to an arrow::Decimal128Array
+/// We do this by:
+/// 1. Creating an arrow::BinaryArray from the RecordReader's builder
+/// 2. Allocating a buffer for the arrow::Decimal128Array
+/// 3. Converting the big-endian bytes in each BinaryArray entry to two integers
+///    representing the high and low bits of each decimal value.
+template <typename ArrowType, typename ParquetType>
+struct TransferFunctor<
+    ArrowType, ParquetType,
+    typename std::enable_if<std::is_same<ArrowType, ::arrow::Decimal128Type>::value &&
+                            (std::is_same<ParquetType, ByteArrayType>::value ||
+                             std::is_same<ParquetType, FLBAType>::value)>::type> {
+  Status operator()(RecordReader* reader, MemoryPool* pool,
+                    const std::shared_ptr<::arrow::DataType>& type, Datum* out) {
+    DCHECK_EQ(type->id(), ::arrow::Type::DECIMAL);
 
-    *out = std::make_shared<::arrow::Decimal128Array>(
-        type, length, data, binary_array.null_bitmap(), null_count);
+    ::arrow::ArrayVector chunks = reader->GetBuilderChunks();
 
+    for (size_t i = 0; i < chunks.size(); ++i) {
+      std::shared_ptr<Array> chunk_as_decimal;
+      RETURN_NOT_OK(
+          ConvertToDecimal128<ParquetType>(*chunks[i], type, pool, &chunk_as_decimal));
+
+      // Replace the chunk, which will hopefully also free memory as we go
+      chunks[i] = chunk_as_decimal;
+    }
+    *out = std::make_shared<ChunkedArray>(chunks);
     return Status::OK();
   }
 };
@@ -1295,7 +1355,7 @@ template <typename ParquetIntegerType,
               std::is_same<ParquetIntegerType, Int64Type>::value>::type>
 static Status DecimalIntegerTransfer(RecordReader* reader, MemoryPool* pool,
                                      const std::shared_ptr<::arrow::DataType>& type,
-                                     std::shared_ptr<Array>* out) {
+                                     Datum* out) {
   DCHECK_EQ(type->id(), ::arrow::Type::DECIMAL);
 
   const int64_t length = reader->values_written();
@@ -1342,8 +1402,7 @@ static Status DecimalIntegerTransfer(RecordReader* reader, MemoryPool* pool,
 template <>
 struct TransferFunctor<::arrow::Decimal128Type, Int32Type> {
   Status operator()(RecordReader* reader, MemoryPool* pool,
-                    const std::shared_ptr<::arrow::DataType>& type,
-                    std::shared_ptr<Array>* out) {
+                    const std::shared_ptr<::arrow::DataType>& type, Datum* out) {
     return DecimalIntegerTransfer<Int32Type>(reader, pool, type, out);
   }
 };
@@ -1351,23 +1410,23 @@ struct TransferFunctor<::arrow::Decimal128Type, Int32Type> {
 template <>
 struct TransferFunctor<::arrow::Decimal128Type, Int64Type> {
   Status operator()(RecordReader* reader, MemoryPool* pool,
-                    const std::shared_ptr<::arrow::DataType>& type,
-                    std::shared_ptr<Array>* out) {
+                    const std::shared_ptr<::arrow::DataType>& type, Datum* out) {
     return DecimalIntegerTransfer<Int64Type>(reader, pool, type, out);
   }
 };
 
-#define TRANSFER_DATA(ArrowType, ParquetType)                            \
-  TransferFunctor<ArrowType, ParquetType> func;                          \
-  RETURN_NOT_OK(func(record_reader_.get(), pool_, field_->type(), out)); \
-  RETURN_NOT_OK(WrapIntoListArray<ParquetType>(out))
+#define TRANSFER_DATA(ArrowType, ParquetType)                                \
+  TransferFunctor<ArrowType, ParquetType> func;                              \
+  RETURN_NOT_OK(func(record_reader_.get(), pool_, field_->type(), &result)); \
+  RETURN_NOT_OK(WrapIntoListArray<ParquetType>(&result))
 
 #define TRANSFER_CASE(ENUM, ArrowType, ParquetType) \
   case ::arrow::Type::ENUM: {                       \
     TRANSFER_DATA(ArrowType, ParquetType);          \
   } break;
 
-Status PrimitiveImpl::NextBatch(int64_t records_to_read, std::shared_ptr<Array>* out) {
+Status PrimitiveImpl::NextBatch(int64_t records_to_read,
+                                std::shared_ptr<ChunkedArray>* out) {
   try {
     // Pre-allocation gives much better performance for flat columns
     record_reader_->Reserve(records_to_read);
@@ -1387,6 +1446,7 @@ Status PrimitiveImpl::NextBatch(int64_t records_to_read, std::shared_ptr<Array>*
     return ::arrow::Status::IOError(e.what());
   }
 
+  Datum result;
   switch (field_->type()->id()) {
     TRANSFER_CASE(BOOL, ::arrow::BooleanType, BooleanType)
     TRANSFER_CASE(UINT8, ::arrow::UInt8Type, Int32Type)
@@ -1405,8 +1465,8 @@ Status PrimitiveImpl::NextBatch(int64_t records_to_read, std::shared_ptr<Array>*
     TRANSFER_CASE(DATE64, ::arrow::Date64Type, Int32Type)
     TRANSFER_CASE(FIXED_SIZE_BINARY, ::arrow::FixedSizeBinaryType, FLBAType)
     case ::arrow::Type::NA: {
-      *out = std::make_shared<::arrow::NullArray>(record_reader_->values_written());
-      RETURN_NOT_OK(WrapIntoListArray<Int32Type>(out));
+      result = std::make_shared<::arrow::NullArray>(record_reader_->values_written());
+      RETURN_NOT_OK(WrapIntoListArray<Int32Type>(&result));
       break;
     }
     case ::arrow::Type::DECIMAL: {
@@ -1452,6 +1512,15 @@ Status PrimitiveImpl::NextBatch(int64_t records_to_read, std::shared_ptr<Array>*
       return Status::NotImplemented(ss.str());
   }
 
+  DCHECK_NE(result.kind(), Datum::NONE);
+
+  if (result.kind() == Datum::ARRAY) {
+    *out = std::make_shared<ChunkedArray>(result.make_array());
+  } else if (result.kind() == Datum::CHUNKED_ARRAY) {
+    *out = result.chunked_array();
+  } else {
+    DCHECK(false) << "Should be impossible";
+  }
   return Status::OK();
 }
 
@@ -1477,10 +1546,17 @@ ColumnReader::ColumnReader(std::unique_ptr<ColumnReaderImpl> impl)
 
 ColumnReader::~ColumnReader() {}
 
-Status ColumnReader::NextBatch(int64_t records_to_read, std::shared_ptr<Array>* out) {
+Status ColumnReader::NextBatch(int64_t records_to_read,
+                               std::shared_ptr<ChunkedArray>* out) {
   return impl_->NextBatch(records_to_read, out);
 }
 
+Status ColumnReader::NextBatch(int64_t records_to_read, std::shared_ptr<Array>* out) {
+  std::shared_ptr<ChunkedArray> chunked_out;
+  RETURN_NOT_OK(impl_->NextBatch(records_to_read, &chunked_out));
+  return GetSingleChunk(*chunked_out, out);
+}
+
 // StructImpl methods
 
 Status StructImpl::DefLevelsToNullArray(std::shared_ptr<Buffer>* null_bitmap_out,
@@ -1565,17 +1641,21 @@ Status StructImpl::GetRepLevels(const int16_t** data, size_t* length) {
   return Status::NotImplemented("GetRepLevels is not implemented for struct");
 }
 
-Status StructImpl::NextBatch(int64_t records_to_read, std::shared_ptr<Array>* out) {
+Status StructImpl::NextBatch(int64_t records_to_read,
+                             std::shared_ptr<ChunkedArray>* out) {
   std::vector<std::shared_ptr<Array>> children_arrays;
   std::shared_ptr<Buffer> null_bitmap;
   int64_t null_count;
 
   // Gather children arrays and def levels
   for (auto& child : children_) {
-    std::shared_ptr<Array> child_array;
+    std::shared_ptr<ChunkedArray> field;
+    RETURN_NOT_OK(child->NextBatch(records_to_read, &field));
 
-    RETURN_NOT_OK(child->NextBatch(records_to_read, &child_array));
-    children_arrays.push_back(child_array);
+    if (field->num_chunks() > 1) {
+      return Status::Invalid("Chunked field reads not yet supported with StructArray");
+    }
+    children_arrays.push_back(field->chunk(0));
   }
 
   RETURN_NOT_OK(DefLevelsToNullArray(&null_bitmap, &null_count));
@@ -1589,8 +1669,9 @@ Status StructImpl::NextBatch(int64_t records_to_read, std::shared_ptr<Array>* ou
     }
   }
 
-  *out = std::make_shared<StructArray>(field()->type(), struct_length, children_arrays,
-                                       null_bitmap, null_count);
+  auto result = std::make_shared<StructArray>(field()->type(), struct_length,
+                                              children_arrays, null_bitmap, null_count);
+  *out = std::make_shared<ChunkedArray>(result);
   return Status::OK();
 }
 
@@ -1613,10 +1694,16 @@ RowGroupReader::~RowGroupReader() {}
 RowGroupReader::RowGroupReader(FileReader::Impl* impl, int row_group_index)
     : impl_(impl), row_group_index_(row_group_index) {}
 
-Status ColumnChunkReader::Read(std::shared_ptr<::arrow::Array>* out) {
+Status ColumnChunkReader::Read(std::shared_ptr<::arrow::ChunkedArray>* out) {
   return impl_->ReadColumnChunk(column_index_, row_group_index_, out);
 }
 
+Status ColumnChunkReader::Read(std::shared_ptr<::arrow::Array>* out) {
+  std::shared_ptr<ChunkedArray> chunked_out;
+  RETURN_NOT_OK(impl_->ReadColumnChunk(column_index_, row_group_index_, &chunked_out));
+  return GetSingleChunk(*chunked_out, out);
+}
+
 ColumnChunkReader::~ColumnChunkReader() {}
 
 ColumnChunkReader::ColumnChunkReader(FileReader::Impl* impl, int row_group_index,
diff --git a/cpp/src/parquet/arrow/reader.h b/cpp/src/parquet/arrow/reader.h
index 2cd94ca..5286e74 100644
--- a/cpp/src/parquet/arrow/reader.h
+++ b/cpp/src/parquet/arrow/reader.h
@@ -30,6 +30,7 @@
 namespace arrow {
 
 class Array;
+class ChunkedArray;
 class MemoryPool;
 class RecordBatchReader;
 class Schema;
@@ -125,6 +126,10 @@ class PARQUET_EXPORT FileReader {
                             std::shared_ptr<::arrow::Schema>* out);
 
   // Read column as a whole into an Array.
+  ::arrow::Status ReadColumn(int i, std::shared_ptr<::arrow::ChunkedArray>* out);
+
+  /// \note Deprecated since 0.12
+  ARROW_DEPRECATED("Use version with ChunkedArray output")
   ::arrow::Status ReadColumn(int i, std::shared_ptr<::arrow::Array>* out);
 
   // NOTE: Experimental API
@@ -139,27 +144,11 @@ class PARQUET_EXPORT FileReader {
   // 2 foo3
   //
   // i=0 will read the entire foo struct, i=1 the foo2 primitive column etc
-  ::arrow::Status ReadSchemaField(int i, std::shared_ptr<::arrow::Array>* out);
+  ::arrow::Status ReadSchemaField(int i, std::shared_ptr<::arrow::ChunkedArray>* out);
 
-  // NOTE: Experimental API
-  // Reads a specific top level schema field into an Array, while keeping only chosen
-  // leaf columns.
-  // The index i refers the index of the top level schema field, which may
-  // be nested or flat, and indices vector refers to the leaf column indices - e.g.
-  //
-  // i  indices
-  // 0  0        foo.bar
-  // 0  1        foo.bar.baz
-  // 0  2        foo.qux
-  // 1  3        foo2
-  // 2  4        foo3
-  //
-  // i=0 indices={0,2} will read a partial struct with foo.bar and foo.quox columns
-  // i=1 indices={3} will read foo2 column
-  // i=1 indices={2} will result in out=nullptr
-  // leaf indices which are unrelated to the schema field are ignored
-  ::arrow::Status ReadSchemaField(int i, const std::vector<int>& indices,
-                                  std::shared_ptr<::arrow::Array>* out);
+  /// \note Deprecated since 0.12
+  ARROW_DEPRECATED("Use version with ChunkedArray output")
+  ::arrow::Status ReadSchemaField(int i, std::shared_ptr<::arrow::Array>* out);
 
   /// \brief Return a RecordBatchReader of row groups selected from row_group_indices, the
   ///    ordering in row_group_indices matters.
@@ -248,6 +237,10 @@ class PARQUET_EXPORT RowGroupReader {
 
 class PARQUET_EXPORT ColumnChunkReader {
  public:
+  ::arrow::Status Read(std::shared_ptr<::arrow::ChunkedArray>* out);
+
+  /// \note Deprecated since 0.12
+  ARROW_DEPRECATED("Use version with ChunkedArray output")
   ::arrow::Status Read(std::shared_ptr<::arrow::Array>* out);
 
   virtual ~ColumnChunkReader();
@@ -281,6 +274,11 @@ class PARQUET_EXPORT ColumnReader {
   //
   // Returns Status::OK on a successful read, including if you have exhausted
   // the data available in the file.
+  ::arrow::Status NextBatch(int64_t batch_size,
+                            std::shared_ptr<::arrow::ChunkedArray>* out);
+
+  /// \note Deprecated since 0.12
+  ARROW_DEPRECATED("Use version with ChunkedArray output")
   ::arrow::Status NextBatch(int64_t batch_size, std::shared_ptr<::arrow::Array>* out);
 
  private:
diff --git a/cpp/src/parquet/arrow/record_reader.cc b/cpp/src/parquet/arrow/record_reader.cc
index 4a3cd52..d1bf2c5 100644
--- a/cpp/src/parquet/arrow/record_reader.cc
+++ b/cpp/src/parquet/arrow/record_reader.cc
@@ -86,14 +86,6 @@ class RecordReader::RecordReaderImpl {
     valid_bits_ = AllocateBuffer(pool);
     def_levels_ = AllocateBuffer(pool);
     rep_levels_ = AllocateBuffer(pool);
-
-    if (descr->physical_type() == Type::BYTE_ARRAY) {
-      builder_.reset(new ::arrow::BinaryBuilder(pool));
-    } else if (descr->physical_type() == Type::FIXED_LEN_BYTE_ARRAY) {
-      int byte_width = descr->type_length();
-      std::shared_ptr<::arrow::DataType> type = ::arrow::fixed_size_binary(byte_width);
-      builder_.reset(new ::arrow::FixedSizeBinaryBuilder(type, pool));
-    }
     Reset();
   }
 
@@ -229,8 +221,6 @@ class RecordReader::RecordReaderImpl {
     return result;
   }
 
-  ::arrow::ArrayBuilder* builder() { return builder_.get(); }
-
   // Process written repetition/definition levels to reach the end of
   // records. Process no more levels than necessary to delimit the indicated
   // number of logical records. Updates internal state of RecordReader
@@ -375,7 +365,7 @@ class RecordReader::RecordReaderImpl {
 
     records_read_ = 0;
 
-    // Calling Finish on the builders also resets them
+    // Call Finish on the binary builders to reset them
   }
 
   void ResetValues() {
@@ -391,6 +381,8 @@ class RecordReader::RecordReaderImpl {
 
   virtual void DebugPrintState() = 0;
 
+  virtual std::vector<std::shared_ptr<::arrow::Array>> GetBuilderChunks() = 0;
+
  protected:
   virtual bool ReadNewPage() = 0;
 
@@ -434,9 +426,6 @@ class RecordReader::RecordReaderImpl {
   int64_t levels_position_;
   int64_t levels_capacity_;
 
-  // TODO(wesm): ByteArray / FixedLenByteArray types
-  std::unique_ptr<::arrow::ArrayBuilder> builder_;
-
   std::shared_ptr<::arrow::ResizableBuffer> values_;
 
   template <typename T>
@@ -450,12 +439,31 @@ class RecordReader::RecordReaderImpl {
 };
 
 template <typename DType>
+struct RecordReaderTraits {
+  using BuilderType = ::arrow::ArrayBuilder;
+};
+
+template <>
+struct RecordReaderTraits<ByteArrayType> {
+  using BuilderType = ::arrow::internal::ChunkedBinaryBuilder;
+};
+
+template <>
+struct RecordReaderTraits<FLBAType> {
+  using BuilderType = ::arrow::FixedSizeBinaryBuilder;
+};
+
+template <typename DType>
 class TypedRecordReader : public RecordReader::RecordReaderImpl {
  public:
-  typedef typename DType::c_type T;
+  using T = typename DType::c_type;
 
-  TypedRecordReader(const ColumnDescriptor* schema, ::arrow::MemoryPool* pool)
-      : RecordReader::RecordReaderImpl(schema, pool), current_decoder_(nullptr) {}
+  using BuilderType = typename RecordReaderTraits<DType>::BuilderType;
+
+  TypedRecordReader(const ColumnDescriptor* descr, ::arrow::MemoryPool* pool)
+      : RecordReader::RecordReaderImpl(descr, pool), current_decoder_(nullptr) {
+    InitializeBuilder();
+  }
 
   void ResetDecoders() override { decoders_.clear(); }
 
@@ -546,6 +554,10 @@ class TypedRecordReader : public RecordReader::RecordReaderImpl {
     std::cout << std::endl;
   }
 
+  std::vector<std::shared_ptr<::arrow::Array>> GetBuilderChunks() override {
+    throw ParquetException("GetChunks only implemented for binary types");
+  }
+
  private:
   typedef Decoder<DType> DecoderType;
 
@@ -554,11 +566,15 @@ class TypedRecordReader : public RecordReader::RecordReaderImpl {
   // plain-encoded data.
   std::unordered_map<int, std::shared_ptr<DecoderType>> decoders_;
 
+  std::unique_ptr<BuilderType> builder_;
+
   DecoderType* current_decoder_;
 
   // Advance to the next data page
   bool ReadNewPage() override;
 
+  void InitializeBuilder() {}
+
   void ConfigureDictionary(const DictionaryPage* page);
 };
 
@@ -573,16 +589,45 @@ template <>
 void TypedRecordReader<FLBAType>::DebugPrintState() {}
 
 template <>
+void TypedRecordReader<ByteArrayType>::InitializeBuilder() {
+  // Maximum of 16MB chunks
+  constexpr int32_t kBinaryChunksize = 1 << 24;
+  DCHECK_EQ(descr_->physical_type(), Type::BYTE_ARRAY);
+  builder_.reset(new ::arrow::internal::ChunkedBinaryBuilder(kBinaryChunksize, pool_));
+}
+
+template <>
+void TypedRecordReader<FLBAType>::InitializeBuilder() {
+  DCHECK_EQ(descr_->physical_type(), Type::FIXED_LEN_BYTE_ARRAY);
+  int byte_width = descr_->type_length();
+  std::shared_ptr<::arrow::DataType> type = ::arrow::fixed_size_binary(byte_width);
+  builder_.reset(new ::arrow::FixedSizeBinaryBuilder(type, pool_));
+}
+
+template <>
+::arrow::ArrayVector TypedRecordReader<ByteArrayType>::GetBuilderChunks() {
+  ::arrow::ArrayVector chunks;
+  PARQUET_THROW_NOT_OK(builder_->Finish(&chunks));
+  return chunks;
+}
+
+template <>
+::arrow::ArrayVector TypedRecordReader<FLBAType>::GetBuilderChunks() {
+  std::shared_ptr<::arrow::Array> chunk;
+  PARQUET_THROW_NOT_OK(builder_->Finish(&chunk));
+  return ::arrow::ArrayVector({chunk});
+}
+
+template <>
 inline void TypedRecordReader<ByteArrayType>::ReadValuesDense(int64_t values_to_read) {
   auto values = ValuesHead<ByteArray>();
   int64_t num_decoded =
       current_decoder_->Decode(values, static_cast<int>(values_to_read));
   DCHECK_EQ(num_decoded, values_to_read);
 
-  auto builder = static_cast<::arrow::BinaryBuilder*>(builder_.get());
   for (int64_t i = 0; i < num_decoded; i++) {
     PARQUET_THROW_NOT_OK(
-        builder->Append(values[i].ptr, static_cast<int32_t>(values[i].len)));
+        builder_->Append(values[i].ptr, static_cast<int32_t>(values[i].len)));
   }
   ResetValues();
 }
@@ -594,9 +639,8 @@ inline void TypedRecordReader<FLBAType>::ReadValuesDense(int64_t values_to_read)
       current_decoder_->Decode(values, static_cast<int>(values_to_read));
   DCHECK_EQ(num_decoded, values_to_read);
 
-  auto builder = static_cast<::arrow::FixedSizeBinaryBuilder*>(builder_.get());
   for (int64_t i = 0; i < num_decoded; i++) {
-    PARQUET_THROW_NOT_OK(builder->Append(values[i].ptr));
+    PARQUET_THROW_NOT_OK(builder_->Append(values[i].ptr));
   }
   ResetValues();
 }
@@ -613,14 +657,12 @@ inline void TypedRecordReader<ByteArrayType>::ReadValuesSpaced(int64_t values_to
       valid_bits_offset);
   DCHECK_EQ(num_decoded, values_to_read);
 
-  auto builder = static_cast<::arrow::BinaryBuilder*>(builder_.get());
-
   for (int64_t i = 0; i < num_decoded; i++) {
     if (::arrow::BitUtil::GetBit(valid_bits, valid_bits_offset + i)) {
       PARQUET_THROW_NOT_OK(
-          builder->Append(values[i].ptr, static_cast<int32_t>(values[i].len)));
+          builder_->Append(values[i].ptr, static_cast<int32_t>(values[i].len)));
     } else {
-      PARQUET_THROW_NOT_OK(builder->AppendNull());
+      PARQUET_THROW_NOT_OK(builder_->AppendNull());
     }
   }
   ResetValues();
@@ -638,12 +680,11 @@ inline void TypedRecordReader<FLBAType>::ReadValuesSpaced(int64_t values_to_read
       valid_bits_offset);
   DCHECK_EQ(num_decoded, values_to_read);
 
-  auto builder = static_cast<::arrow::FixedSizeBinaryBuilder*>(builder_.get());
   for (int64_t i = 0; i < num_decoded; i++) {
     if (::arrow::BitUtil::GetBit(valid_bits, valid_bits_offset + i)) {
-      PARQUET_THROW_NOT_OK(builder->Append(values[i].ptr));
+      PARQUET_THROW_NOT_OK(builder_->Append(values[i].ptr));
     } else {
-      PARQUET_THROW_NOT_OK(builder->AppendNull());
+      PARQUET_THROW_NOT_OK(builder_->AppendNull());
     }
   }
   ResetValues();
@@ -845,8 +886,6 @@ std::shared_ptr<ResizableBuffer> RecordReader::ReleaseIsValid() {
   return impl_->ReleaseIsValid();
 }
 
-::arrow::ArrayBuilder* RecordReader::builder() { return impl_->builder(); }
-
 int64_t RecordReader::values_written() const { return impl_->values_written(); }
 
 int64_t RecordReader::levels_position() const { return impl_->levels_position(); }
@@ -863,6 +902,10 @@ void RecordReader::SetPageReader(std::unique_ptr<PageReader> reader) {
   impl_->SetPageReader(std::move(reader));
 }
 
+::arrow::ArrayVector RecordReader::GetBuilderChunks() {
+  return impl_->GetBuilderChunks();
+}
+
 void RecordReader::DebugPrintState() { impl_->DebugPrintState(); }
 
 }  // namespace internal
diff --git a/cpp/src/parquet/arrow/record_reader.h b/cpp/src/parquet/arrow/record_reader.h
index 7efd0d5..0f62b74 100644
--- a/cpp/src/parquet/arrow/record_reader.h
+++ b/cpp/src/parquet/arrow/record_reader.h
@@ -20,6 +20,7 @@
 
 #include <cstdint>
 #include <memory>
+#include <vector>
 
 #include "arrow/memory_pool.h"
 
@@ -28,7 +29,7 @@
 
 namespace arrow {
 
-class ArrayBuilder;
+class Array;
 
 }  // namespace arrow
 
@@ -77,7 +78,6 @@ class RecordReader {
 
   std::shared_ptr<ResizableBuffer> ReleaseValues();
   std::shared_ptr<ResizableBuffer> ReleaseIsValid();
-  ::arrow::ArrayBuilder* builder();
 
   /// \brief Number of values written including nulls (if any)
   int64_t values_written() const;
@@ -106,6 +106,9 @@ class RecordReader {
 
   void DebugPrintState();
 
+  // For BYTE_ARRAY, FIXED_LEN_BYTE_ARRAY types that may have chunked output
+  std::vector<std::shared_ptr<::arrow::Array>> GetBuilderChunks();
+
  private:
   std::unique_ptr<RecordReaderImpl> impl_;
   explicit RecordReader(RecordReaderImpl* impl);
diff --git a/python/pyarrow/_parquet.pxd b/python/pyarrow/_parquet.pxd
index 9e1a249..b63e72c 100644
--- a/python/pyarrow/_parquet.pxd
+++ b/python/pyarrow/_parquet.pxd
@@ -19,7 +19,7 @@
 # cython: language_level = 3
 
 from pyarrow.includes.common cimport *
-from pyarrow.includes.libarrow cimport (CArray, CSchema, CStatus,
+from pyarrow.includes.libarrow cimport (CChunkedArray, CSchema, CStatus,
                                         CTable, CMemoryPool,
                                         CKeyValueMetadata,
                                         RandomAccessFile, OutputStream,
@@ -272,8 +272,8 @@ cdef extern from "parquet/arrow/reader.h" namespace "parquet::arrow" nogil:
 
     cdef cppclass FileReader:
         FileReader(CMemoryPool* pool, unique_ptr[ParquetFileReader] reader)
-        CStatus ReadColumn(int i, shared_ptr[CArray]* out)
-        CStatus ReadSchemaField(int i, shared_ptr[CArray]* out)
+        CStatus ReadColumn(int i, shared_ptr[CChunkedArray]* out)
+        CStatus ReadSchemaField(int i, shared_ptr[CChunkedArray]* out)
 
         int num_row_groups()
         CStatus ReadRowGroup(int i, shared_ptr[CTable]* out)
diff --git a/python/pyarrow/_parquet.pyx b/python/pyarrow/_parquet.pyx
index 8112504..36a4d34 100644
--- a/python/pyarrow/_parquet.pyx
+++ b/python/pyarrow/_parquet.pyx
@@ -26,6 +26,7 @@ from pyarrow.lib cimport (Array, Schema,
                           check_status,
                           MemoryPool, maybe_unbox_memory_pool,
                           Table,
+                          pyarrow_wrap_chunked_array,
                           pyarrow_wrap_schema,
                           pyarrow_wrap_table,
                           NativeFile, get_reader, get_writer)
@@ -770,28 +771,18 @@ cdef class ParquetReader:
         return self._column_idx_map[tobytes(column_name)]
 
     def read_column(self, int column_index):
-        cdef:
-            Array array = Array()
-            shared_ptr[CArray] carray
-
+        cdef shared_ptr[CChunkedArray] out
         with nogil:
             check_status(self.reader.get()
-                         .ReadColumn(column_index, &carray))
-
-        array.init(carray)
-        return array
+                         .ReadColumn(column_index, &out))
+        return pyarrow_wrap_chunked_array(out)
 
     def read_schema_field(self, int field_index):
-        cdef:
-            Array array = Array()
-            shared_ptr[CArray] carray
-
+        cdef shared_ptr[CChunkedArray] out
         with nogil:
             check_status(self.reader.get()
-                         .ReadSchemaField(field_index, &carray))
-
-        array.init(carray)
-        return array
+                         .ReadSchemaField(field_index, &out))
+        return pyarrow_wrap_chunked_array(out)
 
 
 cdef class ParquetWriter:
diff --git a/python/pyarrow/lib.pxd b/python/pyarrow/lib.pxd
index 745a049..3e62826 100644
--- a/python/pyarrow/lib.pxd
+++ b/python/pyarrow/lib.pxd
@@ -396,6 +396,8 @@ cdef object pyarrow_wrap_metadata(
 #
 
 cdef public object pyarrow_wrap_array(const shared_ptr[CArray]& sp_array)
+cdef public object pyarrow_wrap_chunked_array(
+    const shared_ptr[CChunkedArray]& sp_array)
 # XXX pyarrow.h calls it `wrap_record_batch`
 cdef public object pyarrow_wrap_batch(const shared_ptr[CRecordBatch]& cbatch)
 cdef public object pyarrow_wrap_buffer(const shared_ptr[CBuffer]& buf)
diff --git a/python/pyarrow/tests/test_parquet.py b/python/pyarrow/tests/test_parquet.py
index 89d3224..5c27a9b 100644
--- a/python/pyarrow/tests/test_parquet.py
+++ b/python/pyarrow/tests/test_parquet.py
@@ -1959,6 +1959,33 @@ def test_large_table_int32_overflow():
     _write_table(table, f)
 
 
+@pytest.mark.large_memory
+def test_binary_array_overflow_to_chunked():
+    # ARROW-3762
+
+    # 2^31 + 1 bytes
+    values = [b'x'] + [
+        b'x' * (1 << 20)
+    ] * 2 * (1 << 10)
+    df = pd.DataFrame({'byte_col': values})
+
+    tbl = pa.Table.from_pandas(df, preserve_index=False)
+
+    buf = io.BytesIO()
+    _write_table(tbl, buf)
+    buf.seek(0)
+    read_tbl = _read_table(buf)
+    buf = None
+
+    col0_data = read_tbl[0].data
+    assert isinstance(col0_data, pa.ChunkedArray)
+
+    # Split up into 16MB chunks. 128 * 16 = 2048, so 129
+    assert col0_data.num_chunks == 129
+
+    assert tbl.equals(read_tbl)
+
+
 def test_index_column_name_duplicate(tempdir):
     data = {
         'close': {