You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2019/07/26 23:54:45 UTC

[arrow] branch master updated: ARROW-3772: [C++][Parquet] Write Parquet dictionary indices directly to DictionaryBuilder rather than routing through dense form

This is an automated email from the ASF dual-hosted git repository.

wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 38b0176  ARROW-3772: [C++][Parquet] Write Parquet dictionary indices directly to DictionaryBuilder rather than routing through dense form
38b0176 is described below

commit 38b01764da445ce6383b60a50d1e9b313857a3d7
Author: Wes McKinney <we...@apache.org>
AuthorDate: Fri Jul 26 18:54:31 2019 -0500

    ARROW-3772: [C++][Parquet] Write Parquet dictionary indices directly to DictionaryBuilder rather than routing through dense form
    
    I sort of ran into a hill of fire ants with this. There is too
    much to do in a single patch so I'm going to list out some of the
    things here and the follow up items that will have to be dealt
    with expediently in follow up patches:
    
    * Adds APIs to DictionaryBuilder to be able to insert dictionary memo values
      (`InsertMemoValues`) and indices (`AppendIndices` as `int64_t*`) separately
    * Adds `GetBatchSpaced` to `RleDecoder`
    * When setting "read_dictionary" for an Arrow column, now
      dictionary indices are appended directly to DictionaryBuilder
      rather than being fully decoded to dense, then rehashed. So
      this will both save memory and improve performance by skipping
      hashing
    * Dictionary fallback is no problem now because subsequent non-encoded pages
      will be appended to the existing dictionary
    * Each time a new dictionary page is encountered, the DictionaryBuilder is
      "flushed" since the order of the dictionary is changed. Note that this is
      only possible now due to ARROW-3144
    * Now that each row group can yield a different dictionary, they can possibly
      have different dictionary index types. This is a bug. Rather than explode the
      size of this patch I opened ARROW-6042 to add an exclusively int32-index
      dictionary builder so we can fix this as follow up
    * The handling of dictionaries in parquet/arrow/reader.cc is a mess -- the
      structure of the column readers with respect to nested data is very messy,
      and as a result it's not possible to do direct dictionary decoding inside a
      nested subfield. We will have to fix this in the course of refactoring for
      improved nested data support
    * Refactoring in parquet/arrow/reader.cc for improved readability
    
    There is much follow up work to do here, so I will get busy with with the other things while this is being reviewed.
    
    Closes #4949 from wesm/ARROW-3772 and squashes the following commits:
    
    026962971 <Wes McKinney> Add missing documentation
    d551b3de6 <Wes McKinney> Add missing Status checks now that wrapper gone
    e78f32b28 <Wes McKinney> Fix MSVC issues
    04c950078 <Wes McKinney> Fix API usages
    2b424409b <Wes McKinney> Fix release builds and Python
    21e0cc714 <Wes McKinney> Fix up unit test now that each row group has a different dictionary
    246e11cfd <Wes McKinney> Compiling again, but segfaulting
    ad253ed1a <Wes McKinney> More refactoring
    418ffe195 <Wes McKinney> Closer to working decoder implementation
    2d6558b41 <Wes McKinney> Add RleDecoder::GetBatchSpaced
    cbeea802e <Wes McKinney> Some refactoring / cleaning to prep
    
    Authored-by: Wes McKinney <we...@apache.org>
    Signed-off-by: Wes McKinney <we...@apache.org>
---
 cpp/src/arrow/array-dict-test.cc                  |  70 ++-
 cpp/src/arrow/array/builder_dict.cc               |  67 +--
 cpp/src/arrow/array/builder_dict.h                |  48 +-
 cpp/src/arrow/table.cc                            |   1 +
 cpp/src/arrow/testing/random.cc                   |  10 +
 cpp/src/arrow/testing/random.h                    |   5 +
 cpp/src/arrow/type.cc                             |   6 +-
 cpp/src/arrow/type_traits.h                       |   6 +
 cpp/src/arrow/util/hashing.h                      |   4 +-
 cpp/src/arrow/util/rle-encoding-test.cc           |  71 ++-
 cpp/src/arrow/util/rle-encoding.h                 |  97 ++++-
 cpp/src/parquet/arrow/arrow-reader-writer-test.cc | 118 ++---
 cpp/src/parquet/arrow/arrow-schema-test.cc        |  13 +-
 cpp/src/parquet/arrow/reader-writer-benchmark.cc  |  22 +-
 cpp/src/parquet/arrow/reader.cc                   | 508 +++++++++++-----------
 cpp/src/parquet/arrow/reader.h                    |  13 +-
 cpp/src/parquet/arrow/schema.cc                   |  24 +-
 cpp/src/parquet/arrow/schema.h                    |  50 ++-
 cpp/src/parquet/arrow/test-util.h                 |  10 +-
 cpp/src/parquet/arrow/writer.cc                   |   2 +
 cpp/src/parquet/column_reader.cc                  | 131 ++++--
 cpp/src/parquet/column_reader.h                   |  16 +-
 cpp/src/parquet/encoding-test.cc                  | 105 ++---
 cpp/src/parquet/encoding.cc                       | 268 +++++++++---
 cpp/src/parquet/encoding.h                        |  93 ++--
 cpp/src/parquet/types.h                           |  21 +-
 python/pyarrow/_parquet.pxd                       |   7 +
 python/pyarrow/_parquet.pyx                       |   3 +-
 28 files changed, 1158 insertions(+), 631 deletions(-)

diff --git a/cpp/src/arrow/array-dict-test.cc b/cpp/src/arrow/array-dict-test.cc
index ed37df3..ff00620 100644
--- a/cpp/src/arrow/array-dict-test.cc
+++ b/cpp/src/arrow/array-dict-test.cc
@@ -53,7 +53,7 @@ TYPED_TEST_CASE(TestDictionaryBuilder, PrimitiveDictionaries);
 TYPED_TEST(TestDictionaryBuilder, Basic) {
   using c_type = typename TypeParam::c_type;
 
-  DictionaryBuilder<TypeParam> builder(default_memory_pool());
+  DictionaryBuilder<TypeParam> builder;
   ASSERT_OK(builder.Append(static_cast<c_type>(1)));
   ASSERT_OK(builder.Append(static_cast<c_type>(2)));
   ASSERT_OK(builder.Append(static_cast<c_type>(1)));
@@ -81,7 +81,7 @@ TYPED_TEST(TestDictionaryBuilder, ArrayInit) {
   auto dict_array = ArrayFromJSON(value_type, "[1, 2]");
   auto dict_type = dictionary(int8(), value_type);
 
-  DictionaryBuilder<TypeParam> builder(dict_array, default_memory_pool());
+  DictionaryBuilder<TypeParam> builder(dict_array);
   ASSERT_OK(builder.Append(static_cast<c_type>(1)));
   ASSERT_OK(builder.Append(static_cast<c_type>(2)));
   ASSERT_OK(builder.Append(static_cast<c_type>(1)));
@@ -134,7 +134,7 @@ TYPED_TEST(TestDictionaryBuilder, ArrayConversion) {
   auto type = std::make_shared<TypeParam>();
 
   auto intermediate_result = ArrayFromJSON(type, "[1, 2, 1]");
-  DictionaryBuilder<TypeParam> dictionary_builder(default_memory_pool());
+  DictionaryBuilder<TypeParam> dictionary_builder;
   ASSERT_OK(dictionary_builder.AppendArray(*intermediate_result));
   std::shared_ptr<Array> result;
   ASSERT_OK(dictionary_builder.Finish(&result));
@@ -154,7 +154,7 @@ TYPED_TEST(TestDictionaryBuilder, DoubleTableSize) {
   // Skip this test for (u)int8
   if (sizeof(Scalar) > 1) {
     // Build the dictionary Array
-    DictionaryBuilder<TypeParam> builder(default_memory_pool());
+    DictionaryBuilder<TypeParam> builder;
     // Build expected data
     NumericBuilder<TypeParam> dict_builder;
     Int16Builder int_builder;
@@ -192,7 +192,7 @@ TYPED_TEST(TestDictionaryBuilder, DeltaDictionary) {
   using c_type = typename TypeParam::c_type;
   auto type = std::make_shared<TypeParam>();
 
-  DictionaryBuilder<TypeParam> builder(default_memory_pool());
+  DictionaryBuilder<TypeParam> builder;
 
   ASSERT_OK(builder.Append(static_cast<c_type>(1)));
   ASSERT_OK(builder.Append(static_cast<c_type>(2)));
@@ -232,7 +232,7 @@ TYPED_TEST(TestDictionaryBuilder, DoubleDeltaDictionary) {
   auto type = std::make_shared<TypeParam>();
   auto dict_type = dictionary(int8(), type);
 
-  DictionaryBuilder<TypeParam> builder(default_memory_pool());
+  DictionaryBuilder<TypeParam> builder;
 
   ASSERT_OK(builder.Append(static_cast<c_type>(1)));
   ASSERT_OK(builder.Append(static_cast<c_type>(2)));
@@ -284,7 +284,7 @@ TYPED_TEST(TestDictionaryBuilder, DoubleDeltaDictionary) {
 
 TEST(TestStringDictionaryBuilder, Basic) {
   // Build the dictionary Array
-  StringDictionaryBuilder builder(default_memory_pool());
+  StringDictionaryBuilder builder;
   ASSERT_OK(builder.Append("test"));
   ASSERT_OK(builder.Append("test2"));
   ASSERT_OK(builder.Append("test"));
@@ -301,12 +301,43 @@ TEST(TestStringDictionaryBuilder, Basic) {
   ASSERT_TRUE(expected.Equals(result));
 }
 
+TEST(TestStringDictionaryBuilder, AppendIndices) {
+  auto ex_dict = ArrayFromJSON(utf8(), R"(["c", "a", "b", "d"])");
+  auto invalid_dict = ArrayFromJSON(binary(), R"(["e", "f"])");
+
+  StringDictionaryBuilder builder;
+  ASSERT_OK(builder.InsertMemoValues(*ex_dict));
+
+  // Inserting again should have no effect
+  ASSERT_OK(builder.InsertMemoValues(*ex_dict));
+
+  // Type mismatch
+  ASSERT_RAISES(Invalid, builder.InsertMemoValues(*invalid_dict));
+
+  std::vector<int64_t> raw_indices = {0, 1, 2, -1, 3};
+  std::vector<uint8_t> is_valid = {1, 1, 1, 0, 1};
+  for (int i = 0; i < 2; ++i) {
+    ASSERT_OK(builder.AppendIndices(
+        raw_indices.data(), static_cast<int64_t>(raw_indices.size()), is_valid.data()));
+  }
+
+  ASSERT_EQ(10, builder.length());
+
+  std::shared_ptr<Array> result;
+  ASSERT_OK(builder.Finish(&result));
+
+  auto ex_indices = ArrayFromJSON(int8(), R"([0, 1, 2, null, 3, 0, 1, 2, null, 3])");
+  auto dtype = dictionary(int8(), utf8());
+  DictionaryArray expected(dtype, ex_indices, ex_dict);
+  ASSERT_TRUE(expected.Equals(result));
+}
+
 TEST(TestStringDictionaryBuilder, ArrayInit) {
   auto dict_array = ArrayFromJSON(utf8(), R"(["test", "test2"])");
   auto int_array = ArrayFromJSON(int8(), "[0, 1, 0]");
 
   // Build the dictionary Array
-  StringDictionaryBuilder builder(dict_array, default_memory_pool());
+  StringDictionaryBuilder builder(dict_array);
   ASSERT_OK(builder.Append("test"));
   ASSERT_OK(builder.Append("test2"));
   ASSERT_OK(builder.Append("test"));
@@ -345,7 +376,7 @@ TEST(TestStringDictionaryBuilder, MakeBuilder) {
 // ARROW-4367
 TEST(TestStringDictionaryBuilder, OnlyNull) {
   // Build the dictionary Array
-  StringDictionaryBuilder builder(default_memory_pool());
+  StringDictionaryBuilder builder;
   ASSERT_OK(builder.AppendNull());
 
   std::shared_ptr<Array> result;
@@ -362,7 +393,7 @@ TEST(TestStringDictionaryBuilder, OnlyNull) {
 
 TEST(TestStringDictionaryBuilder, DoubleTableSize) {
   // Build the dictionary Array
-  StringDictionaryBuilder builder(default_memory_pool());
+  StringDictionaryBuilder builder;
   // Build expected data
   StringBuilder str_builder;
   Int16Builder int_builder;
@@ -398,7 +429,7 @@ TEST(TestStringDictionaryBuilder, DoubleTableSize) {
 
 TEST(TestStringDictionaryBuilder, DeltaDictionary) {
   // Build the dictionary Array
-  StringDictionaryBuilder builder(default_memory_pool());
+  StringDictionaryBuilder builder;
   ASSERT_OK(builder.Append("test"));
   ASSERT_OK(builder.Append("test2"));
   ASSERT_OK(builder.Append("test"));
@@ -432,7 +463,7 @@ TEST(TestStringDictionaryBuilder, DeltaDictionary) {
 TEST(TestStringDictionaryBuilder, BigDeltaDictionary) {
   constexpr int16_t kTestLength = 2048;
   // Build the dictionary Array
-  StringDictionaryBuilder builder(default_memory_pool());
+  StringDictionaryBuilder builder;
 
   StringBuilder str_builder1;
   Int16Builder int_builder1;
@@ -518,8 +549,7 @@ TEST(TestStringDictionaryBuilder, BigDeltaDictionary) {
 
 TEST(TestFixedSizeBinaryDictionaryBuilder, Basic) {
   // Build the dictionary Array
-  DictionaryBuilder<FixedSizeBinaryType> builder(arrow::fixed_size_binary(4),
-                                                 default_memory_pool());
+  DictionaryBuilder<FixedSizeBinaryType> builder(arrow::fixed_size_binary(4));
   std::vector<uint8_t> test{12, 12, 11, 12};
   std::vector<uint8_t> test2{12, 12, 11, 11};
   ASSERT_OK(builder.Append(test.data()));
@@ -555,7 +585,7 @@ TEST(TestFixedSizeBinaryDictionaryBuilder, ArrayInit) {
   auto value_type = fixed_size_binary(4);
   auto dict_array = ArrayFromJSON(value_type, R"(["abcd", "wxyz"])");
   util::string_view test = "abcd", test2 = "wxyz";
-  DictionaryBuilder<FixedSizeBinaryType> builder(dict_array, default_memory_pool());
+  DictionaryBuilder<FixedSizeBinaryType> builder(dict_array);
   ASSERT_OK(builder.Append(test));
   ASSERT_OK(builder.Append(test2));
   ASSERT_OK(builder.Append(test));
@@ -597,7 +627,7 @@ TEST(TestFixedSizeBinaryDictionaryBuilder, DeltaDictionary) {
   auto value_type = arrow::fixed_size_binary(4);
   auto dict_type = dictionary(int8(), value_type);
 
-  DictionaryBuilder<FixedSizeBinaryType> builder(value_type, default_memory_pool());
+  DictionaryBuilder<FixedSizeBinaryType> builder(value_type);
   std::vector<uint8_t> test{12, 12, 11, 12};
   std::vector<uint8_t> test2{12, 12, 11, 11};
   std::vector<uint8_t> test3{12, 12, 11, 10};
@@ -656,7 +686,7 @@ TEST(TestFixedSizeBinaryDictionaryBuilder, DoubleTableSize) {
   auto value_type = arrow::fixed_size_binary(4);
   auto dict_type = dictionary(int16(), value_type);
 
-  DictionaryBuilder<FixedSizeBinaryType> builder(value_type, default_memory_pool());
+  DictionaryBuilder<FixedSizeBinaryType> builder(value_type);
   // Build expected data
   FixedSizeBinaryBuilder fsb_builder(value_type);
   Int16Builder int_builder;
@@ -693,7 +723,7 @@ TEST(TestFixedSizeBinaryDictionaryBuilder, DoubleTableSize) {
 TEST(TestFixedSizeBinaryDictionaryBuilder, InvalidTypeAppend) {
   // Build the dictionary Array
   auto value_type = arrow::fixed_size_binary(4);
-  DictionaryBuilder<FixedSizeBinaryType> builder(value_type, default_memory_pool());
+  DictionaryBuilder<FixedSizeBinaryType> builder(value_type);
   // Build an array with different byte width
   FixedSizeBinaryBuilder fsb_builder(arrow::fixed_size_binary(5));
   std::vector<uint8_t> value{100, 1, 1, 1, 1};
@@ -707,7 +737,7 @@ TEST(TestFixedSizeBinaryDictionaryBuilder, InvalidTypeAppend) {
 TEST(TestDecimalDictionaryBuilder, Basic) {
   // Build the dictionary Array
   auto decimal_type = arrow::decimal(2, 0);
-  DictionaryBuilder<FixedSizeBinaryType> builder(decimal_type, default_memory_pool());
+  DictionaryBuilder<FixedSizeBinaryType> builder(decimal_type);
 
   // Test data
   std::vector<Decimal128> test{12, 12, 11, 12};
@@ -730,7 +760,7 @@ TEST(TestDecimalDictionaryBuilder, DoubleTableSize) {
   const auto& decimal_type = arrow::decimal(21, 0);
 
   // Build the dictionary Array
-  DictionaryBuilder<FixedSizeBinaryType> builder(decimal_type, default_memory_pool());
+  DictionaryBuilder<FixedSizeBinaryType> builder(decimal_type);
 
   // Build expected data
   FixedSizeBinaryBuilder fsb_builder(decimal_type);
diff --git a/cpp/src/arrow/array/builder_dict.cc b/cpp/src/arrow/array/builder_dict.cc
index 8d5814a..6034465 100644
--- a/cpp/src/arrow/array/builder_dict.cc
+++ b/cpp/src/arrow/array/builder_dict.cc
@@ -40,6 +40,16 @@ using internal::checked_cast;
 // ----------------------------------------------------------------------
 // DictionaryType unification
 
+template <typename T, typename Out = void>
+using enable_if_memoize = typename std::enable_if<
+    !std::is_same<typename internal::DictionaryTraits<T>::MemoTableType, void>::value,
+    Out>::type;
+
+template <typename T, typename Out = void>
+using enable_if_no_memoize = typename std::enable_if<
+    std::is_same<typename internal::DictionaryTraits<T>::MemoTableType, void>::value,
+    Out>::type;
+
 struct UnifyDictionaryValues {
   MemoryPool* pool_;
   std::shared_ptr<DataType> value_type_;
@@ -48,21 +58,15 @@ struct UnifyDictionaryValues {
   std::shared_ptr<Array>* out_values_;
   std::vector<std::vector<int32_t>>* out_transpose_maps_;
 
-  Status Visit(const DataType&, void* = nullptr) {
+  template <typename T>
+  enable_if_no_memoize<T, Status> Visit(const T&) {
     // Default implementation for non-dictionary-supported datatypes
     return Status::NotImplemented("Unification of ", value_type_,
                                   " dictionaries is not implemented");
   }
 
-  Status Visit(const DayTimeIntervalType&, void* = nullptr) {
-    return Status::NotImplemented(
-        "Unification of DayTime"
-        " dictionaries is not implemented");
-  }
-
   template <typename T>
-  Status Visit(const T&,
-               typename internal::DictionaryTraits<T>::MemoTableType* = nullptr) {
+  enable_if_memoize<T, Status> Visit(const T&) {
     using ArrayType = typename TypeTraits<T>::ArrayType;
     using DictTraits = typename internal::DictionaryTraits<T>;
     using MemoTableType = typename DictTraits::MemoTableType;
@@ -163,14 +167,14 @@ class internal::DictionaryMemoTable::DictionaryMemoTableImpl {
     std::shared_ptr<DataType> value_type_;
     std::unique_ptr<MemoTable>* memo_table_;
 
-    Status Visit(const DataType&, void* = nullptr) {
+    template <typename T>
+    enable_if_no_memoize<T, Status> Visit(const T&) {
       return Status::NotImplemented("Initialization of ", value_type_,
                                     " memo table is not implemented");
     }
 
     template <typename T>
-    Status Visit(const T&,
-                 typename internal::DictionaryTraits<T>::MemoTableType* = nullptr) {
+    enable_if_memoize<T, Status> Visit(const T&) {
       using MemoTable = typename internal::DictionaryTraits<T>::MemoTableType;
       memo_table_->reset(new MemoTable(0));
       return Status::OK();
@@ -179,23 +183,24 @@ class internal::DictionaryMemoTable::DictionaryMemoTableImpl {
 
   struct ArrayValuesInserter {
     DictionaryMemoTableImpl* impl_;
+    const Array& values_;
 
     template <typename T>
-    Status Visit(const T& array) {
-      return InsertValues(array.type(), array);
+    Status Visit(const T& type) {
+      using ArrayType = typename TypeTraits<T>::ArrayType;
+      return InsertValues(type, checked_cast<const ArrayType&>(values_));
     }
 
    private:
-    template <typename DataType, typename Array>
-    Status InsertValues(const DataType& type, const Array&, void* = nullptr) {
+    template <typename DType, typename ArrayType>
+    enable_if_no_memoize<DType, Status> InsertValues(const DType& type,
+                                                     const ArrayType&) {
       return Status::NotImplemented("Inserting array values of ", type,
                                     " is not implemented");
     }
 
-    template <typename DataType, typename Array>
-    Status InsertValues(
-        const DataType&, const Array& array,
-        typename internal::DictionaryTraits<DataType>::MemoTableType* = nullptr) {
+    template <typename DType, typename ArrayType>
+    enable_if_memoize<DType, Status> InsertValues(const DType&, const ArrayType& array) {
       for (int64_t i = 0; i < array.length(); ++i) {
         ARROW_IGNORE_EXPR(impl_->GetOrInsert(array.GetView(i)));
       }
@@ -210,14 +215,14 @@ class internal::DictionaryMemoTable::DictionaryMemoTableImpl {
     int64_t start_offset_;
     std::shared_ptr<ArrayData>* out_;
 
-    Status Visit(const DataType&, void* = nullptr) {
+    template <typename T>
+    enable_if_no_memoize<T, Status> Visit(const T&) {
       return Status::NotImplemented("Getting array data of ", value_type_,
                                     " is not implemented");
     }
 
     template <typename T>
-    Status Visit(const T&,
-                 typename internal::DictionaryTraits<T>::MemoTableType* = nullptr) {
+    enable_if_memoize<T, Status> Visit(const T&) {
       using ConcreteMemoTable = typename internal::DictionaryTraits<T>::MemoTableType;
       auto memo_table = static_cast<ConcreteMemoTable*>(memo_table_);
       return internal::DictionaryTraits<T>::GetDictionaryArrayData(
@@ -232,9 +237,13 @@ class internal::DictionaryMemoTable::DictionaryMemoTableImpl {
     ARROW_IGNORE_EXPR(VisitTypeInline(*type_, &visitor));
   }
 
-  Status InsertValues(const std::shared_ptr<Array>& array) {
-    ArrayValuesInserter visitor{this};
-    return VisitArrayInline(*array, &visitor);
+  Status InsertValues(const Array& array) {
+    if (!array.type()->Equals(*type_)) {
+      return Status::Invalid("Array value type does not match memo type: ",
+                             array.type()->ToString());
+    }
+    ArrayValuesInserter visitor{this, array};
+    return VisitTypeInline(*array.type(), &visitor);
   }
 
   template <typename T>
@@ -267,7 +276,7 @@ internal::DictionaryMemoTable::DictionaryMemoTable(const std::shared_ptr<DataTyp
 internal::DictionaryMemoTable::DictionaryMemoTable(
     const std::shared_ptr<Array>& dictionary)
     : impl_(new DictionaryMemoTableImpl(dictionary->type())) {
-  ARROW_IGNORE_EXPR(impl_->InsertValues(dictionary));
+  ARROW_IGNORE_EXPR(impl_->InsertValues(*dictionary));
 }
 
 internal::DictionaryMemoTable::~DictionaryMemoTable() = default;
@@ -325,6 +334,10 @@ Status internal::DictionaryMemoTable::GetArrayData(MemoryPool* pool, int64_t sta
   return impl_->GetArrayData(pool, start_offset, out);
 }
 
+Status internal::DictionaryMemoTable::InsertValues(const Array& array) {
+  return impl_->InsertValues(array);
+}
+
 int32_t internal::DictionaryMemoTable::size() const { return impl_->size(); }
 
 }  // namespace arrow
diff --git a/cpp/src/arrow/array/builder_dict.h b/cpp/src/arrow/array/builder_dict.h
index 93cad29..e1cfe8a 100644
--- a/cpp/src/arrow/array/builder_dict.h
+++ b/cpp/src/arrow/array/builder_dict.h
@@ -74,6 +74,9 @@ class ARROW_EXPORT DictionaryMemoTable {
   Status GetArrayData(MemoryPool* pool, int64_t start_offset,
                       std::shared_ptr<ArrayData>* out);
 
+  /// \brief Insert new memo values
+  Status InsertValues(const Array& values);
+
   int32_t size() const;
 
  private:
@@ -103,7 +106,7 @@ class DictionaryBuilder : public ArrayBuilder {
   DictionaryBuilder(
       typename std::enable_if<!std::is_base_of<FixedSizeBinaryType, T1>::value,
                               const std::shared_ptr<DataType>&>::type type,
-      MemoryPool* pool)
+      MemoryPool* pool = default_memory_pool())
       : ArrayBuilder(type, pool),
         memo_table_(new internal::DictionaryMemoTable(type)),
         delta_offset_(0),
@@ -114,7 +117,7 @@ class DictionaryBuilder : public ArrayBuilder {
   explicit DictionaryBuilder(
       typename std::enable_if<std::is_base_of<FixedSizeBinaryType, T1>::value,
                               const std::shared_ptr<DataType>&>::type type,
-      MemoryPool* pool)
+      MemoryPool* pool = default_memory_pool())
       : ArrayBuilder(type, pool),
         memo_table_(new internal::DictionaryMemoTable(type)),
         delta_offset_(0),
@@ -123,10 +126,12 @@ class DictionaryBuilder : public ArrayBuilder {
 
   template <typename T1 = T>
   explicit DictionaryBuilder(
-      typename std::enable_if<TypeTraits<T1>::is_parameter_free, MemoryPool*>::type pool)
+      typename std::enable_if<TypeTraits<T1>::is_parameter_free, MemoryPool*>::type pool =
+          default_memory_pool())
       : DictionaryBuilder<T1>(TypeTraits<T1>::type_singleton(), pool) {}
 
-  DictionaryBuilder(const std::shared_ptr<Array>& dictionary, MemoryPool* pool)
+  DictionaryBuilder(const std::shared_ptr<Array>& dictionary,
+                    MemoryPool* pool = default_memory_pool())
       : ArrayBuilder(dictionary->type(), pool),
         memo_table_(new internal::DictionaryMemoTable(dictionary)),
         delta_offset_(0),
@@ -175,6 +180,27 @@ class DictionaryBuilder : public ArrayBuilder {
     return values_builder_.AppendNulls(length);
   }
 
+  /// \brief Insert values into the dictionary's memo, but do not append any
+  /// indices. Can be used to initialize a new builder with known dictionary
+  /// values
+  /// \param[in] values dictionary values to add to memo. Type must match
+  /// builder type
+  Status InsertMemoValues(const Array& values) {
+    return memo_table_->InsertValues(values);
+  }
+
+  /// \brief Append dictionary indices directly without modifying memo
+  ///
+  /// NOTE: Experimental API
+  Status AppendIndices(const int64_t* values, int64_t length,
+                       const uint8_t* valid_bytes = NULLPTR) {
+    int64_t null_count_before = values_builder_.null_count();
+    ARROW_RETURN_NOT_OK(values_builder_.AppendValues(values, length, valid_bytes));
+    length_ += length;
+    null_count_ += values_builder_.null_count() - null_count_before;
+    return Status::OK();
+  }
+
   /// \brief Append a whole dense array to the builder
   template <typename T1 = T>
   Status AppendArray(
@@ -277,12 +303,14 @@ class DictionaryBuilder : public ArrayBuilder {
 template <>
 class DictionaryBuilder<NullType> : public ArrayBuilder {
  public:
-  DictionaryBuilder(const std::shared_ptr<DataType>& type, MemoryPool* pool)
+  DictionaryBuilder(const std::shared_ptr<DataType>& type,
+                    MemoryPool* pool = default_memory_pool())
       : ArrayBuilder(type, pool), values_builder_(pool) {}
-  explicit DictionaryBuilder(MemoryPool* pool)
+  explicit DictionaryBuilder(MemoryPool* pool = default_memory_pool())
       : ArrayBuilder(null(), pool), values_builder_(pool) {}
 
-  DictionaryBuilder(const std::shared_ptr<Array>& dictionary, MemoryPool* pool)
+  DictionaryBuilder(const std::shared_ptr<Array>& dictionary,
+                    MemoryPool* pool = default_memory_pool())
       : ArrayBuilder(dictionary->type(), pool), values_builder_(pool) {}
 
   /// \brief Append a scalar null value
@@ -340,8 +368,11 @@ class DictionaryBuilder<NullType> : public ArrayBuilder {
 class ARROW_EXPORT BinaryDictionaryBuilder : public DictionaryBuilder<BinaryType> {
  public:
   using DictionaryBuilder::Append;
+  using DictionaryBuilder::AppendIndices;
   using DictionaryBuilder::DictionaryBuilder;
 
+  BinaryDictionaryBuilder() : BinaryDictionaryBuilder(default_memory_pool()) {}
+
   Status Append(const uint8_t* value, int32_t length) {
     return Append(reinterpret_cast<const char*>(value), length);
   }
@@ -355,8 +386,11 @@ class ARROW_EXPORT BinaryDictionaryBuilder : public DictionaryBuilder<BinaryType
 class ARROW_EXPORT StringDictionaryBuilder : public DictionaryBuilder<StringType> {
  public:
   using DictionaryBuilder::Append;
+  using DictionaryBuilder::AppendIndices;
   using DictionaryBuilder::DictionaryBuilder;
 
+  StringDictionaryBuilder() : StringDictionaryBuilder(default_memory_pool()) {}
+
   Status Append(const uint8_t* value, int32_t length) {
     return Append(reinterpret_cast<const char*>(value), length);
   }
diff --git a/cpp/src/arrow/table.cc b/cpp/src/arrow/table.cc
index 907cc8c..446010f 100644
--- a/cpp/src/arrow/table.cc
+++ b/cpp/src/arrow/table.cc
@@ -42,6 +42,7 @@ using internal::checked_cast;
 ChunkedArray::ChunkedArray(const ArrayVector& chunks) : chunks_(chunks) {
   length_ = 0;
   null_count_ = 0;
+
   ARROW_CHECK_GT(chunks.size(), 0)
       << "cannot construct ChunkedArray from empty vector and omitted type";
   type_ = chunks[0]->type();
diff --git a/cpp/src/arrow/testing/random.cc b/cpp/src/arrow/testing/random.cc
index f693a45..3346e63 100644
--- a/cpp/src/arrow/testing/random.cc
+++ b/cpp/src/arrow/testing/random.cc
@@ -178,6 +178,16 @@ std::shared_ptr<arrow::Array> RandomArrayGenerator::String(int64_t size,
   return result;
 }
 
+std::shared_ptr<arrow::Array> RandomArrayGenerator::BinaryWithRepeats(
+    int64_t size, int64_t unique, int32_t min_length, int32_t max_length,
+    double null_probability) {
+  auto strings =
+      StringWithRepeats(size, unique, min_length, max_length, null_probability);
+  auto data = strings->data()->Copy();
+  data->type = binary();
+  return MakeArray(data);
+}
+
 std::shared_ptr<arrow::Array> RandomArrayGenerator::StringWithRepeats(
     int64_t size, int64_t unique, int32_t min_length, int32_t max_length,
     double null_probability) {
diff --git a/cpp/src/arrow/testing/random.h b/cpp/src/arrow/testing/random.h
index 6b188fd..3126a69 100644
--- a/cpp/src/arrow/testing/random.h
+++ b/cpp/src/arrow/testing/random.h
@@ -230,6 +230,11 @@ class ARROW_EXPORT RandomArrayGenerator {
                                                   int32_t min_length, int32_t max_length,
                                                   double null_probability);
 
+  /// \brief Like StringWithRepeats but return BinaryArray
+  std::shared_ptr<arrow::Array> BinaryWithRepeats(int64_t size, int64_t unique,
+                                                  int32_t min_length, int32_t max_length,
+                                                  double null_probability);
+
  private:
   SeedType seed() { return seed_distribution_(seed_rng_); }
 
diff --git a/cpp/src/arrow/type.cc b/cpp/src/arrow/type.cc
index 54e0103..76be841 100644
--- a/cpp/src/arrow/type.cc
+++ b/cpp/src/arrow/type.cc
@@ -493,7 +493,11 @@ Schema::~Schema() {}
 
 int Schema::num_fields() const { return static_cast<int>(impl_->fields_.size()); }
 
-std::shared_ptr<Field> Schema::field(int i) const { return impl_->fields_[i]; }
+std::shared_ptr<Field> Schema::field(int i) const {
+  DCHECK_GE(i, 0);
+  DCHECK_LT(i, num_fields());
+  return impl_->fields_[i];
+}
 
 const std::vector<std::shared_ptr<Field>>& Schema::fields() const {
   return impl_->fields_;
diff --git a/cpp/src/arrow/type_traits.h b/cpp/src/arrow/type_traits.h
index 4902f5c..c4c549f 100644
--- a/cpp/src/arrow/type_traits.h
+++ b/cpp/src/arrow/type_traits.h
@@ -271,6 +271,12 @@ struct CTypeTraits<char*> : public TypeTraits<StringType> {
 };
 
 template <>
+struct CTypeTraits<DayTimeIntervalType::DayMilliseconds>
+    : public TypeTraits<DayTimeIntervalType> {
+  using ArrowType = DayTimeIntervalType;
+};
+
+template <>
 struct TypeTraits<ListType> {
   using ArrayType = ListArray;
   using BuilderType = ListBuilder;
diff --git a/cpp/src/arrow/util/hashing.h b/cpp/src/arrow/util/hashing.h
index bad2b49..6a00c1e 100644
--- a/cpp/src/arrow/util/hashing.h
+++ b/cpp/src/arrow/util/hashing.h
@@ -833,7 +833,9 @@ static inline Status ComputeNullBitmap(MemoryPool* pool, const MemoTableType& me
 }
 
 template <typename T, typename Enable = void>
-struct DictionaryTraits {};
+struct DictionaryTraits {
+  using MemoTableType = void;
+};
 
 template <>
 struct DictionaryTraits<BooleanType> {
diff --git a/cpp/src/arrow/util/rle-encoding-test.cc b/cpp/src/arrow/util/rle-encoding-test.cc
index 3d6da69..1ea6394 100644
--- a/cpp/src/arrow/util/rle-encoding-test.cc
+++ b/cpp/src/arrow/util/rle-encoding-test.cc
@@ -26,6 +26,10 @@
 
 #include <boost/utility.hpp>  // IWYU pragma: export
 
+#include "arrow/array.h"
+#include "arrow/buffer.h"
+#include "arrow/testing/random.h"
+#include "arrow/type.h"
 #include "arrow/util/bit-stream-utils.h"
 #include "arrow/util/bit-util.h"
 #include "arrow/util/rle-encoding.h"
@@ -242,12 +246,13 @@ bool CheckRoundTrip(const std::vector<int>& values, int bit_width) {
 
   // Verify batch read
   {
-    RleDecoder decoder(buffer, len, bit_width);
+    RleDecoder decoder(buffer, encoded_len, bit_width);
     std::vector<int> values_read(values.size());
     if (static_cast<int>(values.size()) !=
         decoder.GetBatch(values_read.data(), static_cast<int>(values.size()))) {
       return false;
     }
+
     if (values != values_read) {
       return false;
     }
@@ -464,5 +469,69 @@ TEST(BitRle, Overflow) {
   }
 }
 
+template <typename Type>
+void CheckRoundTripSpaced(const Array& data, int bit_width) {
+  using ArrayType = typename TypeTraits<Type>::ArrayType;
+  using T = typename Type::c_type;
+
+  int num_values = static_cast<int>(data.length());
+  int buffer_size = RleEncoder::MaxBufferSize(bit_width, num_values);
+
+  const T* values = static_cast<const ArrayType&>(data).raw_values();
+
+  std::vector<uint8_t> buffer(buffer_size);
+  RleEncoder encoder(buffer.data(), buffer_size, bit_width);
+  for (int i = 0; i < num_values; ++i) {
+    if (data.IsValid(i)) {
+      if (!encoder.Put(static_cast<uint64_t>(values[i]))) {
+        FAIL() << "Encoding failed";
+      }
+    }
+  }
+  int encoded_size = encoder.Flush();
+
+  // Verify batch read
+  RleDecoder decoder(buffer.data(), encoded_size, bit_width);
+  std::vector<T> values_read(num_values);
+
+  if (num_values != decoder.GetBatchSpaced(
+                        num_values, static_cast<int>(data.null_count()),
+                        data.null_bitmap_data(), data.offset(), values_read.data())) {
+    FAIL();
+  }
+
+  for (int64_t i = 0; i < num_values; ++i) {
+    if (data.IsValid(i)) {
+      if (values_read[i] != values[i]) {
+        FAIL() << "Index " << i << " read " << values_read[i] << " but should be "
+               << values[i];
+      }
+    }
+  }
+}
+
+template <typename T>
+struct GetBatchSpacedTestCase {
+  T max_value;
+  int64_t size;
+  double null_probability;
+  int bit_width;
+};
+
+TEST(RleDecoder, GetBatchSpaced) {
+  uint32_t kSeed = 1337;
+  ::arrow::random::RandomArrayGenerator rand(kSeed);
+
+  std::vector<GetBatchSpacedTestCase<int32_t>> int32_cases{
+      {1, 100000, 0.01, 1}, {1, 100000, 0.1, 1},    {1, 100000, 0.5, 1},
+      {4, 100000, 0.05, 3}, {100, 100000, 0.05, 7},
+  };
+  for (auto case_ : int32_cases) {
+    auto arr = rand.Int32(case_.size, /*min=*/0, case_.max_value, case_.null_probability);
+    CheckRoundTripSpaced<Int32Type>(*arr, case_.bit_width);
+    CheckRoundTripSpaced<Int32Type>(*arr->Slice(1), case_.bit_width);
+  }
+}
+
 }  // namespace util
 }  // namespace arrow
diff --git a/cpp/src/arrow/util/rle-encoding.h b/cpp/src/arrow/util/rle-encoding.h
index 739158a..9961602 100644
--- a/cpp/src/arrow/util/rle-encoding.h
+++ b/cpp/src/arrow/util/rle-encoding.h
@@ -116,6 +116,11 @@ class RleDecoder {
   template <typename T>
   int GetBatch(T* values, int batch_size);
 
+  /// Like GetBatch but add spacing for null entries
+  template <typename T>
+  int GetBatchSpaced(int batch_size, int null_count, const uint8_t* valid_bits,
+                     int64_t valid_bits_offset, T* out);
+
   /// Like GetBatch but the values are then decoded using the provided dictionary
   template <typename T>
   int GetBatchWithDict(const T* dictionary, T* values, int batch_size);
@@ -311,6 +316,82 @@ inline int RleDecoder::GetBatch(T* values, int batch_size) {
 }
 
 template <typename T>
+inline int RleDecoder::GetBatchSpaced(int batch_size, int null_count,
+                                      const uint8_t* valid_bits,
+                                      int64_t valid_bits_offset, T* out) {
+  DCHECK_GE(bit_width_, 0);
+  int values_read = 0;
+  int remaining_nulls = null_count;
+
+  arrow::internal::BitmapReader bit_reader(valid_bits, valid_bits_offset, batch_size);
+
+  while (values_read < batch_size) {
+    bool is_valid = bit_reader.IsSet();
+    bit_reader.Next();
+
+    if (is_valid) {
+      if ((repeat_count_ == 0) && (literal_count_ == 0)) {
+        if (!NextCounts<T>()) return values_read;
+      }
+      if (repeat_count_ > 0) {
+        // The current index is already valid, we don't need to check that again
+        int repeat_batch = 1;
+        repeat_count_--;
+
+        while (repeat_count_ > 0 && (values_read + repeat_batch) < batch_size) {
+          if (bit_reader.IsSet()) {
+            repeat_count_--;
+          } else {
+            remaining_nulls--;
+          }
+          repeat_batch++;
+
+          bit_reader.Next();
+        }
+        std::fill(out, out + repeat_batch, static_cast<T>(current_value_));
+        out += repeat_batch;
+        values_read += repeat_batch;
+      } else if (literal_count_ > 0) {
+        int literal_batch = std::min(batch_size - values_read - remaining_nulls,
+                                     static_cast<int>(literal_count_));
+
+        // Decode the literals
+        constexpr int kBufferSize = 1024;
+        T indices[kBufferSize];
+        literal_batch = std::min(literal_batch, kBufferSize);
+        int actual_read = bit_reader_.GetBatch(bit_width_, &indices[0], literal_batch);
+        DCHECK_EQ(actual_read, literal_batch);
+
+        int skipped = 0;
+        int literals_read = 1;
+        *out++ = indices[0];
+
+        // Read the first bitset to the end
+        while (literals_read < literal_batch) {
+          if (bit_reader.IsSet()) {
+            *out = indices[literals_read];
+            literals_read++;
+          } else {
+            skipped++;
+          }
+          ++out;
+          bit_reader.Next();
+        }
+        literal_count_ -= literal_batch;
+        values_read += literal_batch + skipped;
+        remaining_nulls -= skipped;
+      }
+    } else {
+      ++out;
+      values_read++;
+      remaining_nulls--;
+    }
+  }
+
+  return values_read;
+}
+
+template <typename T>
 inline int RleDecoder::GetBatchWithDict(const T* dictionary, T* values, int batch_size) {
   DCHECK_GE(bit_width_, 0);
   int values_read = 0;
@@ -346,9 +427,8 @@ inline int RleDecoder::GetBatchWithDict(const T* dictionary, T* values, int batc
 }
 
 template <typename T>
-inline int RleDecoder::GetBatchWithDictSpaced(const T* dictionary, T* values,
-                                              int batch_size, int null_count,
-                                              const uint8_t* valid_bits,
+inline int RleDecoder::GetBatchWithDictSpaced(const T* dictionary, T* out, int batch_size,
+                                              int null_count, const uint8_t* valid_bits,
                                               int64_t valid_bits_offset) {
   DCHECK_GE(bit_width_, 0);
   int values_read = 0;
@@ -380,7 +460,8 @@ inline int RleDecoder::GetBatchWithDictSpaced(const T* dictionary, T* values,
 
           bit_reader.Next();
         }
-        std::fill(values + values_read, values + values_read + repeat_batch, value);
+        std::fill(out, out + repeat_batch, value);
+        out += repeat_batch;
         values_read += repeat_batch;
       } else if (literal_count_ > 0) {
         int literal_batch = std::min(batch_size - values_read - remaining_nulls,
@@ -395,18 +476,17 @@ inline int RleDecoder::GetBatchWithDictSpaced(const T* dictionary, T* values,
 
         int skipped = 0;
         int literals_read = 1;
-        values[values_read] = dictionary[indices[0]];
+        *out++ = dictionary[indices[0]];
 
         // Read the first bitset to the end
         while (literals_read < literal_batch) {
           if (bit_reader.IsSet()) {
-            values[values_read + literals_read + skipped] =
-                dictionary[indices[literals_read]];
+            *out = dictionary[indices[literals_read]];
             literals_read++;
           } else {
             skipped++;
           }
-
+          ++out;
           bit_reader.Next();
         }
         literal_count_ -= literal_batch;
@@ -414,6 +494,7 @@ inline int RleDecoder::GetBatchWithDictSpaced(const T* dictionary, T* values,
         remaining_nulls -= skipped;
       }
     } else {
+      ++out;
       values_read++;
       remaining_nulls--;
     }
diff --git a/cpp/src/parquet/arrow/arrow-reader-writer-test.cc b/cpp/src/parquet/arrow/arrow-reader-writer-test.cc
index c5d638d..cfc1c1f 100644
--- a/cpp/src/parquet/arrow/arrow-reader-writer-test.cc
+++ b/cpp/src/parquet/arrow/arrow-reader-writer-test.cc
@@ -85,7 +85,7 @@ static constexpr int LARGE_SIZE = 10000;
 
 static constexpr uint32_t kDefaultSeed = 0;
 
-std::shared_ptr<const LogicalType> get_logical_type(const ::DataType& type) {
+std::shared_ptr<const LogicalType> get_logical_type(const DataType& type) {
   switch (type.id()) {
     case ArrowId::UINT8:
       return LogicalType::Int(8, false);
@@ -154,7 +154,7 @@ std::shared_ptr<const LogicalType> get_logical_type(const ::DataType& type) {
   return LogicalType::None();
 }
 
-ParquetType::type get_physical_type(const ::DataType& type) {
+ParquetType::type get_physical_type(const DataType& type) {
   switch (type.id()) {
     case ArrowId::BOOL:
       return ParquetType::BOOLEAN;
@@ -332,7 +332,7 @@ const std::string test_traits<::arrow::BinaryType>::value("\x00\x01\x02\x03");
 const std::string test_traits<::arrow::FixedSizeBinaryType>::value("Fixed");    // NOLINT
 
 template <typename T>
-using ParquetDataType = DataType<test_traits<T>::parquet_enum>;
+using ParquetDataType = PhysicalType<test_traits<T>::parquet_enum>;
 
 template <typename T>
 using ParquetWriter = TypedColumnWriter<ParquetDataType<T>>;
@@ -401,11 +401,11 @@ void CheckConfiguredRoundtrip(
   if (expected_table) {
     ASSERT_NO_FATAL_FAILURE(
         ::arrow::AssertSchemaEqual(*actual_table->schema(), *expected_table->schema()));
-    ASSERT_NO_FATAL_FAILURE(::arrow::AssertTablesEqual(*actual_table, *expected_table));
+    ASSERT_NO_FATAL_FAILURE(::arrow::AssertTablesEqual(*expected_table, *actual_table));
   } else {
     ASSERT_NO_FATAL_FAILURE(
         ::arrow::AssertSchemaEqual(*actual_table->schema(), *input_table->schema()));
-    ASSERT_NO_FATAL_FAILURE(::arrow::AssertTablesEqual(*actual_table, *input_table));
+    ASSERT_NO_FATAL_FAILURE(::arrow::AssertTablesEqual(*input_table, *actual_table));
   }
 }
 
@@ -444,14 +444,14 @@ void CheckSimpleRoundtrip(const std::shared_ptr<Table>& table, int64_t row_group
   ASSERT_NO_FATAL_FAILURE(::arrow::AssertTablesEqual(*table, *result, false));
 }
 
-static std::shared_ptr<GroupNode> MakeSimpleSchema(const ::DataType& type,
+static std::shared_ptr<GroupNode> MakeSimpleSchema(const DataType& type,
                                                    Repetition::type repetition) {
   int32_t byte_width = -1;
 
   switch (type.id()) {
     case ::arrow::Type::DICTIONARY: {
       const auto& dict_type = static_cast<const ::arrow::DictionaryType&>(type);
-      const ::DataType& values_type = *dict_type.value_type();
+      const DataType& values_type = *dict_type.value_type();
       switch (values_type.id()) {
         case ::arrow::Type::FIXED_SIZE_BINARY:
           byte_width =
@@ -595,7 +595,8 @@ class TestParquetIO : public ::testing::Test {
     SchemaDescriptor descriptor;
     ASSERT_NO_THROW(descriptor.Init(schema));
     std::shared_ptr<::arrow::Schema> arrow_schema;
-    ASSERT_OK_NO_THROW(FromParquetSchema(&descriptor, &arrow_schema));
+    ArrowReaderProperties props;
+    ASSERT_OK_NO_THROW(FromParquetSchema(&descriptor, props, &arrow_schema));
     FileWriter writer(::arrow::default_memory_pool(), MakeWriter(schema), arrow_schema);
     ASSERT_OK_NO_THROW(writer.NewRowGroup(values->length()));
     ASSERT_OK_NO_THROW(writer.WriteColumnChunk(*values));
@@ -786,7 +787,8 @@ TYPED_TEST(TestParquetIO, SingleColumnRequiredChunkedWrite) {
   SchemaDescriptor descriptor;
   ASSERT_NO_THROW(descriptor.Init(schema));
   std::shared_ptr<::arrow::Schema> arrow_schema;
-  ASSERT_OK_NO_THROW(FromParquetSchema(&descriptor, &arrow_schema));
+  ArrowReaderProperties props;
+  ASSERT_OK_NO_THROW(FromParquetSchema(&descriptor, props, &arrow_schema));
   FileWriter writer(default_memory_pool(), this->MakeWriter(schema), arrow_schema);
   for (int i = 0; i < 4; i++) {
     ASSERT_OK_NO_THROW(writer.NewRowGroup(chunk_size));
@@ -855,7 +857,8 @@ TYPED_TEST(TestParquetIO, SingleColumnOptionalChunkedWrite) {
   SchemaDescriptor descriptor;
   ASSERT_NO_THROW(descriptor.Init(schema));
   std::shared_ptr<::arrow::Schema> arrow_schema;
-  ASSERT_OK_NO_THROW(FromParquetSchema(&descriptor, &arrow_schema));
+  ArrowReaderProperties props;
+  ASSERT_OK_NO_THROW(FromParquetSchema(&descriptor, props, &arrow_schema));
   FileWriter writer(::arrow::default_memory_pool(), this->MakeWriter(schema),
                     arrow_schema);
   for (int i = 0; i < 4; i++) {
@@ -1791,7 +1794,7 @@ void MakeDoubleTable(int num_columns, int num_rows, int nchunks,
 }
 
 void MakeListArray(int num_rows, int max_value_length,
-                   std::shared_ptr<::DataType>* out_type,
+                   std::shared_ptr<DataType>* out_type,
                    std::shared_ptr<Array>* out_array) {
   std::vector<int32_t> length_draws;
   randint(num_rows, 0, max_value_length, &length_draws);
@@ -1965,7 +1968,7 @@ TEST(TestArrowReadWrite, ListLargeRecords) {
   const int row_group_size = 100;
 
   std::shared_ptr<Array> list_array;
-  std::shared_ptr<::DataType> list_type;
+  std::shared_ptr<DataType> list_type;
 
   MakeListArray(num_rows, 20, &list_type, &list_array);
 
@@ -2009,14 +2012,14 @@ TEST(TestArrowReadWrite, ListLargeRecords) {
   ASSERT_TRUE(table->Equals(*chunked_table));
 }
 
-typedef std::function<void(int, std::shared_ptr<::DataType>*, std::shared_ptr<Array>*)>
+typedef std::function<void(int, std::shared_ptr<DataType>*, std::shared_ptr<Array>*)>
     ArrayFactory;
 
 template <typename ArrowType>
 struct GenerateArrayFunctor {
   explicit GenerateArrayFunctor(double pct_null = 0.1) : pct_null(pct_null) {}
 
-  void operator()(int length, std::shared_ptr<::DataType>* type,
+  void operator()(int length, std::shared_ptr<DataType>* type,
                   std::shared_ptr<Array>* array) {
     using T = typename ArrowType::c_type;
 
@@ -2034,16 +2037,16 @@ struct GenerateArrayFunctor {
   double pct_null;
 };
 
-typedef std::function<void(int, std::shared_ptr<::DataType>*, std::shared_ptr<Array>*)>
+typedef std::function<void(int, std::shared_ptr<DataType>*, std::shared_ptr<Array>*)>
     ArrayFactory;
 
-auto GenerateInt32 = [](int length, std::shared_ptr<::DataType>* type,
+auto GenerateInt32 = [](int length, std::shared_ptr<DataType>* type,
                         std::shared_ptr<Array>* array) {
   GenerateArrayFunctor<::arrow::Int32Type> func;
   func(length, type, array);
 };
 
-auto GenerateList = [](int length, std::shared_ptr<::DataType>* type,
+auto GenerateList = [](int length, std::shared_ptr<DataType>* type,
                        std::shared_ptr<Array>* array) {
   MakeListArray(length, 100, type, array);
 };
@@ -2078,7 +2081,7 @@ TEST(TestArrowReadWrite, TableWithChunkedColumns) {
   for (const auto& datagen_func : functions) {
     ::arrow::ArrayVector arrays;
     std::shared_ptr<Array> arr;
-    std::shared_ptr<::DataType> type;
+    std::shared_ptr<DataType> type;
     datagen_func(total_length, &type, &arr);
 
     int64_t offset = 0;
@@ -2575,15 +2578,16 @@ void TryReadDataFile(const std::string& path,
   auto pool = ::arrow::default_memory_pool();
 
   std::unique_ptr<FileReader> arrow_reader;
+  Status s =
+      FileReader::Make(pool, ParquetFileReader::OpenFile(path, false), &arrow_reader);
+  if (s.ok()) {
+    std::shared_ptr<::arrow::Table> table;
+    s = arrow_reader->ReadTable(&table);
+  }
 
-  arrow_reader.reset(new FileReader(pool, ParquetFileReader::OpenFile(path, false)));
-  std::shared_ptr<::arrow::Table> table;
-  auto status = arrow_reader->ReadTable(&table);
-
-  ASSERT_TRUE(status.code() == expected_code)
+  ASSERT_TRUE(s.code() == expected_code)
       << "Expected reading file to return "
-      << arrow::Status(expected_code, "").CodeAsString() << ", but got "
-      << status.ToString();
+      << arrow::Status(expected_code, "").CodeAsString() << ", but got " << s.ToString();
 }
 
 TEST(TestArrowReaderAdHoc, Int96BadMemoryAccess) {
@@ -2634,8 +2638,10 @@ TEST(TestArrowReaderAdHoc, DISABLED_LargeStringColumn) {
   array.reset();
 
   auto reader = ParquetFileReader::Open(std::make_shared<BufferReader>(tables_buffer));
-  FileReader arrow_reader(default_memory_pool(), std::move(reader));
-  ASSERT_OK_NO_THROW(arrow_reader.ReadTable(&table));
+
+  std::unique_ptr<FileReader> arrow_reader;
+  ASSERT_OK(FileReader::Make(default_memory_pool(), std::move(reader), &arrow_reader));
+  ASSERT_OK_NO_THROW(arrow_reader->ReadTable(&table));
   ASSERT_OK(table->Validate());
 }
 
@@ -2647,13 +2653,13 @@ TEST(TestArrowReaderAdHoc, HandleDictPageOffsetZero) {
 
 class TestArrowReaderAdHocSparkAndHvr
     : public ::testing::TestWithParam<
-          std::tuple<std::string, std::shared_ptr<::DataType>>> {};
+          std::tuple<std::string, std::shared_ptr<DataType>>> {};
 
 TEST_P(TestArrowReaderAdHocSparkAndHvr, ReadDecimals) {
   std::string path(test::get_data_dir());
 
   std::string filename;
-  std::shared_ptr<::DataType> decimal_type;
+  std::shared_ptr<DataType> decimal_type;
   std::tie(filename, decimal_type) = GetParam();
 
   path += "/" + filename;
@@ -2662,8 +2668,8 @@ TEST_P(TestArrowReaderAdHocSparkAndHvr, ReadDecimals) {
   auto pool = ::arrow::default_memory_pool();
 
   std::unique_ptr<FileReader> arrow_reader;
-  ASSERT_NO_THROW(
-      arrow_reader.reset(new FileReader(pool, ParquetFileReader::OpenFile(path, false))));
+  ASSERT_OK_NO_THROW(
+      FileReader::Make(pool, ParquetFileReader::OpenFile(path, false), &arrow_reader));
   std::shared_ptr<::arrow::Table> table;
   ASSERT_OK_NO_THROW(arrow_reader->ReadTable(&table));
 
@@ -2727,12 +2733,15 @@ TEST(TestArrowWriterAdHoc, SchemaMismatch) {
 
 // ----------------------------------------------------------------------
 // Tests for directly reading DictionaryArray
+
 class TestArrowReadDictionary : public ::testing::TestWithParam<double> {
  public:
   void SetUp() override {
     GenerateData(GetParam());
+
+    // Write 4 row groups; each row group will have a different dictionary
     ASSERT_NO_FATAL_FAILURE(
-        WriteTableToBuffer(expected_dense_, expected_dense_->num_rows() / 2,
+        WriteTableToBuffer(expected_dense_, expected_dense_->num_rows() / 4,
                            default_arrow_writer_properties(), &buffer_));
 
     properties_ = default_arrow_reader_properties();
@@ -2740,28 +2749,13 @@ class TestArrowReadDictionary : public ::testing::TestWithParam<double> {
 
   void GenerateData(double null_probability) {
     constexpr int num_unique = 100;
-    constexpr int repeat = 10;
+    constexpr int repeat = 100;
     constexpr int64_t min_length = 2;
-    constexpr int64_t max_length = 10;
+    constexpr int64_t max_length = 100;
     ::arrow::random::RandomArrayGenerator rag(0);
-    auto dense_array = rag.StringWithRepeats(repeat * num_unique, num_unique, min_length,
-                                             max_length, null_probability);
-    expected_dense_ = MakeSimpleTable(dense_array, /*nullable=*/true);
-
-    ::arrow::StringDictionaryBuilder builder(default_memory_pool());
-    const auto& string_array = static_cast<const ::arrow::StringArray&>(*dense_array);
-    ASSERT_OK(builder.AppendArray(string_array));
-
-    std::shared_ptr<::arrow::Array> dict_array;
-    ASSERT_OK(builder.Finish(&dict_array));
-    expected_dict_ = MakeSimpleTable(dict_array, /*nullable=*/true);
-
-    // TODO(hatemhelal): Figure out if we can use the following to init the expected_dict_
-    // Currently fails due to DataType mismatch for indices array.
-    //    Datum out;
-    //    FunctionContext ctx(default_memory_pool());
-    //    ASSERT_OK(DictionaryEncode(&ctx, Datum(dense_array), &out));
-    //    expected_dict_ = MakeSimpleTable(out.make_array(), /*nullable=*/true);
+    dense_values_ = rag.StringWithRepeats(repeat * num_unique, num_unique, min_length,
+                                          max_length, null_probability);
+    expected_dense_ = MakeSimpleTable(dense_values_, /*nullable=*/true);
   }
 
   void TearDown() override {}
@@ -2773,21 +2767,37 @@ class TestArrowReadDictionary : public ::testing::TestWithParam<double> {
 
     std::shared_ptr<Table> actual;
     ASSERT_OK_NO_THROW(reader->ReadTable(&actual));
-    ::arrow::AssertTablesEqual(*actual, expected, /*same_chunk_layout=*/false);
+    ::arrow::AssertTablesEqual(expected, *actual, /*same_chunk_layout=*/false);
   }
 
   static std::vector<double> null_probabilites() { return {0.0, 0.5, 1}; }
 
  protected:
+  std::shared_ptr<Array> dense_values_;
   std::shared_ptr<Table> expected_dense_;
   std::shared_ptr<Table> expected_dict_;
   std::shared_ptr<Buffer> buffer_;
   ArrowReaderProperties properties_;
 };
 
+void AsDictionaryEncoded(const Array& arr, std::shared_ptr<Array>* out) {
+  ::arrow::StringDictionaryBuilder builder(default_memory_pool());
+  const auto& string_array = static_cast<const ::arrow::StringArray&>(arr);
+  ASSERT_OK(builder.AppendArray(string_array));
+  ASSERT_OK(builder.Finish(out));
+}
+
 TEST_P(TestArrowReadDictionary, ReadWholeFileDict) {
   properties_.set_read_dictionary(0, true);
-  CheckReadWholeFile(*expected_dict_);
+
+  std::vector<std::shared_ptr<Array>> chunks(4);
+  const int64_t chunk_size = expected_dense_->num_rows() / 4;
+  for (int i = 0; i < 4; ++i) {
+    AsDictionaryEncoded(*dense_values_->Slice(chunk_size * i, chunk_size), &chunks[i]);
+  }
+  auto ex_table = MakeSimpleTable(std::make_shared<ChunkedArray>(chunks),
+                                  /*nullable=*/true);
+  CheckReadWholeFile(*ex_table);
 }
 
 TEST_P(TestArrowReadDictionary, ReadWholeFileDense) {
diff --git a/cpp/src/parquet/arrow/arrow-schema-test.cc b/cpp/src/parquet/arrow/arrow-schema-test.cc
index dc0a02a..06646b4 100644
--- a/cpp/src/parquet/arrow/arrow-schema-test.cc
+++ b/cpp/src/parquet/arrow/arrow-schema-test.cc
@@ -20,6 +20,7 @@
 
 #include "gtest/gtest.h"
 
+#include "parquet/arrow/reader.h"
 #include "parquet/arrow/schema.h"
 #include "parquet/file_reader.h"
 #include "parquet/schema.h"
@@ -74,14 +75,16 @@ class TestConvertParquetSchema : public ::testing::Test {
   ::arrow::Status ConvertSchema(const std::vector<NodePtr>& nodes) {
     NodePtr schema = GroupNode::Make("schema", Repetition::REPEATED, nodes);
     descr_.Init(schema);
-    return FromParquetSchema(&descr_, &result_schema_);
+    ArrowReaderProperties props;
+    return FromParquetSchema(&descr_, props, &result_schema_);
   }
 
   ::arrow::Status ConvertSchema(const std::vector<NodePtr>& nodes,
                                 const std::vector<int>& column_indices) {
     NodePtr schema = GroupNode::Make("schema", Repetition::REPEATED, nodes);
     descr_.Init(schema);
-    return FromParquetSchema(&descr_, column_indices, &result_schema_);
+    ArrowReaderProperties props;
+    return FromParquetSchema(&descr_, column_indices, props, &result_schema_);
   }
 
   ::arrow::Status ConvertSchema(
@@ -89,7 +92,8 @@ class TestConvertParquetSchema : public ::testing::Test {
       const std::shared_ptr<const KeyValueMetadata>& key_value_metadata) {
     NodePtr schema = GroupNode::Make("schema", Repetition::REPEATED, nodes);
     descr_.Init(schema);
-    return FromParquetSchema(&descr_, {}, key_value_metadata, &result_schema_);
+    ArrowReaderProperties props;
+    return FromParquetSchema(&descr_, {}, props, key_value_metadata, &result_schema_);
   }
 
  protected:
@@ -1048,7 +1052,8 @@ TEST(TestFromParquetSchema, CorruptMetadata) {
       parquet::ParquetFileReader::OpenFile(path);
   const auto parquet_schema = reader->metadata()->schema();
   std::shared_ptr<::arrow::Schema> arrow_schema;
-  ASSERT_RAISES(IOError, FromParquetSchema(parquet_schema, &arrow_schema));
+  ArrowReaderProperties props;
+  ASSERT_RAISES(IOError, FromParquetSchema(parquet_schema, props, &arrow_schema));
 }
 
 }  // namespace arrow
diff --git a/cpp/src/parquet/arrow/reader-writer-benchmark.cc b/cpp/src/parquet/arrow/reader-writer-benchmark.cc
index 239d707..59c238b 100644
--- a/cpp/src/parquet/arrow/reader-writer-benchmark.cc
+++ b/cpp/src/parquet/arrow/reader-writer-benchmark.cc
@@ -179,9 +179,11 @@ static void BM_ReadColumn(::benchmark::State& state) {
   while (state.KeepRunning()) {
     auto reader =
         ParquetFileReader::Open(std::make_shared<::arrow::io::BufferReader>(buffer));
-    FileReader filereader(::arrow::default_memory_pool(), std::move(reader));
+    std::unique_ptr<FileReader> arrow_reader;
+    EXIT_NOT_OK(FileReader::Make(::arrow::default_memory_pool(), std::move(reader),
+                                 &arrow_reader));
     std::shared_ptr<::arrow::Table> table;
-    EXIT_NOT_OK(filereader.ReadTable(&table));
+    EXIT_NOT_OK(arrow_reader->ReadTable(&table));
   }
   SetBytesProcessed<nullable, ParquetType>(state);
 }
@@ -212,14 +214,16 @@ static void BM_ReadIndividualRowGroups(::benchmark::State& state) {
   while (state.KeepRunning()) {
     auto reader =
         ParquetFileReader::Open(std::make_shared<::arrow::io::BufferReader>(buffer));
-    FileReader filereader(::arrow::default_memory_pool(), std::move(reader));
+    std::unique_ptr<FileReader> arrow_reader;
+    EXIT_NOT_OK(FileReader::Make(::arrow::default_memory_pool(), std::move(reader),
+                                 &arrow_reader));
 
     std::vector<std::shared_ptr<::arrow::Table>> tables;
-    for (int i = 0; i < filereader.num_row_groups(); i++) {
+    for (int i = 0; i < arrow_reader->num_row_groups(); i++) {
       // Only read the even numbered RowGroups
       if ((i % 2) == 0) {
         std::shared_ptr<::arrow::Table> table;
-        EXIT_NOT_OK(filereader.RowGroup(i)->ReadTable(&table));
+        EXIT_NOT_OK(arrow_reader->RowGroup(i)->ReadTable(&table));
         tables.push_back(table);
       }
     }
@@ -245,11 +249,13 @@ static void BM_ReadMultipleRowGroups(::benchmark::State& state) {
   while (state.KeepRunning()) {
     auto reader =
         ParquetFileReader::Open(std::make_shared<::arrow::io::BufferReader>(buffer));
-    FileReader filereader(::arrow::default_memory_pool(), std::move(reader));
+    std::unique_ptr<FileReader> arrow_reader;
+    EXIT_NOT_OK(FileReader::Make(::arrow::default_memory_pool(), std::move(reader),
+                                 &arrow_reader));
 
     std::vector<std::shared_ptr<::arrow::Table>> tables;
     std::vector<int> rgs;
-    for (int i = 0; i < filereader.num_row_groups(); i++) {
+    for (int i = 0; i < arrow_reader->num_row_groups(); i++) {
       // Only read the even numbered RowGroups
       if ((i % 2) == 0) {
         rgs.push_back(i);
@@ -257,7 +263,7 @@ static void BM_ReadMultipleRowGroups(::benchmark::State& state) {
     }
 
     std::shared_ptr<::arrow::Table> table;
-    EXIT_NOT_OK(filereader.ReadRowGroups(rgs, &table));
+    EXIT_NOT_OK(arrow_reader->ReadRowGroups(rgs, &table));
   }
   SetBytesProcessed<true, Int64Type>(state);
 }
diff --git a/cpp/src/parquet/arrow/reader.cc b/cpp/src/parquet/arrow/reader.cc
index 45071b5..fc953da 100644
--- a/cpp/src/parquet/arrow/reader.cc
+++ b/cpp/src/parquet/arrow/reader.cc
@@ -38,6 +38,7 @@
 #include "arrow/util/int-util.h"
 #include "arrow/util/logging.h"
 #include "arrow/util/thread-pool.h"
+#include "arrow/util/ubsan.h"
 
 // For arrow::compute::Datum. This should perhaps be promoted. See ARROW-4022
 #include "arrow/compute/kernel.h"
@@ -55,6 +56,7 @@
 using arrow::Array;
 using arrow::BooleanArray;
 using arrow::ChunkedArray;
+using arrow::DataType;
 using arrow::Field;
 using arrow::Int32Array;
 using arrow::ListArray;
@@ -65,6 +67,10 @@ using arrow::StructArray;
 using arrow::Table;
 using arrow::TimestampArray;
 
+using ::arrow::BitUtil::FromBigEndian;
+using ::arrow::internal::SafeLeftShift;
+using ::arrow::util::SafeLoadAs;
+
 // For Array/ChunkedArray variant
 using arrow::compute::Datum;
 
@@ -79,10 +85,6 @@ using parquet::internal::RecordReader;
 namespace parquet {
 namespace arrow {
 
-using ::arrow::BitUtil::FromBigEndian;
-using ::arrow::internal::SafeLeftShift;
-using ::arrow::util::SafeLoadAs;
-
 template <typename ArrowType>
 using ArrayType = typename ::arrow::TypeTraits<ArrowType>::ArrayType;
 
@@ -227,6 +229,11 @@ class FileReader::Impl {
 
   virtual ~Impl() {}
 
+  Status Init() {
+    // TODO(wesm): Smarter schema/column-reader initialization for nested data
+    return Status::OK();
+  }
+
   Status GetColumn(int i, FileColumnIteratorFactory iterator_factory,
                    std::unique_ptr<ColumnReader>* out);
 
@@ -246,6 +253,7 @@ class FileReader::Impl {
   Status GetSchema(std::shared_ptr<::arrow::Schema>* out);
   Status GetSchema(const std::vector<int>& indices,
                    std::shared_ptr<::arrow::Schema>* out);
+
   Status ReadRowGroup(int row_group_index, std::shared_ptr<Table>* table);
   Status ReadRowGroup(int row_group_index, const std::vector<int>& indices,
                       std::shared_ptr<::arrow::Table>* out);
@@ -298,8 +306,11 @@ class ColumnReader::ColumnReaderImpl {
 class PARQUET_NO_EXPORT PrimitiveImpl : public ColumnReader::ColumnReaderImpl {
  public:
   PrimitiveImpl(MemoryPool* pool, std::unique_ptr<FileColumnIterator> input,
-                const bool read_dictionary)
-      : pool_(pool), input_(std::move(input)), descr_(input_->descr()) {
+                bool read_dictionary)
+      : pool_(pool),
+        input_(std::move(input)),
+        descr_(input_->descr()),
+        read_dictionary_(read_dictionary) {
     record_reader_ = RecordReader::Make(descr_, pool_, read_dictionary);
     Status s = NodeToField(*input_->descr()->schema_node(), &field_);
     DCHECK_OK(s);
@@ -308,7 +319,6 @@ class PARQUET_NO_EXPORT PrimitiveImpl : public ColumnReader::ColumnReaderImpl {
 
   Status NextBatch(int64_t records_to_read, std::shared_ptr<ChunkedArray>* out) override;
 
-  template <typename ParquetType>
   Status WrapIntoListArray(Datum* inout_array);
 
   Status GetDefLevels(const int16_t** data, size_t* length) override;
@@ -324,8 +334,10 @@ class PARQUET_NO_EXPORT PrimitiveImpl : public ColumnReader::ColumnReaderImpl {
   const ColumnDescriptor* descr_;
 
   std::shared_ptr<RecordReader> record_reader_;
-
   std::shared_ptr<Field> field_;
+
+  // Are we reading directly to dictionary-encoded?
+  bool read_dictionary_;
 };
 
 // Reader implementation for struct array
@@ -358,6 +370,20 @@ FileReader::FileReader(MemoryPool* pool, std::unique_ptr<ParquetFileReader> read
                        const ArrowReaderProperties& properties)
     : impl_(new FileReader::Impl(pool, std::move(reader), properties)) {}
 
+Status FileReader::Make(::arrow::MemoryPool* pool,
+                        std::unique_ptr<ParquetFileReader> reader,
+                        const ArrowReaderProperties& properties,
+                        std::unique_ptr<FileReader>* out) {
+  out->reset(new FileReader(pool, std::move(reader), properties));
+  return (*out)->impl_->Init();
+}
+
+Status FileReader::Make(::arrow::MemoryPool* pool,
+                        std::unique_ptr<ParquetFileReader> reader,
+                        std::unique_ptr<FileReader>* out) {
+  return Make(pool, std::move(reader), default_arrow_reader_properties(), out);
+}
+
 FileReader::~FileReader() {}
 
 Status FileReader::Impl::GetColumn(int i, FileColumnIteratorFactory iterator_factory,
@@ -370,10 +396,9 @@ Status FileReader::Impl::GetColumn(int i, FileColumnIteratorFactory iterator_fac
   }
 
   std::unique_ptr<FileColumnIterator> input(iterator_factory(i, reader_.get()));
-  bool read_dict = reader_properties_.read_dictionary(i);
 
   std::unique_ptr<ColumnReader::ColumnReaderImpl> impl(
-      new PrimitiveImpl(pool_, std::move(input), read_dict));
+      new PrimitiveImpl(pool_, std::move(input), reader_properties_.read_dictionary(i)));
   *out = std::unique_ptr<ColumnReader>(new ColumnReader(std::move(impl)));
   return Status::OK();
 }
@@ -480,14 +505,14 @@ Status FileReader::Impl::ReadColumn(int i, std::shared_ptr<ChunkedArray>* out) {
   return flat_column_reader->NextBatch(records_to_read, out);
 }
 
-Status FileReader::Impl::GetSchema(const std::vector<int>& indices,
-                                   std::shared_ptr<::arrow::Schema>* out) {
-  return FromParquetSchema(reader_->metadata()->schema(), indices,
+Status FileReader::Impl::GetSchema(std::shared_ptr<::arrow::Schema>* out) {
+  return FromParquetSchema(reader_->metadata()->schema(), reader_properties_,
                            reader_->metadata()->key_value_metadata(), out);
 }
 
-Status FileReader::Impl::GetSchema(std::shared_ptr<::arrow::Schema>* out) {
-  return FromParquetSchema(reader_->metadata()->schema(),
+Status FileReader::Impl::GetSchema(const std::vector<int>& indices,
+                                   std::shared_ptr<::arrow::Schema>* out) {
+  return FromParquetSchema(reader_->metadata()->schema(), indices, reader_properties_,
                            reader_->metadata()->key_value_metadata(), out);
 }
 
@@ -576,7 +601,6 @@ Status FileReader::Impl::ReadRowGroup(int row_group_index,
   }
 
   auto dict_indices = GetDictionaryIndices(indices);
-
   if (!dict_indices.empty()) {
     schema = FixSchema(*schema, dict_indices, columns);
   }
@@ -628,7 +652,6 @@ Status FileReader::Impl::ReadTable(const std::vector<int>& indices,
   }
 
   auto dict_indices = GetDictionaryIndices(indices);
-
   if (!dict_indices.empty()) {
     schema = FixSchema(*schema, dict_indices, columns);
   }
@@ -701,29 +724,27 @@ std::shared_ptr<::arrow::Schema> FileReader::Impl::FixSchema(
 
 // Static ctor
 Status OpenFile(const std::shared_ptr<::arrow::io::RandomAccessFile>& file,
-                MemoryPool* allocator, const ReaderProperties& props,
+                MemoryPool* pool, const ReaderProperties& props,
                 const std::shared_ptr<FileMetaData>& metadata,
                 std::unique_ptr<FileReader>* reader) {
   std::unique_ptr<ParquetReader> pq_reader;
   PARQUET_CATCH_NOT_OK(pq_reader = ParquetReader::Open(file, props, metadata));
-  reader->reset(new FileReader(allocator, std::move(pq_reader)));
-  return Status::OK();
+  return FileReader::Make(pool, std::move(pq_reader), default_arrow_reader_properties(),
+                          reader);
 }
 
 Status OpenFile(const std::shared_ptr<::arrow::io::RandomAccessFile>& file,
-                MemoryPool* allocator, std::unique_ptr<FileReader>* reader) {
-  return OpenFile(file, allocator, ::parquet::default_reader_properties(), nullptr,
-                  reader);
+                MemoryPool* pool, std::unique_ptr<FileReader>* reader) {
+  return OpenFile(file, pool, ::parquet::default_reader_properties(), nullptr, reader);
 }
 
 Status OpenFile(const std::shared_ptr<::arrow::io::RandomAccessFile>& file,
-                ::arrow::MemoryPool* allocator, const ArrowReaderProperties& properties,
+                ::arrow::MemoryPool* pool, const ArrowReaderProperties& properties,
                 std::unique_ptr<FileReader>* reader) {
   std::unique_ptr<ParquetReader> pq_reader;
   PARQUET_CATCH_NOT_OK(pq_reader = ParquetReader::Open(
                            file, ::parquet::default_reader_properties(), nullptr));
-  reader->reset(new FileReader(allocator, std::move(pq_reader), properties));
-  return Status::OK();
+  return FileReader::Make(pool, std::move(pq_reader), properties, reader);
 }
 
 Status FileReader::GetColumn(int i, std::unique_ptr<ColumnReader>* out) {
@@ -880,7 +901,6 @@ const ParquetFileReader* FileReader::parquet_reader() const {
   return impl_->parquet_reader();
 }
 
-template <typename ParquetType>
 Status PrimitiveImpl::WrapIntoListArray(Datum* inout_array) {
   if (descr_->max_repetition_level() == 0) {
     // Flat, no action
@@ -908,11 +928,18 @@ Status PrimitiveImpl::WrapIntoListArray(Datum* inout_array) {
   const int64_t total_levels_read = record_reader_->levels_position();
 
   std::shared_ptr<::arrow::Schema> arrow_schema;
-  RETURN_NOT_OK(FromParquetSchema(input_->schema(), {input_->column_index()},
-                                  input_->metadata()->key_value_metadata(),
-                                  &arrow_schema));
+  RETURN_NOT_OK(FromParquetSchema(
+      input_->schema(), {input_->column_index()}, default_arrow_reader_properties(),
+      input_->metadata()->key_value_metadata(), &arrow_schema));
   std::shared_ptr<Field> current_field = arrow_schema->field(0);
 
+  if (current_field->type()->num_children() > 0 &&
+      flat_array->type_id() == ::arrow::Type::DICTIONARY) {
+    // XXX(wesm): Handling of nested types and dictionary encoding needs to be
+    // significantly refactored
+    return Status::Invalid("Cannot have nested types containing dictionary arrays yet");
+  }
+
   // Walk downwards to extract nullability
   std::vector<bool> nullable;
   std::vector<std::shared_ptr<::arrow::Int32Builder>> offset_builders;
@@ -1016,113 +1043,65 @@ Status PrimitiveImpl::WrapIntoListArray(Datum* inout_array) {
   return Status::OK();
 }
 
-template <typename ArrowType, typename ParquetType>
-struct supports_fast_path_impl {
-  using ArrowCType = typename ArrowType::c_type;
-  using ParquetCType = typename ParquetType::c_type;
-  static constexpr bool value = std::is_same<ArrowCType, ParquetCType>::value;
-};
-
-template <typename ArrowType>
-struct supports_fast_path_impl<ArrowType, ByteArrayType> {
-  static constexpr bool value = false;
-};
+// ----------------------------------------------------------------------
+// Primitive types
 
-template <typename ArrowType>
-struct supports_fast_path_impl<ArrowType, FLBAType> {
-  static constexpr bool value = false;
-};
+template <typename ArrowType, typename ParquetType, typename Enable = void>
+struct TransferFunctor {};
 
 template <typename ArrowType, typename ParquetType>
-using supports_fast_path =
-    typename std::enable_if<supports_fast_path_impl<ArrowType, ParquetType>::value>::type;
-
-template <typename ArrowType, typename ParquetType, typename Enable = void>
-struct TransferFunctor {
+Status TransferInt(RecordReader* reader, MemoryPool* pool,
+                   const std::shared_ptr<DataType>& type, Datum* out) {
   using ArrowCType = typename ArrowType::c_type;
   using ParquetCType = typename ParquetType::c_type;
+  int64_t length = reader->values_written();
+  std::shared_ptr<Buffer> data;
+  RETURN_NOT_OK(::arrow::AllocateBuffer(pool, length * sizeof(ArrowCType), &data));
 
-  Status operator()(RecordReader* reader, MemoryPool* pool,
-                    const std::shared_ptr<::arrow::DataType>& type, Datum* out) {
-    static_assert(!std::is_same<ArrowType, ::arrow::Int32Type>::value,
-                  "The fast path transfer functor should be used "
-                  "for primitive values");
+  auto values = reinterpret_cast<const ParquetCType*>(reader->values());
+  auto out_ptr = reinterpret_cast<ArrowCType*>(data->mutable_data());
+  std::copy(values, values + length, out_ptr);
+  *out = std::make_shared<ArrayType<ArrowType>>(
+      type, length, data, reader->ReleaseIsValid(), reader->null_count());
+  return Status::OK();
+}
 
-    int64_t length = reader->values_written();
-    std::shared_ptr<Buffer> data;
-    RETURN_NOT_OK(::arrow::AllocateBuffer(pool, length * sizeof(ArrowCType), &data));
+std::shared_ptr<Array> TransferZeroCopy(RecordReader* reader,
+                                        const std::shared_ptr<DataType>& type) {
+  std::vector<std::shared_ptr<Buffer>> buffers = {reader->ReleaseIsValid(),
+                                                  reader->ReleaseValues()};
+  auto data = std::make_shared<::arrow::ArrayData>(type, reader->values_written(),
+                                                   buffers, reader->null_count());
+  return MakeArray(data);
+}
 
-    auto values = reinterpret_cast<const ParquetCType*>(reader->values());
-    auto out_ptr = reinterpret_cast<ArrowCType*>(data->mutable_data());
-    std::copy(values, values + length, out_ptr);
+Status TransferBool(RecordReader* reader, MemoryPool* pool, Datum* out) {
+  int64_t length = reader->values_written();
+  std::shared_ptr<Buffer> data;
 
-    if (reader->nullable_values()) {
-      std::shared_ptr<ResizableBuffer> is_valid = reader->ReleaseIsValid();
-      *out = std::make_shared<ArrayType<ArrowType>>(type, length, data, is_valid,
-                                                    reader->null_count());
-    } else {
-      *out = std::make_shared<ArrayType<ArrowType>>(type, length, data);
-    }
-    return Status::OK();
-  }
-};
+  const int64_t buffer_size = BitUtil::BytesForBits(length);
+  RETURN_NOT_OK(::arrow::AllocateBuffer(pool, buffer_size, &data));
 
-template <typename ArrowType, typename ParquetType>
-struct TransferFunctor<ArrowType, ParquetType,
-                       supports_fast_path<ArrowType, ParquetType>> {
-  Status operator()(RecordReader* reader, MemoryPool* pool,
-                    const std::shared_ptr<::arrow::DataType>& type, Datum* out) {
-    int64_t length = reader->values_written();
-    std::shared_ptr<ResizableBuffer> values = reader->ReleaseValues();
+  // Transfer boolean values to packed bitmap
+  auto values = reinterpret_cast<const bool*>(reader->values());
+  uint8_t* data_ptr = data->mutable_data();
+  memset(data_ptr, 0, buffer_size);
 
-    if (reader->nullable_values()) {
-      std::shared_ptr<ResizableBuffer> is_valid = reader->ReleaseIsValid();
-      *out = std::make_shared<ArrayType<ArrowType>>(type, length, values, is_valid,
-                                                    reader->null_count());
-    } else {
-      *out = std::make_shared<ArrayType<ArrowType>>(type, length, values);
+  for (int64_t i = 0; i < length; i++) {
+    if (values[i]) {
+      ::arrow::BitUtil::SetBit(data_ptr, i);
     }
-    return Status::OK();
   }
-};
 
-template <>
-struct TransferFunctor<::arrow::BooleanType, BooleanType> {
-  Status operator()(RecordReader* reader, MemoryPool* pool,
-                    const std::shared_ptr<::arrow::DataType>& type, Datum* out) {
-    int64_t length = reader->values_written();
-    std::shared_ptr<Buffer> data;
-
-    const int64_t buffer_size = BitUtil::BytesForBits(length);
-    RETURN_NOT_OK(::arrow::AllocateBuffer(pool, buffer_size, &data));
-
-    // Transfer boolean values to packed bitmap
-    auto values = reinterpret_cast<const bool*>(reader->values());
-    uint8_t* data_ptr = data->mutable_data();
-    memset(data_ptr, 0, buffer_size);
-
-    for (int64_t i = 0; i < length; i++) {
-      if (values[i]) {
-        ::arrow::BitUtil::SetBit(data_ptr, i);
-      }
-    }
-
-    if (reader->nullable_values()) {
-      std::shared_ptr<ResizableBuffer> is_valid = reader->ReleaseIsValid();
-      RETURN_NOT_OK(is_valid->Resize(BitUtil::BytesForBits(length), false));
-      *out = std::make_shared<BooleanArray>(type, length, data, is_valid,
-                                            reader->null_count());
-    } else {
-      *out = std::make_shared<BooleanArray>(type, length, data);
-    }
-    return Status::OK();
-  }
-};
+  *out = std::make_shared<BooleanArray>(length, data, reader->ReleaseIsValid(),
+                                        reader->null_count());
+  return Status::OK();
+}
 
 template <>
 struct TransferFunctor<::arrow::TimestampType, Int96Type> {
   Status operator()(RecordReader* reader, MemoryPool* pool,
-                    const std::shared_ptr<::arrow::DataType>& type, Datum* out) {
+                    const std::shared_ptr<DataType>& type, Datum* out) {
     int64_t length = reader->values_written();
     auto values = reinterpret_cast<const Int96*>(reader->values());
 
@@ -1134,59 +1113,88 @@ struct TransferFunctor<::arrow::TimestampType, Int96Type> {
       *data_ptr++ = Int96GetNanoSeconds(values[i]);
     }
 
-    if (reader->nullable_values()) {
-      std::shared_ptr<ResizableBuffer> is_valid = reader->ReleaseIsValid();
-      *out = std::make_shared<TimestampArray>(type, length, data, is_valid,
-                                              reader->null_count());
-    } else {
-      *out = std::make_shared<TimestampArray>(type, length, data);
-    }
-
+    *out = std::make_shared<TimestampArray>(type, length, data, reader->ReleaseIsValid(),
+                                            reader->null_count());
     return Status::OK();
   }
 };
 
-template <>
-struct TransferFunctor<::arrow::Date64Type, Int32Type> {
-  Status operator()(RecordReader* reader, MemoryPool* pool,
-                    const std::shared_ptr<::arrow::DataType>& type, Datum* out) {
-    int64_t length = reader->values_written();
-    auto values = reinterpret_cast<const int32_t*>(reader->values());
+Status TransferDate64(RecordReader* reader, MemoryPool* pool,
+                      const std::shared_ptr<DataType>& type, Datum* out) {
+  int64_t length = reader->values_written();
+  auto values = reinterpret_cast<const int32_t*>(reader->values());
 
-    std::shared_ptr<Buffer> data;
-    RETURN_NOT_OK(::arrow::AllocateBuffer(pool, length * sizeof(int64_t), &data));
-    auto out_ptr = reinterpret_cast<int64_t*>(data->mutable_data());
+  std::shared_ptr<Buffer> data;
+  RETURN_NOT_OK(::arrow::AllocateBuffer(pool, length * sizeof(int64_t), &data));
+  auto out_ptr = reinterpret_cast<int64_t*>(data->mutable_data());
 
-    for (int64_t i = 0; i < length; i++) {
-      *out_ptr++ = static_cast<int64_t>(values[i]) * kMillisecondsPerDay;
-    }
+  for (int64_t i = 0; i < length; i++) {
+    *out_ptr++ = static_cast<int64_t>(values[i]) * kMillisecondsPerDay;
+  }
 
-    if (reader->nullable_values()) {
-      std::shared_ptr<ResizableBuffer> is_valid = reader->ReleaseIsValid();
-      *out = std::make_shared<::arrow::Date64Array>(type, length, data, is_valid,
-                                                    reader->null_count());
-    } else {
-      *out = std::make_shared<::arrow::Date64Array>(type, length, data);
-    }
-    return Status::OK();
+  *out = std::make_shared<::arrow::Date64Array>(
+      type, length, data, reader->ReleaseIsValid(), reader->null_count());
+  return Status::OK();
+}
+
+// ----------------------------------------------------------------------
+// Binary, direct to dictionary-encoded
+
+// Some ugly hacks here for now to handle binary dictionaries casted to their
+// logical type
+std::shared_ptr<Array> ShallowCast(const Array& arr,
+                                   const std::shared_ptr<DataType>& new_type) {
+  std::shared_ptr<::arrow::ArrayData> new_data = arr.data()->Copy();
+  new_data->type = new_type;
+  if (new_type->id() == ::arrow::Type::DICTIONARY) {
+    // Cast dictionary, too
+    const auto& dict_type = static_cast<const ::arrow::DictionaryType&>(*new_type);
+    new_data->dictionary = ShallowCast(*new_data->dictionary, dict_type.value_type());
   }
-};
+  return MakeArray(new_data);
+}
 
-template <typename ArrowType, typename ParquetType>
-struct TransferFunctor<
-    ArrowType, ParquetType,
-    typename std::enable_if<
-        (std::is_base_of<::arrow::BinaryType, ArrowType>::value ||
-         std::is_same<::arrow::FixedSizeBinaryType, ArrowType>::value) &&
-        (std::is_same<ParquetType, ByteArrayType>::value ||
-         std::is_same<ParquetType, FLBAType>::value)>::type> {
-  Status operator()(RecordReader* reader, MemoryPool* pool,
-                    const std::shared_ptr<::arrow::DataType>& type, Datum* out) {
-    std::vector<std::shared_ptr<Array>> chunks = reader->GetBuilderChunks();
-    *out = std::make_shared<ChunkedArray>(chunks);
-    return Status::OK();
+std::shared_ptr<ChunkedArray> CastChunksTo(
+    const ChunkedArray& data, const std::shared_ptr<DataType>& logical_value_type) {
+  std::vector<std::shared_ptr<Array>> string_chunks;
+  for (int i = 0; i < data.num_chunks(); ++i) {
+    string_chunks.push_back(ShallowCast(*data.chunk(i), logical_value_type));
   }
-};
+  return std::make_shared<ChunkedArray>(string_chunks);
+}
+
+Status TransferDictionary(RecordReader* reader,
+                          const std::shared_ptr<DataType>& logical_value_type,
+                          std::shared_ptr<ChunkedArray>* out) {
+  auto dict_reader = dynamic_cast<internal::DictionaryRecordReader*>(reader);
+  DCHECK(dict_reader);
+  *out = dict_reader->GetResult();
+
+  const auto& dict_type = static_cast<const ::arrow::DictionaryType&>(*(*out)->type());
+  if (!logical_value_type->Equals(*dict_type.value_type())) {
+    *out = CastChunksTo(**out,
+                        ::arrow::dictionary(dict_type.index_type(), logical_value_type));
+  }
+  return Status::OK();
+}
+
+Status TransferBinary(RecordReader* reader,
+                      const std::shared_ptr<DataType>& logical_value_type,
+                      bool read_dictionary, std::shared_ptr<ChunkedArray>* out) {
+  if (read_dictionary) {
+    return TransferDictionary(reader, logical_value_type, out);
+  }
+  auto binary_reader = dynamic_cast<internal::BinaryRecordReader*>(reader);
+  DCHECK(binary_reader);
+  *out = std::make_shared<ChunkedArray>(binary_reader->GetBuilderChunks());
+  if (!logical_value_type->Equals(*(*out)->type())) {
+    *out = CastChunksTo(**out, logical_value_type);
+  }
+  return Status::OK();
+}
+
+// ----------------------------------------------------------------------
+// INT32 / INT64 / BYTE_ARRAY / FIXED_LEN_BYTE_ARRAY -> Decimal128
 
 static uint64_t BytesToInteger(const uint8_t* bytes, int32_t start, int32_t stop) {
   const int32_t length = stop - start;
@@ -1305,18 +1313,15 @@ static inline void RawBytesToDecimalBytes(const uint8_t* value, int32_t byte_wid
   BytesToIntegerPair(value, byte_width, high, low);
 }
 
-// ----------------------------------------------------------------------
-// BYTE_ARRAY / FIXED_LEN_BYTE_ARRAY -> Decimal128
-
 template <typename T>
-Status ConvertToDecimal128(const Array& array, const std::shared_ptr<::arrow::DataType>&,
+Status ConvertToDecimal128(const Array& array, const std::shared_ptr<DataType>&,
                            MemoryPool* pool, std::shared_ptr<Array>*) {
   return Status::NotImplemented("not implemented");
 }
 
 template <>
 Status ConvertToDecimal128<FLBAType>(const Array& array,
-                                     const std::shared_ptr<::arrow::DataType>& type,
+                                     const std::shared_ptr<DataType>& type,
                                      MemoryPool* pool, std::shared_ptr<Array>* out) {
   const auto& fixed_size_binary_array =
       static_cast<const ::arrow::FixedSizeBinaryArray&>(array);
@@ -1364,7 +1369,7 @@ Status ConvertToDecimal128<FLBAType>(const Array& array,
 
 template <>
 Status ConvertToDecimal128<ByteArrayType>(const Array& array,
-                                          const std::shared_ptr<::arrow::DataType>& type,
+                                          const std::shared_ptr<DataType>& type,
                                           MemoryPool* pool, std::shared_ptr<Array>* out) {
   const auto& binary_array = static_cast<const ::arrow::BinaryArray&>(array);
   const int64_t length = binary_array.length();
@@ -1405,37 +1410,6 @@ Status ConvertToDecimal128<ByteArrayType>(const Array& array,
   return Status::OK();
 }
 
-/// \brief Convert an arrow::BinaryArray to an arrow::Decimal128Array
-/// We do this by:
-/// 1. Creating an arrow::BinaryArray from the RecordReader's builder
-/// 2. Allocating a buffer for the arrow::Decimal128Array
-/// 3. Converting the big-endian bytes in each BinaryArray entry to two integers
-///    representing the high and low bits of each decimal value.
-template <typename ArrowType, typename ParquetType>
-struct TransferFunctor<
-    ArrowType, ParquetType,
-    typename std::enable_if<std::is_same<ArrowType, ::arrow::Decimal128Type>::value &&
-                            (std::is_same<ParquetType, ByteArrayType>::value ||
-                             std::is_same<ParquetType, FLBAType>::value)>::type> {
-  Status operator()(RecordReader* reader, MemoryPool* pool,
-                    const std::shared_ptr<::arrow::DataType>& type, Datum* out) {
-    DCHECK_EQ(type->id(), ::arrow::Type::DECIMAL);
-
-    ::arrow::ArrayVector chunks = reader->GetBuilderChunks();
-
-    for (size_t i = 0; i < chunks.size(); ++i) {
-      std::shared_ptr<Array> chunk_as_decimal;
-      RETURN_NOT_OK(
-          ConvertToDecimal128<ParquetType>(*chunks[i], type, pool, &chunk_as_decimal));
-
-      // Replace the chunk, which will hopefully also free memory as we go
-      chunks[i] = chunk_as_decimal;
-    }
-    *out = std::make_shared<ChunkedArray>(chunks);
-    return Status::OK();
-  }
-};
-
 /// \brief Convert an Int32 or Int64 array into a Decimal128Array
 /// The parquet spec allows systems to write decimals in int32, int64 if the values are
 /// small enough to fit in less 4 bytes or less than 8 bytes, respectively.
@@ -1445,8 +1419,7 @@ template <typename ParquetIntegerType,
               std::is_same<ParquetIntegerType, Int32Type>::value ||
               std::is_same<ParquetIntegerType, Int64Type>::value>::type>
 static Status DecimalIntegerTransfer(RecordReader* reader, MemoryPool* pool,
-                                     const std::shared_ptr<::arrow::DataType>& type,
-                                     Datum* out) {
+                                     const std::shared_ptr<DataType>& type, Datum* out) {
   DCHECK_EQ(type->id(), ::arrow::Type::DECIMAL);
 
   const int64_t length = reader->values_written();
@@ -1490,32 +1463,62 @@ static Status DecimalIntegerTransfer(RecordReader* reader, MemoryPool* pool,
   return Status::OK();
 }
 
-template <>
-struct TransferFunctor<::arrow::Decimal128Type, Int32Type> {
+/// \brief Convert an arrow::BinaryArray to an arrow::Decimal128Array
+/// We do this by:
+/// 1. Creating an arrow::BinaryArray from the RecordReader's builder
+/// 2. Allocating a buffer for the arrow::Decimal128Array
+/// 3. Converting the big-endian bytes in each BinaryArray entry to two integers
+///    representing the high and low bits of each decimal value.
+template <typename ArrowType, typename ParquetType>
+struct TransferFunctor<
+    ArrowType, ParquetType,
+    typename std::enable_if<std::is_same<ArrowType, ::arrow::Decimal128Type>::value &&
+                            (std::is_same<ParquetType, ByteArrayType>::value ||
+                             std::is_same<ParquetType, FLBAType>::value)>::type> {
   Status operator()(RecordReader* reader, MemoryPool* pool,
-                    const std::shared_ptr<::arrow::DataType>& type, Datum* out) {
-    return DecimalIntegerTransfer<Int32Type>(reader, pool, type, out);
-  }
-};
+                    const std::shared_ptr<DataType>& type, Datum* out) {
+    DCHECK_EQ(type->id(), ::arrow::Type::DECIMAL);
 
-template <>
-struct TransferFunctor<::arrow::Decimal128Type, Int64Type> {
-  Status operator()(RecordReader* reader, MemoryPool* pool,
-                    const std::shared_ptr<::arrow::DataType>& type, Datum* out) {
-    return DecimalIntegerTransfer<Int64Type>(reader, pool, type, out);
+    auto binary_reader = dynamic_cast<internal::BinaryRecordReader*>(reader);
+    DCHECK(binary_reader);
+    ::arrow::ArrayVector chunks = binary_reader->GetBuilderChunks();
+
+    for (size_t i = 0; i < chunks.size(); ++i) {
+      std::shared_ptr<Array> chunk_as_decimal;
+      RETURN_NOT_OK(
+          ConvertToDecimal128<ParquetType>(*chunks[i], type, pool, &chunk_as_decimal));
+
+      // Replace the chunk, which will hopefully also free memory as we go
+      chunks[i] = chunk_as_decimal;
+    }
+    *out = std::make_shared<ChunkedArray>(chunks);
+    return Status::OK();
   }
 };
 
-#define TRANSFER_DATA(ArrowType, ParquetType)                                \
-  TransferFunctor<ArrowType, ParquetType> func;                              \
-  RETURN_NOT_OK(func(record_reader_.get(), pool_, field_->type(), &result)); \
-  RETURN_NOT_OK(WrapIntoListArray<ParquetType>(&result))
+#define TRANSFER_DATA(ArrowType, ParquetType)   \
+  TransferFunctor<ArrowType, ParquetType> func; \
+  RETURN_NOT_OK(func(record_reader_.get(), pool_, value_type, &result));
 
 #define TRANSFER_CASE(ENUM, ArrowType, ParquetType) \
   case ::arrow::Type::ENUM: {                       \
     TRANSFER_DATA(ArrowType, ParquetType);          \
   } break;
 
+#define TRANSFER_INT32(ENUM, ArrowType)                                       \
+  case ::arrow::Type::ENUM: {                                                 \
+    Status s = TransferInt<ArrowType, Int32Type>(record_reader_.get(), pool_, \
+                                                 value_type, &result);        \
+    RETURN_NOT_OK(s);                                                         \
+  } break;
+
+#define TRANSFER_INT64(ENUM, ArrowType)                                       \
+  case ::arrow::Type::ENUM: {                                                 \
+    Status s = TransferInt<ArrowType, Int64Type>(record_reader_.get(), pool_, \
+                                                 value_type, &result);        \
+    RETURN_NOT_OK(s);                                                         \
+  } break;
+
 Status PrimitiveImpl::NextBatch(int64_t records_to_read,
                                 std::shared_ptr<ChunkedArray>* out) {
   try {
@@ -1537,36 +1540,52 @@ Status PrimitiveImpl::NextBatch(int64_t records_to_read,
     return ::arrow::Status::IOError(e.what());
   }
 
+  std::shared_ptr<DataType> value_type = field_->type();
+
   Datum result;
-  switch (field_->type()->id()) {
-    TRANSFER_CASE(BOOL, ::arrow::BooleanType, BooleanType)
-    TRANSFER_CASE(UINT8, ::arrow::UInt8Type, Int32Type)
-    TRANSFER_CASE(INT8, ::arrow::Int8Type, Int32Type)
-    TRANSFER_CASE(UINT16, ::arrow::UInt16Type, Int32Type)
-    TRANSFER_CASE(INT16, ::arrow::Int16Type, Int32Type)
-    TRANSFER_CASE(UINT32, ::arrow::UInt32Type, Int32Type)
-    TRANSFER_CASE(INT32, ::arrow::Int32Type, Int32Type)
-    TRANSFER_CASE(UINT64, ::arrow::UInt64Type, Int64Type)
-    TRANSFER_CASE(INT64, ::arrow::Int64Type, Int64Type)
-    TRANSFER_CASE(FLOAT, ::arrow::FloatType, FloatType)
-    TRANSFER_CASE(DOUBLE, ::arrow::DoubleType, DoubleType)
-    TRANSFER_CASE(STRING, ::arrow::StringType, ByteArrayType)
-    TRANSFER_CASE(BINARY, ::arrow::BinaryType, ByteArrayType)
-    TRANSFER_CASE(DATE32, ::arrow::Date32Type, Int32Type)
-    TRANSFER_CASE(DATE64, ::arrow::Date64Type, Int32Type)
-    TRANSFER_CASE(FIXED_SIZE_BINARY, ::arrow::FixedSizeBinaryType, FLBAType)
+  switch (value_type->id()) {
     case ::arrow::Type::NA: {
       result = std::make_shared<::arrow::NullArray>(record_reader_->values_written());
-      RETURN_NOT_OK(WrapIntoListArray<Int32Type>(&result));
       break;
     }
+    case ::arrow::Type::INT32:
+    case ::arrow::Type::INT64:
+    case ::arrow::Type::FLOAT:
+    case ::arrow::Type::DOUBLE:
+      result = TransferZeroCopy(record_reader_.get(), value_type);
+      break;
+    case ::arrow::Type::BOOL:
+      RETURN_NOT_OK(TransferBool(record_reader_.get(), pool_, &result));
+      break;
+      TRANSFER_INT32(UINT8, ::arrow::UInt8Type);
+      TRANSFER_INT32(INT8, ::arrow::Int8Type);
+      TRANSFER_INT32(UINT16, ::arrow::UInt16Type);
+      TRANSFER_INT32(INT16, ::arrow::Int16Type);
+      TRANSFER_INT32(UINT32, ::arrow::UInt32Type);
+      TRANSFER_INT64(UINT64, ::arrow::UInt64Type);
+      TRANSFER_INT32(DATE32, ::arrow::Date32Type);
+      TRANSFER_INT32(TIME32, ::arrow::Time32Type);
+      TRANSFER_INT64(TIME64, ::arrow::Time64Type);
+    case ::arrow::Type::DATE64:
+      RETURN_NOT_OK(TransferDate64(record_reader_.get(), pool_, value_type, &result));
+      break;
+    case ::arrow::Type::FIXED_SIZE_BINARY:
+    case ::arrow::Type::BINARY:
+    case ::arrow::Type::STRING: {
+      std::shared_ptr<ChunkedArray> out;
+      RETURN_NOT_OK(
+          TransferBinary(record_reader_.get(), value_type, read_dictionary_, &out));
+      result = out;
+    } break;
     case ::arrow::Type::DECIMAL: {
       switch (descr_->physical_type()) {
         case ::parquet::Type::INT32: {
-          TRANSFER_DATA(::arrow::Decimal128Type, Int32Type);
+          RETURN_NOT_OK(DecimalIntegerTransfer<Int32Type>(record_reader_.get(), pool_,
+                                                          value_type, &result));
         } break;
         case ::parquet::Type::INT64: {
-          TRANSFER_DATA(::arrow::Decimal128Type, Int64Type);
+          RETURN_NOT_OK(DecimalIntegerTransfer<Int64Type>(record_reader_.get(), pool_,
+                                                          value_type, &result));
         } break;
         case ::parquet::Type::BYTE_ARRAY: {
           TRANSFER_DATA(::arrow::Decimal128Type, ByteArrayType);
@@ -1581,31 +1600,32 @@ Status PrimitiveImpl::NextBatch(int64_t records_to_read,
       }
     } break;
     case ::arrow::Type::TIMESTAMP: {
-      ::arrow::TimestampType* timestamp_type =
-          static_cast<::arrow::TimestampType*>(field_->type().get());
-      switch (timestamp_type->unit()) {
+      const ::arrow::TimestampType& timestamp_type =
+          static_cast<::arrow::TimestampType&>(*value_type);
+      switch (timestamp_type.unit()) {
         case ::arrow::TimeUnit::MILLI:
         case ::arrow::TimeUnit::MICRO: {
-          TRANSFER_DATA(::arrow::TimestampType, Int64Type);
+          result = TransferZeroCopy(record_reader_.get(), value_type);
         } break;
         case ::arrow::TimeUnit::NANO: {
           if (descr_->physical_type() == ::parquet::Type::INT96) {
             TRANSFER_DATA(::arrow::TimestampType, Int96Type);
           } else {
-            TRANSFER_DATA(::arrow::TimestampType, Int64Type);
+            result = TransferZeroCopy(record_reader_.get(), value_type);
           }
         } break;
         default:
           return Status::NotImplemented("TimeUnit not supported");
       }
     } break;
-      TRANSFER_CASE(TIME32, ::arrow::Time32Type, Int32Type)
-      TRANSFER_CASE(TIME64, ::arrow::Time64Type, Int64Type)
     default:
       return Status::NotImplemented("No support for reading columns of type ",
-                                    field_->type()->ToString());
+                                    value_type->ToString());
   }
 
+  // Nest nested types
+  RETURN_NOT_OK(WrapIntoListArray(&result));
+
   DCHECK_NE(result.kind(), Datum::NONE);
 
   if (result.kind() == Datum::ARRAY) {
diff --git a/cpp/src/parquet/arrow/reader.h b/cpp/src/parquet/arrow/reader.h
index 97e93b9..3491314 100644
--- a/cpp/src/parquet/arrow/reader.h
+++ b/cpp/src/parquet/arrow/reader.h
@@ -156,8 +156,14 @@ ArrowReaderProperties default_arrow_reader_properties();
 // arrays
 class PARQUET_EXPORT FileReader {
  public:
-  FileReader(::arrow::MemoryPool* pool, std::unique_ptr<ParquetFileReader> reader,
-             const ArrowReaderProperties& properties = default_arrow_reader_properties());
+  static ::arrow::Status Make(::arrow::MemoryPool* pool,
+                              std::unique_ptr<ParquetFileReader> reader,
+                              const ArrowReaderProperties& properties,
+                              std::unique_ptr<FileReader>* out);
+
+  static ::arrow::Status Make(::arrow::MemoryPool* pool,
+                              std::unique_ptr<ParquetFileReader> reader,
+                              std::unique_ptr<FileReader>* out);
 
   // Since the distribution of columns amongst a Parquet file's row groups may
   // be uneven (the number of values in each column chunk can be different), we
@@ -261,6 +267,9 @@ class PARQUET_EXPORT FileReader {
   virtual ~FileReader();
 
  private:
+  FileReader(::arrow::MemoryPool* pool, std::unique_ptr<ParquetFileReader> reader,
+             const ArrowReaderProperties& properties);
+
   friend ColumnChunkReader;
   friend RowGroupReader;
 
diff --git a/cpp/src/parquet/arrow/schema.cc b/cpp/src/parquet/arrow/schema.cc
index f77bf38..1648f0e 100644
--- a/cpp/src/parquet/arrow/schema.cc
+++ b/cpp/src/parquet/arrow/schema.cc
@@ -29,6 +29,7 @@
 #include "arrow/util/checked_cast.h"
 #include "arrow/util/logging.h"
 
+#include "parquet/arrow/reader.h"
 #include "parquet/arrow/writer.h"
 #include "parquet/exception.h"
 #include "parquet/properties.h"
@@ -424,8 +425,13 @@ Status NodeToFieldInternal(const Node& node,
   return Status::OK();
 }
 
+std::shared_ptr<Field> ToDictionary32(const Field& field) {
+  auto new_ty = ::arrow::dictionary(::arrow::int32(), field.type());
+  return field.WithType(new_ty);
+}
+
 Status FromParquetSchema(
-    const SchemaDescriptor* parquet_schema,
+    const SchemaDescriptor* parquet_schema, const ArrowReaderProperties& properties,
     const std::shared_ptr<const KeyValueMetadata>& key_value_metadata,
     std::shared_ptr<::arrow::Schema>* out) {
   const GroupNode& schema_node = *parquet_schema->group_node();
@@ -435,13 +441,13 @@ Status FromParquetSchema(
   for (int i = 0; i < num_fields; i++) {
     RETURN_NOT_OK(NodeToField(*schema_node.field(i), &fields[i]));
   }
-
   *out = std::make_shared<::arrow::Schema>(fields, key_value_metadata);
   return Status::OK();
 }
 
 Status FromParquetSchema(
     const SchemaDescriptor* parquet_schema, const std::vector<int>& column_indices,
+    const ArrowReaderProperties& properties,
     const std::shared_ptr<const KeyValueMetadata>& key_value_metadata,
     std::shared_ptr<::arrow::Schema>* out) {
   // TODO(wesm): Consider adding an arrow::Schema name attribute, which comes
@@ -456,10 +462,12 @@ Status FromParquetSchema(
   std::unordered_set<const Node*> included_leaf_nodes(num_columns);
   for (int i = 0; i < num_columns; i++) {
     const ColumnDescriptor* column_desc = parquet_schema->Column(column_indices[i]);
-    included_leaf_nodes.insert(column_desc->schema_node().get());
+    const Node* node = column_desc->schema_node().get();
+
+    included_leaf_nodes.insert(node);
     const Node* column_root = parquet_schema->GetColumnRoot(column_indices[i]);
-    auto insertion = top_nodes.insert(column_root);
-    if (insertion.second) {
+    auto it = top_nodes.insert(column_root);
+    if (it.second) {
       base_nodes.push_back(column_root);
     }
   }
@@ -479,13 +487,15 @@ Status FromParquetSchema(
 
 Status FromParquetSchema(const SchemaDescriptor* parquet_schema,
                          const std::vector<int>& column_indices,
+                         const ArrowReaderProperties& properties,
                          std::shared_ptr<::arrow::Schema>* out) {
-  return FromParquetSchema(parquet_schema, column_indices, nullptr, out);
+  return FromParquetSchema(parquet_schema, column_indices, properties, nullptr, out);
 }
 
 Status FromParquetSchema(const SchemaDescriptor* parquet_schema,
+                         const ArrowReaderProperties& properties,
                          std::shared_ptr<::arrow::Schema>* out) {
-  return FromParquetSchema(parquet_schema, nullptr, out);
+  return FromParquetSchema(parquet_schema, properties, nullptr, out);
 }
 
 Status ListToNode(const std::shared_ptr<::arrow::ListType>& type, const std::string& name,
diff --git a/cpp/src/parquet/arrow/schema.h b/cpp/src/parquet/arrow/schema.h
index 52fb843..477597e 100644
--- a/cpp/src/parquet/arrow/schema.h
+++ b/cpp/src/parquet/arrow/schema.h
@@ -40,6 +40,7 @@ class WriterProperties;
 
 namespace arrow {
 
+class ArrowReaderProperties;
 class ArrowWriterProperties;
 
 PARQUET_EXPORT
@@ -51,44 +52,53 @@ PARQUET_EXPORT
 /// \param column_indices indices of leaf nodes in parquet schema tree. Appearing ordering
 ///                       matters for the converted schema. Repeated indices are ignored
 ///                       except for the first one
+/// \param properties reader options for FileReader
 /// \param key_value_metadata optional metadata, can be nullptr
 /// \param out the corresponding arrow schema
 /// \return Status::OK() on a successful conversion.
 PARQUET_EXPORT
 ::arrow::Status FromParquetSchema(
     const SchemaDescriptor* parquet_schema, const std::vector<int>& column_indices,
+    const ArrowReaderProperties& properties,
     const std::shared_ptr<const KeyValueMetadata>& key_value_metadata,
     std::shared_ptr<::arrow::Schema>* out);
 
 // Without indices
 PARQUET_EXPORT
 ::arrow::Status FromParquetSchema(
-    const SchemaDescriptor* parquet_schema,
+    const SchemaDescriptor* parquet_schema, const ArrowReaderProperties& properties,
     const std::shared_ptr<const KeyValueMetadata>& key_value_metadata,
     std::shared_ptr<::arrow::Schema>* out);
 
 // Without metadata
-::arrow::Status PARQUET_EXPORT FromParquetSchema(const SchemaDescriptor* parquet_schema,
-                                                 const std::vector<int>& column_indices,
-                                                 std::shared_ptr<::arrow::Schema>* out);
+PARQUET_EXPORT
+::arrow::Status FromParquetSchema(const SchemaDescriptor* parquet_schema,
+                                  const std::vector<int>& column_indices,
+                                  const ArrowReaderProperties& properties,
+                                  std::shared_ptr<::arrow::Schema>* out);
 
 // Without metadata or indices
-::arrow::Status PARQUET_EXPORT FromParquetSchema(const SchemaDescriptor* parquet_schema,
-                                                 std::shared_ptr<::arrow::Schema>* out);
-
-::arrow::Status PARQUET_EXPORT FieldToNode(const std::shared_ptr<::arrow::Field>& field,
-                                           const WriterProperties& properties,
-                                           const ArrowWriterProperties& arrow_properties,
-                                           schema::NodePtr* out);
-
-::arrow::Status PARQUET_EXPORT
-ToParquetSchema(const ::arrow::Schema* arrow_schema, const WriterProperties& properties,
-                const ArrowWriterProperties& arrow_properties,
-                std::shared_ptr<SchemaDescriptor>* out);
-
-::arrow::Status PARQUET_EXPORT ToParquetSchema(const ::arrow::Schema* arrow_schema,
-                                               const WriterProperties& properties,
-                                               std::shared_ptr<SchemaDescriptor>* out);
+PARQUET_EXPORT
+::arrow::Status FromParquetSchema(const SchemaDescriptor* parquet_schema,
+                                  const ArrowReaderProperties& properties,
+                                  std::shared_ptr<::arrow::Schema>* out);
+
+PARQUET_EXPORT
+::arrow::Status FieldToNode(const std::shared_ptr<::arrow::Field>& field,
+                            const WriterProperties& properties,
+                            const ArrowWriterProperties& arrow_properties,
+                            schema::NodePtr* out);
+
+PARQUET_EXPORT
+::arrow::Status ToParquetSchema(const ::arrow::Schema* arrow_schema,
+                                const WriterProperties& properties,
+                                const ArrowWriterProperties& arrow_properties,
+                                std::shared_ptr<SchemaDescriptor>* out);
+
+PARQUET_EXPORT
+::arrow::Status ToParquetSchema(const ::arrow::Schema* arrow_schema,
+                                const WriterProperties& properties,
+                                std::shared_ptr<SchemaDescriptor>* out);
 
 PARQUET_EXPORT
 int32_t DecimalSize(int32_t precision);
diff --git a/cpp/src/parquet/arrow/test-util.h b/cpp/src/parquet/arrow/test-util.h
index 8760d91..cc4774f 100644
--- a/cpp/src/parquet/arrow/test-util.h
+++ b/cpp/src/parquet/arrow/test-util.h
@@ -39,6 +39,7 @@ using internal::RecordReader;
 namespace arrow {
 
 using ::arrow::Array;
+using ::arrow::ChunkedArray;
 using ::arrow::Status;
 
 template <int32_t PRECISION>
@@ -429,11 +430,16 @@ Status MakeEmptyListsArray(int64_t size, std::shared_ptr<Array>* out_array) {
   return Status::OK();
 }
 
+std::shared_ptr<::arrow::Table> MakeSimpleTable(
+    const std::shared_ptr<ChunkedArray>& values, bool nullable) {
+  auto schema = ::arrow::schema({::arrow::field("col", values->type(), nullable)});
+  return ::arrow::Table::Make(schema, {values});
+}
+
 std::shared_ptr<::arrow::Table> MakeSimpleTable(const std::shared_ptr<Array>& values,
                                                 bool nullable) {
   auto carr = std::make_shared<::arrow::ChunkedArray>(values);
-  auto schema = ::arrow::schema({::arrow::field("col", values->type(), nullable)});
-  return ::arrow::Table::Make(schema, {carr});
+  return MakeSimpleTable(carr, nullable);
 }
 
 template <typename T>
diff --git a/cpp/src/parquet/arrow/writer.cc b/cpp/src/parquet/arrow/writer.cc
index fd40319..f2096d7 100644
--- a/cpp/src/parquet/arrow/writer.cc
+++ b/cpp/src/parquet/arrow/writer.cc
@@ -34,6 +34,7 @@
 
 #include "arrow/util/logging.h"
 
+#include "parquet/arrow/reader.h"
 #include "parquet/arrow/schema.h"
 #include "parquet/column_writer.h"
 #include "parquet/deprecated_io.h"
@@ -1079,6 +1080,7 @@ class FileWriter::Impl {
     int current_column_idx = row_group_writer_->current_column();
     std::shared_ptr<::arrow::Schema> arrow_schema;
     RETURN_NOT_OK(FromParquetSchema(writer_->schema(), {current_column_idx - 1},
+                                    default_arrow_reader_properties(),
                                     writer_->key_value_metadata(), &arrow_schema));
 
     ArrowColumnWriter arrow_writer(&column_write_context_, column_writer,
diff --git a/cpp/src/parquet/column_reader.cc b/cpp/src/parquet/column_reader.cc
index 6727fe6..4562d7b 100644
--- a/cpp/src/parquet/column_reader.cc
+++ b/cpp/src/parquet/column_reader.cc
@@ -25,7 +25,9 @@
 
 #include "arrow/buffer.h"
 #include "arrow/builder.h"
+#include "arrow/table.h"
 #include "arrow/util/bit-stream-utils.h"
+#include "arrow/util/checked_cast.h"
 #include "arrow/util/compression.h"
 #include "arrow/util/logging.h"
 #include "arrow/util/rle-encoding.h"
@@ -38,6 +40,7 @@
 #include "parquet/thrift.h"
 
 using arrow::MemoryPool;
+using arrow::internal::checked_cast;
 
 namespace parquet {
 
@@ -286,7 +289,8 @@ class ColumnReaderImplBase {
         num_buffered_values_(0),
         num_decoded_values_(0),
         pool_(pool),
-        current_decoder_(nullptr) {}
+        current_decoder_(nullptr),
+        current_encoding_(Encoding::UNKNOWN) {}
 
   virtual ~ColumnReaderImplBase() = default;
 
@@ -409,6 +413,7 @@ class ColumnReaderImplBase {
       ParquetException::NYI("only plain dictionary encoding has been implemented");
     }
 
+    new_dictionary_ = true;
     current_decoder_ = decoders_[encoding].get();
     DCHECK(current_decoder_);
   }
@@ -493,6 +498,7 @@ class ColumnReaderImplBase {
           throw ParquetException("Unknown encoding type.");
       }
     }
+    current_encoding_ = encoding;
     current_decoder_->SetData(static_cast<int>(num_buffered_values_), buffer,
                               static_cast<int>(data_size));
   }
@@ -526,6 +532,11 @@ class ColumnReaderImplBase {
 
   using DecoderType = typename EncodingTraits<DType>::Decoder;
   DecoderType* current_decoder_;
+  Encoding::type current_encoding_;
+
+  /// Flag to signal when a new dictionary has been set, for the benefit of
+  /// DictionaryRecordReader
+  bool new_dictionary_;
 
   // Map of encoding type to the respective decoder object. For example, a
   // column chunk's data pages may include both dictionary-encoded and
@@ -770,7 +781,8 @@ namespace internal {
 constexpr int64_t kMinLevelBatchSize = 1024;
 
 template <typename DType>
-class TypedRecordReader : public ColumnReaderImplBase<DType>, public RecordReader {
+class TypedRecordReader : public ColumnReaderImplBase<DType>,
+                          virtual public RecordReader {
  public:
   using T = typename DType::c_type;
   using BASE = ColumnReaderImplBase<DType>;
@@ -883,9 +895,13 @@ class TypedRecordReader : public ColumnReaderImplBase<DType>, public RecordReade
   }
 
   std::shared_ptr<ResizableBuffer> ReleaseIsValid() override {
-    auto result = valid_bits_;
-    valid_bits_ = AllocateBuffer(this->pool_);
-    return result;
+    if (nullable_values_) {
+      auto result = valid_bits_;
+      valid_bits_ = AllocateBuffer(this->pool_);
+      return result;
+    } else {
+      return nullptr;
+    }
   }
 
   // Process written repetition/definition levels to reach the end of
@@ -1113,10 +1129,6 @@ class TypedRecordReader : public ColumnReaderImplBase<DType>, public RecordReade
     std::cout << std::endl;
   }
 
-  std::vector<std::shared_ptr<::arrow::Array>> GetBuilderChunks() override {
-    throw ParquetException("GetChunks only implemented for binary types");
-  }
-
   void ResetValues() {
     if (values_written_ > 0) {
       // Resize to 0, but do not shrink to fit
@@ -1137,7 +1149,8 @@ class TypedRecordReader : public ColumnReaderImplBase<DType>, public RecordReade
   }
 };
 
-class FLBARecordReader : public TypedRecordReader<FLBAType> {
+class FLBARecordReader : public TypedRecordReader<FLBAType>,
+                         virtual public BinaryRecordReader {
  public:
   FLBARecordReader(const ColumnDescriptor* descr, ::arrow::MemoryPool* pool)
       : TypedRecordReader<FLBAType>(descr, pool), builder_(nullptr) {
@@ -1189,22 +1202,16 @@ class FLBARecordReader : public TypedRecordReader<FLBAType> {
   std::unique_ptr<::arrow::FixedSizeBinaryBuilder> builder_;
 };
 
-class ByteArrayChunkedRecordReader : public TypedRecordReader<ByteArrayType> {
+class ByteArrayChunkedRecordReader : public TypedRecordReader<ByteArrayType>,
+                                     virtual public BinaryRecordReader {
  public:
-  using BuilderType = ::arrow::internal::ChunkedBinaryBuilder;
-
   ByteArrayChunkedRecordReader(const ColumnDescriptor* descr, ::arrow::MemoryPool* pool)
       : TypedRecordReader<ByteArrayType>(descr, pool), builder_(nullptr) {
     // ARROW-4688(wesm): Using 2^31 - 1 chunks for now
     constexpr int32_t kBinaryChunksize = 2147483647;
     DCHECK_EQ(descr_->physical_type(), Type::BYTE_ARRAY);
-    if (descr_->converted_type() == ConvertedType::UTF8) {
-      builder_.reset(
-          new ::arrow::internal::ChunkedStringBuilder(kBinaryChunksize, this->pool_));
-    } else {
-      builder_.reset(
-          new ::arrow::internal::ChunkedBinaryBuilder(kBinaryChunksize, this->pool_));
-    }
+    builder_.reset(
+        new ::arrow::internal::ChunkedBinaryBuilder(kBinaryChunksize, this->pool_));
   }
 
   ::arrow::ArrayVector GetBuilderChunks() override {
@@ -1229,39 +1236,83 @@ class ByteArrayChunkedRecordReader : public TypedRecordReader<ByteArrayType> {
   }
 
  private:
-  std::unique_ptr<BuilderType> builder_;
+  std::unique_ptr<::arrow::internal::ChunkedBinaryBuilder> builder_;
 };
 
-template <typename BuilderType>
-class ByteArrayDictionaryRecordReader : public TypedRecordReader<ByteArrayType> {
+class ByteArrayDictionaryRecordReader : public TypedRecordReader<ByteArrayType>,
+                                        virtual public DictionaryRecordReader {
  public:
   ByteArrayDictionaryRecordReader(const ColumnDescriptor* descr,
                                   ::arrow::MemoryPool* pool)
-      : TypedRecordReader<ByteArrayType>(descr, pool), builder_(new BuilderType(pool)) {}
+      : TypedRecordReader<ByteArrayType>(descr, pool), builder_(pool) {}
 
-  ::arrow::ArrayVector GetBuilderChunks() override {
-    std::shared_ptr<::arrow::Array> chunk;
-    PARQUET_THROW_NOT_OK(builder_->Finish(&chunk));
-    return ::arrow::ArrayVector({chunk});
+  std::shared_ptr<::arrow::ChunkedArray> GetResult() override {
+    FlushBuilder();
+    return std::make_shared<::arrow::ChunkedArray>(result_chunks_);
+  }
+
+  void FlushBuilder() {
+    if (builder_.length() > 0) {
+      std::shared_ptr<::arrow::Array> chunk;
+      PARQUET_THROW_NOT_OK(builder_.Finish(&chunk));
+      result_chunks_.emplace_back(std::move(chunk));
+
+      // Reset clears the dictionary memo table
+      builder_.Reset();
+    }
+  }
+
+  void MaybeWriteNewDictionary() {
+    if (this->new_dictionary_) {
+      /// If there is a new dictionary, we may need to flush the builder, then
+      /// insert the new dictionary values
+      FlushBuilder();
+      auto decoder = dynamic_cast<BinaryDictDecoder*>(this->current_decoder_);
+      decoder->InsertDictionary(&builder_);
+      this->new_dictionary_ = false;
+    }
   }
 
   void ReadValuesDense(int64_t values_to_read) override {
-    int64_t num_decoded = this->current_decoder_->DecodeArrowNonNull(
-        static_cast<int>(values_to_read), builder_.get());
+    int64_t num_decoded = 0;
+    if (current_encoding_ == Encoding::RLE_DICTIONARY) {
+      MaybeWriteNewDictionary();
+      auto decoder = dynamic_cast<BinaryDictDecoder*>(this->current_decoder_);
+      num_decoded = decoder->DecodeIndices(static_cast<int>(values_to_read), &builder_);
+    } else {
+      num_decoded = this->current_decoder_->DecodeArrowNonNull(
+          static_cast<int>(values_to_read), &builder_);
+
+      /// Flush values since they have been copied into the builder
+      ResetValues();
+    }
     DCHECK_EQ(num_decoded, values_to_read);
-    ResetValues();
   }
 
   void ReadValuesSpaced(int64_t values_to_read, int64_t null_count) override {
-    int64_t num_decoded = this->current_decoder_->DecodeArrow(
-        static_cast<int>(values_to_read), static_cast<int>(null_count),
-        valid_bits_->mutable_data(), values_written_, builder_.get());
+    int64_t num_decoded = 0;
+    if (current_encoding_ == Encoding::RLE_DICTIONARY) {
+      MaybeWriteNewDictionary();
+      auto decoder = dynamic_cast<BinaryDictDecoder*>(this->current_decoder_);
+      num_decoded = decoder->DecodeIndicesSpaced(
+          static_cast<int>(values_to_read), static_cast<int>(null_count),
+          valid_bits_->mutable_data(), values_written_, &builder_);
+    } else {
+      num_decoded = this->current_decoder_->DecodeArrow(
+          static_cast<int>(values_to_read), static_cast<int>(null_count),
+          valid_bits_->mutable_data(), values_written_, &builder_);
+
+      /// Flush values since they have been copied into the builder
+      ResetValues();
+    }
     DCHECK_EQ(num_decoded, values_to_read);
-    ResetValues();
   }
 
  private:
-  std::unique_ptr<BuilderType> builder_;
+  using BinaryDictDecoder = DictDecoder<ByteArrayType>;
+
+  ::arrow::BinaryDictionaryBuilder builder_;
+  std::vector<std::shared_ptr<::arrow::Array>> result_chunks_;
 };
 
 // TODO(wesm): Implement these to some satisfaction
@@ -1278,13 +1329,7 @@ std::shared_ptr<RecordReader> MakeByteArrayRecordReader(const ColumnDescriptor*
                                                         arrow::MemoryPool* pool,
                                                         bool read_dictionary) {
   if (read_dictionary) {
-    if (descr->converted_type() == ConvertedType::UTF8) {
-      using Builder = ::arrow::StringDictionaryBuilder;
-      return std::make_shared<ByteArrayDictionaryRecordReader<Builder>>(descr, pool);
-    } else {
-      using Builder = ::arrow::BinaryDictionaryBuilder;
-      return std::make_shared<ByteArrayDictionaryRecordReader<Builder>>(descr, pool);
-    }
+    return std::make_shared<ByteArrayDictionaryRecordReader>(descr, pool);
   } else {
     return std::make_shared<ByteArrayChunkedRecordReader>(descr, pool);
   }
diff --git a/cpp/src/parquet/column_reader.h b/cpp/src/parquet/column_reader.h
index 461cf72..8022be9 100644
--- a/cpp/src/parquet/column_reader.h
+++ b/cpp/src/parquet/column_reader.h
@@ -33,6 +33,7 @@
 namespace arrow {
 
 class Array;
+class ChunkedArray;
 
 namespace BitUtil {
 class BitReader;
@@ -225,9 +226,6 @@ class RecordReader {
 
   virtual void DebugPrintState() = 0;
 
-  // For BYTE_ARRAY, FIXED_LEN_BYTE_ARRAY types that may have chunked output
-  virtual std::vector<std::shared_ptr<::arrow::Array>> GetBuilderChunks() = 0;
-
   /// \brief Decoded definition levels
   int16_t* def_levels() const {
     return reinterpret_cast<int16_t*>(def_levels_->mutable_data());
@@ -282,6 +280,18 @@ class RecordReader {
   std::shared_ptr<::arrow::ResizableBuffer> rep_levels_;
 };
 
+class BinaryRecordReader : virtual public RecordReader {
+ public:
+  virtual std::vector<std::shared_ptr<::arrow::Array>> GetBuilderChunks() = 0;
+};
+
+/// \brief Read records directly to dictionary-encoded Arrow form (int32
+/// indices). Only valid for BYTE_ARRAY columns
+class DictionaryRecordReader : virtual public RecordReader {
+ public:
+  virtual std::shared_ptr<::arrow::ChunkedArray> GetResult() = 0;
+};
+
 static inline void DefinitionLevelsToBitmap(
     const int16_t* def_levels, int64_t num_def_levels, const int16_t max_definition_level,
     const int16_t max_repetition_level, int64_t* values_read, int64_t* null_count,
diff --git a/cpp/src/parquet/encoding-test.cc b/cpp/src/parquet/encoding-test.cc
index cca9edd..799b81d 100644
--- a/cpp/src/parquet/encoding-test.cc
+++ b/cpp/src/parquet/encoding-test.cc
@@ -324,26 +324,11 @@ TEST(TestDictionaryEncoding, CannotDictDecodeBoolean) {
 
 // ----------------------------------------------------------------------
 // Shared arrow builder decode tests
-template <typename T>
-struct BuilderTraits {};
-
-template <>
-struct BuilderTraits<::arrow::BinaryType> {
-  using DenseArrayBuilder = ::arrow::internal::ChunkedBinaryBuilder;
-  using DictArrayBuilder = ::arrow::BinaryDictionaryBuilder;
-};
 
-template <>
-struct BuilderTraits<::arrow::StringType> {
-  using DenseArrayBuilder = ::arrow::internal::ChunkedStringBuilder;
-  using DictArrayBuilder = ::arrow::StringDictionaryBuilder;
-};
-
-template <typename DType>
 class TestArrowBuilderDecoding : public ::testing::Test {
  public:
-  using DenseBuilder = typename BuilderTraits<DType>::DenseArrayBuilder;
-  using DictBuilder = typename BuilderTraits<DType>::DictArrayBuilder;
+  using DenseBuilder = ::arrow::internal::ChunkedBinaryBuilder;
+  using DictBuilder = ::arrow::BinaryDictionaryBuilder;
 
   void SetUp() override { null_probabilities_ = {0.0, 0.5, 1.0}; }
   void TearDown() override {}
@@ -355,23 +340,13 @@ class TestArrowBuilderDecoding : public ::testing::Test {
 
   void GenerateInputData(double null_probability) {
     constexpr int num_unique = 100;
-    constexpr int repeat = 10;
+    constexpr int repeat = 100;
     constexpr int64_t min_length = 2;
     constexpr int64_t max_length = 10;
     ::arrow::random::RandomArrayGenerator rag(0);
-    expected_dense_ = rag.StringWithRepeats(repeat * num_unique, num_unique, min_length,
+    expected_dense_ = rag.BinaryWithRepeats(repeat * num_unique, num_unique, min_length,
                                             max_length, null_probability);
 
-    std::shared_ptr<::arrow::DataType> data_type = std::make_shared<DType>();
-
-    if (data_type->id() == ::arrow::BinaryType::type_id) {
-      // TODO(hatemhelal):  this is a kludge.  Probably best to extend the
-      // RandomArrayGenerator to also generate BinaryType arrays.
-      auto data = expected_dense_->data()->Copy();
-      data->type = data_type;
-      expected_dense_ = std::make_shared<::arrow::BinaryArray>(data);
-    }
-
     num_values_ = static_cast<int>(expected_dense_->length());
     null_count_ = static_cast<int>(expected_dense_->null_count());
     valid_bits_ = expected_dense_->null_bitmap()->data();
@@ -471,55 +446,41 @@ class TestArrowBuilderDecoding : public ::testing::Test {
   std::vector<ByteArray> input_data_;
   const uint8_t* valid_bits_;
   std::unique_ptr<ByteArrayEncoder> encoder_;
-  std::unique_ptr<ByteArrayDecoder> decoder_;
+  ByteArrayDecoder* decoder_;
+  std::unique_ptr<ByteArrayDecoder> plain_decoder_;
+  std::unique_ptr<DictDecoder<ByteArrayType>> dict_decoder_;
   std::shared_ptr<Buffer> buffer_;
 };
 
-#define TEST_ARROW_BUILDER_BASE_MEMBERS()             \
-  using TestArrowBuilderDecoding<DType>::encoder_;    \
-  using TestArrowBuilderDecoding<DType>::decoder_;    \
-  using TestArrowBuilderDecoding<DType>::input_data_; \
-  using TestArrowBuilderDecoding<DType>::valid_bits_; \
-  using TestArrowBuilderDecoding<DType>::num_values_; \
-  using TestArrowBuilderDecoding<DType>::buffer_
-
-template <typename DType>
-class PlainEncoding : public TestArrowBuilderDecoding<DType> {
+class PlainEncoding : public TestArrowBuilderDecoding {
  public:
   void SetupEncoderDecoder() override {
     encoder_ = MakeTypedEncoder<ByteArrayType>(Encoding::PLAIN);
-    decoder_ = MakeTypedDecoder<ByteArrayType>(Encoding::PLAIN);
+    plain_decoder_ = MakeTypedDecoder<ByteArrayType>(Encoding::PLAIN);
+    decoder_ = plain_decoder_.get();
     ASSERT_NO_THROW(encoder_->PutSpaced(input_data_.data(), num_values_, valid_bits_, 0));
     buffer_ = encoder_->FlushValues();
     decoder_->SetData(num_values_, buffer_->data(), static_cast<int>(buffer_->size()));
   }
-
- protected:
-  TEST_ARROW_BUILDER_BASE_MEMBERS();
 };
 
-using BuilderArrayTypes = ::testing::Types<::arrow::BinaryType, ::arrow::StringType>;
-// using BuilderArrayTypes = ::testing::Types<::arrow::StringType>;
-TYPED_TEST_CASE(PlainEncoding, BuilderArrayTypes);
-
-TYPED_TEST(PlainEncoding, CheckDecodeArrowUsingDenseBuilder) {
+TEST_F(PlainEncoding, CheckDecodeArrowUsingDenseBuilder) {
   this->CheckDecodeArrowUsingDenseBuilder();
 }
 
-TYPED_TEST(PlainEncoding, CheckDecodeArrowUsingDictBuilder) {
+TEST_F(PlainEncoding, CheckDecodeArrowUsingDictBuilder) {
   this->CheckDecodeArrowUsingDictBuilder();
 }
 
-TYPED_TEST(PlainEncoding, CheckDecodeArrowNonNullDenseBuilder) {
+TEST_F(PlainEncoding, CheckDecodeArrowNonNullDenseBuilder) {
   this->CheckDecodeArrowNonNullUsingDenseBuilder();
 }
 
-TYPED_TEST(PlainEncoding, CheckDecodeArrowNonNullDictBuilder) {
+TEST_F(PlainEncoding, CheckDecodeArrowNonNullDictBuilder) {
   this->CheckDecodeArrowNonNullUsingDictBuilder();
 }
 
-template <typename DType>
-class DictEncoding : public TestArrowBuilderDecoding<DType> {
+class DictEncoding : public TestArrowBuilderDecoding {
  public:
   void SetupEncoderDecoder() override {
     auto node = schema::ByteArray("name");
@@ -544,36 +505,48 @@ class DictEncoding : public TestArrowBuilderDecoding<DType> {
     dict_decoder_->SetDict(plain_decoder_.get());
     dict_decoder_->SetData(num_values_, buffer_->data(),
                            static_cast<int>(buffer_->size()));
-    decoder_ = std::unique_ptr<ByteArrayDecoder>(
-        dynamic_cast<ByteArrayDecoder*>(dict_decoder_.release()));
+    decoder_ = dynamic_cast<ByteArrayDecoder*>(dict_decoder_.get());
   }
 
  protected:
-  TEST_ARROW_BUILDER_BASE_MEMBERS();
   std::unique_ptr<ColumnDescriptor> descr_;
-  std::unique_ptr<ByteArrayDecoder> plain_decoder_;
-  std::unique_ptr<DictDecoder<ByteArrayType>> dict_decoder_;
   std::shared_ptr<Buffer> dict_buffer_;
 };
 
-TYPED_TEST_CASE(DictEncoding, BuilderArrayTypes);
-
-TYPED_TEST(DictEncoding, CheckDecodeArrowUsingDenseBuilder) {
+TEST_F(DictEncoding, CheckDecodeArrowUsingDenseBuilder) {
   this->CheckDecodeArrowUsingDenseBuilder();
 }
 
-TYPED_TEST(DictEncoding, CheckDecodeArrowUsingDictBuilder) {
+TEST_F(DictEncoding, CheckDecodeArrowUsingDictBuilder) {
   this->CheckDecodeArrowUsingDictBuilder();
 }
 
-TYPED_TEST(DictEncoding, CheckDecodeArrowNonNullDenseBuilder) {
+TEST_F(DictEncoding, CheckDecodeArrowNonNullDenseBuilder) {
   this->CheckDecodeArrowNonNullUsingDenseBuilder();
 }
 
-TYPED_TEST(DictEncoding, CheckDecodeArrowNonNullDictBuilder) {
+TEST_F(DictEncoding, CheckDecodeArrowNonNullDictBuilder) {
   this->CheckDecodeArrowNonNullUsingDictBuilder();
 }
 
-}  // namespace test
+TEST_F(DictEncoding, CheckDecodeIndicesSpaced) {
+  for (auto np : null_probabilities_) {
+    InitTestCase(np);
+    auto builder = CreateDictBuilder();
+    dict_decoder_->InsertDictionary(builder.get());
+    auto actual_num_values = dict_decoder_->DecodeIndicesSpaced(
+        num_values_, null_count_, valid_bits_, 0, builder.get());
+    CheckDict(actual_num_values, *builder);
+  }
+}
 
+TEST_F(DictEncoding, CheckDecodeIndicesNoNulls) {
+  InitTestCase(/*null_probability=*/0.0);
+  auto builder = CreateDictBuilder();
+  dict_decoder_->InsertDictionary(builder.get());
+  auto actual_num_values = dict_decoder_->DecodeIndices(num_values_, builder.get());
+  CheckDict(actual_num_values, *builder);
+}
+
+}  // namespace test
 }  // namespace parquet
diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc
index 304724b..ef42e88 100644
--- a/cpp/src/parquet/encoding.cc
+++ b/cpp/src/parquet/encoding.cc
@@ -25,6 +25,7 @@
 #include <vector>
 
 #include "arrow/util/bit-stream-utils.h"
+#include "arrow/util/checked_cast.h"
 #include "arrow/util/hashing.h"
 #include "arrow/util/logging.h"
 #include "arrow/util/rle-encoding.h"
@@ -36,6 +37,8 @@
 #include "parquet/schema.h"
 #include "parquet/types.h"
 
+using arrow::internal::checked_cast;
+
 namespace parquet {
 
 constexpr int64_t kInMemoryDefaultCapacity = 1024;
@@ -706,12 +709,45 @@ class PlainByteArrayDecoder : public PlainDecoder<ByteArrayType>,
   using Base::DecodeSpaced;
   using Base::PlainDecoder;
 
+  int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
+                  int64_t valid_bits_offset,
+                  ::arrow::BinaryDictionaryBuilder* builder) override {
+    int result = 0;
+    PARQUET_THROW_NOT_OK(DecodeArrow(num_values, null_count, valid_bits,
+                                     valid_bits_offset, builder, &result));
+    return result;
+  }
+
+  int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
+                  int64_t valid_bits_offset,
+                  ::arrow::internal::ChunkedBinaryBuilder* builder) override {
+    int result = 0;
+    PARQUET_THROW_NOT_OK(DecodeArrow(num_values, null_count, valid_bits,
+                                     valid_bits_offset, builder, &result));
+    return result;
+  }
+
+  int DecodeArrowNonNull(int num_values,
+                         ::arrow::BinaryDictionaryBuilder* builder) override {
+    int result = 0;
+    PARQUET_THROW_NOT_OK(DecodeArrowNonNull(num_values, builder, &result));
+    return result;
+  }
+
+  int DecodeArrowNonNull(int num_values,
+                         ::arrow::internal::ChunkedBinaryBuilder* builder) override {
+    int result = 0;
+    PARQUET_THROW_NOT_OK(DecodeArrowNonNull(num_values, builder, &result));
+    return result;
+  }
+
  private:
+  template <typename BuilderType>
   ::arrow::Status DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
-                              int64_t valid_bits_offset, WrappedBuilderInterface* builder,
-                              int* values_decoded) override {
+                              int64_t valid_bits_offset, BuilderType* builder,
+                              int* values_decoded) {
     num_values = std::min(num_values, num_values_);
-    builder->Reserve(num_values);
+    RETURN_NOT_OK(builder->Reserve(num_values));
     ::arrow::internal::BitmapReader bit_reader(valid_bits, valid_bits_offset, num_values);
     int increment;
     int i = 0;
@@ -725,12 +761,12 @@ class PlainByteArrayDecoder : public PlainDecoder<ByteArrayType>,
         if (data_size < increment) {
           ParquetException::EofException();
         }
-        builder->Append(data + sizeof(uint32_t), len);
+        RETURN_NOT_OK(builder->Append(data + sizeof(uint32_t), len));
         data += increment;
         data_size -= increment;
         bytes_decoded += increment;
       } else {
-        builder->AppendNull();
+        RETURN_NOT_OK(builder->AppendNull());
       }
       bit_reader.Next();
       ++i;
@@ -743,10 +779,11 @@ class PlainByteArrayDecoder : public PlainDecoder<ByteArrayType>,
     return ::arrow::Status::OK();
   }
 
-  ::arrow::Status DecodeArrowNonNull(int num_values, WrappedBuilderInterface* builder,
-                                     int* values_decoded) override {
+  template <typename BuilderType>
+  ::arrow::Status DecodeArrowNonNull(int num_values, BuilderType* builder,
+                                     int* values_decoded) {
     num_values = std::min(num_values, num_values_);
-    builder->Reserve(num_values);
+    RETURN_NOT_OK(builder->Reserve(num_values));
     int i = 0;
     const uint8_t* data = data_;
     int64_t data_size = len_;
@@ -756,7 +793,7 @@ class PlainByteArrayDecoder : public PlainDecoder<ByteArrayType>,
       uint32_t len = arrow::util::SafeLoadAs<uint32_t>(data);
       int increment = static_cast<int>(sizeof(uint32_t) + len);
       if (data_size < increment) ParquetException::EofException();
-      builder->Append(data + sizeof(uint32_t), len);
+      RETURN_NOT_OK(builder->Append(data + sizeof(uint32_t), len));
       data += increment;
       data_size -= increment;
       bytes_decoded += increment;
@@ -792,7 +829,10 @@ class DictDecoderImpl : public DecoderImpl, virtual public DictDecoder<Type> {
                            ::arrow::MemoryPool* pool = ::arrow::default_memory_pool())
       : DecoderImpl(descr, Encoding::RLE_DICTIONARY),
         dictionary_(AllocateBuffer(pool, 0)),
-        byte_array_data_(AllocateBuffer(pool, 0)) {}
+        dictionary_length_(0),
+        byte_array_data_(AllocateBuffer(pool, 0)),
+        byte_array_offsets_(AllocateBuffer(pool, 0)),
+        indices_scratch_space_(AllocateBuffer(pool, 0)) {}
 
   // Perform type-specific initiatialization
   void SetDict(TypedDecoder<Type>* dictionary) override;
@@ -806,112 +846,238 @@ class DictDecoderImpl : public DecoderImpl, virtual public DictDecoder<Type> {
     idx_decoder_ = ::arrow::util::RleDecoder(data, len, bit_width);
   }
 
-  int Decode(T* buffer, int max_values) override {
-    max_values = std::min(max_values, num_values_);
+  int Decode(T* buffer, int num_values) override {
+    num_values = std::min(num_values, num_values_);
     int decoded_values = idx_decoder_.GetBatchWithDict(
-        reinterpret_cast<const T*>(dictionary_->data()), buffer, max_values);
-    if (decoded_values != max_values) {
+        reinterpret_cast<const T*>(dictionary_->data()), buffer, num_values);
+    if (decoded_values != num_values) {
       ParquetException::EofException();
     }
-    num_values_ -= max_values;
-    return max_values;
+    num_values_ -= num_values;
+    return num_values;
   }
 
   int DecodeSpaced(T* buffer, int num_values, int null_count, const uint8_t* valid_bits,
                    int64_t valid_bits_offset) override {
-    int decoded_values = idx_decoder_.GetBatchWithDictSpaced(
-        reinterpret_cast<const T*>(dictionary_->data()), buffer, num_values, null_count,
-        valid_bits, valid_bits_offset);
-    if (decoded_values != num_values) {
+    num_values = std::min(num_values, num_values_);
+    if (num_values != idx_decoder_.GetBatchWithDictSpaced(
+                          reinterpret_cast<const T*>(dictionary_->data()), buffer,
+                          num_values, null_count, valid_bits, valid_bits_offset)) {
       ParquetException::EofException();
     }
-    return decoded_values;
+    num_values_ -= num_values;
+    return num_values;
+  }
+
+  void InsertDictionary(::arrow::ArrayBuilder* builder) override;
+
+  int DecodeIndicesSpaced(int num_values, int null_count, const uint8_t* valid_bits,
+                          int64_t valid_bits_offset,
+                          ::arrow::ArrayBuilder* builder) override {
+    num_values = std::min(num_values, num_values_);
+    if (num_values > 0) {
+      // TODO(wesm): Refactor to batch reads for improved memory use. It is not
+      // trivial because the null_count is relative to the entire bitmap
+      PARQUET_THROW_NOT_OK(indices_scratch_space_->TypedResize<int64_t>(
+          num_values, /*shrink_to_fit=*/false));
+    }
+
+    auto indices_buffer =
+        reinterpret_cast<int64_t*>(indices_scratch_space_->mutable_data());
+
+    if (num_values != idx_decoder_.GetBatchSpaced(num_values, null_count, valid_bits,
+                                                  valid_bits_offset, indices_buffer)) {
+      ParquetException::EofException();
+    }
+
+    /// XXX(wesm): Cannot append "valid bits" directly to the builder
+    std::vector<uint8_t> valid_bytes(num_values);
+    ::arrow::internal::BitmapReader bit_reader(valid_bits, valid_bits_offset, num_values);
+    for (int64_t i = 0; i < num_values; ++i) {
+      valid_bytes[i] = static_cast<uint8_t>(bit_reader.IsSet());
+      bit_reader.Next();
+    }
+
+    auto binary_builder = checked_cast<::arrow::BinaryDictionaryBuilder*>(builder);
+    PARQUET_THROW_NOT_OK(
+        binary_builder->AppendIndices(indices_buffer, num_values, valid_bytes.data()));
+    num_values_ -= num_values;
+    return num_values;
+  }
+
+  int DecodeIndices(int num_values, ::arrow::ArrayBuilder* builder) override {
+    num_values = std::min(num_values, num_values_);
+    num_values = std::min(num_values, num_values_);
+    if (num_values > 0) {
+      // TODO(wesm): Refactor to batch reads for improved memory use. This is
+      // relatively simple here because we don't have to do any bookkeeping of
+      // nulls
+      PARQUET_THROW_NOT_OK(indices_scratch_space_->TypedResize<int64_t>(
+          num_values, /*shrink_to_fit=*/false));
+    }
+    auto indices_buffer =
+        reinterpret_cast<int64_t*>(indices_scratch_space_->mutable_data());
+    if (num_values != idx_decoder_.GetBatch(indices_buffer, num_values)) {
+      ParquetException::EofException();
+    }
+    auto binary_builder = checked_cast<::arrow::BinaryDictionaryBuilder*>(builder);
+    PARQUET_THROW_NOT_OK(binary_builder->AppendIndices(indices_buffer, num_values));
+    num_values_ -= num_values;
+    return num_values;
   }
 
  protected:
   inline void DecodeDict(TypedDecoder<Type>* dictionary) {
-    int num_dictionary_values = dictionary->values_left();
-    PARQUET_THROW_NOT_OK(dictionary_->Resize(num_dictionary_values * sizeof(T)));
+    dictionary_length_ = static_cast<int32_t>(dictionary->values_left());
+    PARQUET_THROW_NOT_OK(dictionary_->Resize(dictionary_length_ * sizeof(T),
+                                             /*shrink_to_fit=*/false));
     dictionary->Decode(reinterpret_cast<T*>(dictionary_->mutable_data()),
-                       num_dictionary_values);
+                       dictionary_length_);
   }
 
   // Only one is set.
   std::shared_ptr<ResizableBuffer> dictionary_;
 
+  int32_t dictionary_length_;
+
   // Data that contains the byte array data (byte_array_dictionary_ just has the
   // pointers).
   std::shared_ptr<ResizableBuffer> byte_array_data_;
 
+  // Arrow-style byte offsets for each dictionary value. We maintain two
+  // representations of the dictionary, one as ByteArray* for non-Arrow
+  // consumers and this one for Arrow conumers. Since dictionaries are
+  // generally pretty small to begin with this doesn't mean too much extra
+  // memory use in most cases
+  std::shared_ptr<ResizableBuffer> byte_array_offsets_;
+
+  // Reusable buffer for decoding dictionary indices to be appended to a
+  // BinaryDictionaryBuilder
+  std::shared_ptr<ResizableBuffer> indices_scratch_space_;
+
   ::arrow::util::RleDecoder idx_decoder_;
 };
 
 template <typename Type>
-inline void DictDecoderImpl<Type>::SetDict(TypedDecoder<Type>* dictionary) {
+void DictDecoderImpl<Type>::SetDict(TypedDecoder<Type>* dictionary) {
   DecodeDict(dictionary);
 }
 
 template <>
-inline void DictDecoderImpl<BooleanType>::SetDict(TypedDecoder<BooleanType>* dictionary) {
+void DictDecoderImpl<BooleanType>::SetDict(TypedDecoder<BooleanType>* dictionary) {
   ParquetException::NYI("Dictionary encoding is not implemented for boolean values");
 }
 
 template <>
-inline void DictDecoderImpl<ByteArrayType>::SetDict(
-    TypedDecoder<ByteArrayType>* dictionary) {
-  int num_dictionary_values = dictionary->values_left();
+void DictDecoderImpl<ByteArrayType>::SetDict(TypedDecoder<ByteArrayType>* dictionary) {
   DecodeDict(dictionary);
 
   auto dict_values = reinterpret_cast<ByteArray*>(dictionary_->mutable_data());
 
   int total_size = 0;
-  for (int i = 0; i < num_dictionary_values; ++i) {
+  for (int i = 0; i < dictionary_length_; ++i) {
     total_size += dict_values[i].len;
   }
   if (total_size > 0) {
-    PARQUET_THROW_NOT_OK(byte_array_data_->Resize(total_size, false));
+    PARQUET_THROW_NOT_OK(byte_array_data_->Resize(total_size,
+                                                  /*shrink_to_fit=*/false));
+    PARQUET_THROW_NOT_OK(
+        byte_array_offsets_->Resize((dictionary_length_ + 1) * sizeof(int32_t),
+                                    /*shrink_to_fit=*/false));
   }
 
-  int offset = 0;
+  int32_t offset = 0;
   uint8_t* bytes_data = byte_array_data_->mutable_data();
-  for (int i = 0; i < num_dictionary_values; ++i) {
+  int32_t* bytes_offsets =
+      reinterpret_cast<int32_t*>(byte_array_offsets_->mutable_data());
+  for (int i = 0; i < dictionary_length_; ++i) {
     memcpy(bytes_data + offset, dict_values[i].ptr, dict_values[i].len);
+    bytes_offsets[i] = offset;
     dict_values[i].ptr = bytes_data + offset;
     offset += dict_values[i].len;
   }
+  bytes_offsets[dictionary_length_] = offset;
 }
 
 template <>
 inline void DictDecoderImpl<FLBAType>::SetDict(TypedDecoder<FLBAType>* dictionary) {
-  int num_dictionary_values = dictionary->values_left();
   DecodeDict(dictionary);
 
   auto dict_values = reinterpret_cast<FLBA*>(dictionary_->mutable_data());
 
   int fixed_len = descr_->type_length();
-  int total_size = num_dictionary_values * fixed_len;
+  int total_size = dictionary_length_ * fixed_len;
 
-  PARQUET_THROW_NOT_OK(byte_array_data_->Resize(total_size, false));
+  PARQUET_THROW_NOT_OK(byte_array_data_->Resize(total_size,
+                                                /*shrink_to_fit=*/false));
   uint8_t* bytes_data = byte_array_data_->mutable_data();
-  for (int32_t i = 0, offset = 0; i < num_dictionary_values; ++i, offset += fixed_len) {
+  for (int32_t i = 0, offset = 0; i < dictionary_length_; ++i, offset += fixed_len) {
     memcpy(bytes_data + offset, dict_values[i].ptr, fixed_len);
     dict_values[i].ptr = bytes_data + offset;
   }
 }
 
-class DictByteArrayDecoder : public DictDecoderImpl<ByteArrayType>,
-                             virtual public ByteArrayDecoder {
+template <typename Type>
+void DictDecoderImpl<Type>::InsertDictionary(::arrow::ArrayBuilder* builder) {
+  ParquetException::NYI("InsertDictionary only implemented for BYTE_ARRAY types");
+}
+
+template <>
+void DictDecoderImpl<ByteArrayType>::InsertDictionary(::arrow::ArrayBuilder* builder) {
+  auto binary_builder = checked_cast<::arrow::BinaryDictionaryBuilder*>(builder);
+
+  // Make an BinaryArray referencing the internal dictionary data
+  auto arr = std::make_shared<::arrow::BinaryArray>(
+      dictionary_length_, byte_array_offsets_, byte_array_data_);
+  PARQUET_THROW_NOT_OK(binary_builder->InsertMemoValues(*arr));
+}
+
+class DictByteArrayDecoderImpl : public DictDecoderImpl<ByteArrayType>,
+                                 virtual public ByteArrayDecoder {
  public:
   using BASE = DictDecoderImpl<ByteArrayType>;
   using BASE::DictDecoderImpl;
 
+  int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
+                  int64_t valid_bits_offset,
+                  ::arrow::BinaryDictionaryBuilder* builder) override {
+    int result = 0;
+    PARQUET_THROW_NOT_OK(DecodeArrow(num_values, null_count, valid_bits,
+                                     valid_bits_offset, builder, &result));
+    return result;
+  }
+
+  int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
+                  int64_t valid_bits_offset,
+                  ::arrow::internal::ChunkedBinaryBuilder* builder) override {
+    int result = 0;
+    PARQUET_THROW_NOT_OK(DecodeArrow(num_values, null_count, valid_bits,
+                                     valid_bits_offset, builder, &result));
+    return result;
+  }
+
+  int DecodeArrowNonNull(int num_values,
+                         ::arrow::BinaryDictionaryBuilder* builder) override {
+    int result = 0;
+    PARQUET_THROW_NOT_OK(DecodeArrowNonNull(num_values, builder, &result));
+    return result;
+  }
+
+  int DecodeArrowNonNull(int num_values,
+                         ::arrow::internal::ChunkedBinaryBuilder* builder) override {
+    int result = 0;
+    PARQUET_THROW_NOT_OK(DecodeArrowNonNull(num_values, builder, &result));
+    return result;
+  }
+
  private:
+  template <typename BuilderType>
   ::arrow::Status DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
-                              int64_t valid_bits_offset, WrappedBuilderInterface* builder,
-                              int* out_num_values) override {
+                              int64_t valid_bits_offset, BuilderType* builder,
+                              int* out_num_values) {
     constexpr int32_t buffer_size = 1024;
     int32_t indices_buffer[buffer_size];
-    builder->Reserve(num_values);
+    RETURN_NOT_OK(builder->Reserve(num_values));
     ::arrow::internal::BitmapReader bit_reader(valid_bits, valid_bits_offset, num_values);
 
     auto dict_values = reinterpret_cast<const ByteArray*>(dictionary_->data());
@@ -931,10 +1097,10 @@ class DictByteArrayDecoder : public DictDecoderImpl<ByteArrayType>,
           // Consume all indices
           if (is_valid) {
             const auto& val = dict_values[indices_buffer[i]];
-            builder->Append(val.ptr, val.len);
+            RETURN_NOT_OK(builder->Append(val.ptr, val.len));
             ++i;
           } else {
-            builder->AppendNull();
+            RETURN_NOT_OK(builder->AppendNull());
             --null_count;
           }
           ++values_decoded;
@@ -947,7 +1113,7 @@ class DictByteArrayDecoder : public DictDecoderImpl<ByteArrayType>,
           bit_reader.Next();
         }
       } else {
-        builder->AppendNull();
+        RETURN_NOT_OK(builder->AppendNull());
         --null_count;
         ++values_decoded;
       }
@@ -960,12 +1126,13 @@ class DictByteArrayDecoder : public DictDecoderImpl<ByteArrayType>,
     return ::arrow::Status::OK();
   }
 
-  ::arrow::Status DecodeArrowNonNull(int num_values, WrappedBuilderInterface* builder,
-                                     int* out_num_values) override {
+  template <typename BuilderType>
+  ::arrow::Status DecodeArrowNonNull(int num_values, BuilderType* builder,
+                                     int* out_num_values) {
     constexpr int32_t buffer_size = 2048;
     int32_t indices_buffer[buffer_size];
     int values_decoded = 0;
-    builder->Reserve(num_values);
+    RETURN_NOT_OK(builder->Reserve(num_values));
 
     auto dict_values = reinterpret_cast<const ByteArray*>(dictionary_->data());
 
@@ -975,7 +1142,7 @@ class DictByteArrayDecoder : public DictDecoderImpl<ByteArrayType>,
       if (num_indices == 0) break;
       for (int i = 0; i < num_indices; ++i) {
         const auto& val = dict_values[indices_buffer[i]];
-        builder->Append(val.ptr, val.len);
+        RETURN_NOT_OK(builder->Append(val.ptr, val.len));
       }
       values_decoded += num_indices;
     }
@@ -1234,7 +1401,7 @@ std::unique_ptr<Decoder> MakeDictDecoder(Type::type type_num,
     case Type::DOUBLE:
       return std::unique_ptr<Decoder>(new DictDecoderImpl<DoubleType>(descr, pool));
     case Type::BYTE_ARRAY:
-      return std::unique_ptr<Decoder>(new DictByteArrayDecoder(descr, pool));
+      return std::unique_ptr<Decoder>(new DictByteArrayDecoderImpl(descr, pool));
     case Type::FIXED_LEN_BYTE_ARRAY:
       return std::unique_ptr<Decoder>(new DictFLBADecoder(descr, pool));
     default:
@@ -1245,5 +1412,4 @@ std::unique_ptr<Decoder> MakeDictDecoder(Type::type type_num,
 }
 
 }  // namespace detail
-
 }  // namespace parquet
diff --git a/cpp/src/parquet/encoding.h b/cpp/src/parquet/encoding.h
index 28a9b98..4918a13 100644
--- a/cpp/src/parquet/encoding.h
+++ b/cpp/src/parquet/encoding.h
@@ -26,6 +26,18 @@
 #include "parquet/platform.h"
 #include "parquet/types.h"
 
+namespace arrow {
+
+class ArrayBuilder;
+class BinaryDictionaryBuilder;
+
+namespace internal {
+
+class ChunkedBinaryBuilder;
+
+}  // namespace internal
+}  // namespace arrow
+
 namespace parquet {
 
 class ColumnDescriptor;
@@ -156,6 +168,26 @@ template <typename DType>
 class DictDecoder : virtual public TypedDecoder<DType> {
  public:
   virtual void SetDict(TypedDecoder<DType>* dictionary) = 0;
+
+  /// \brief Insert dictionary values into the Arrow dictionary builder's memo,
+  /// but do not append any indices
+  virtual void InsertDictionary(::arrow::ArrayBuilder* builder) = 0;
+
+  /// \brief Decode only dictionary indices and append to dictionary
+  /// builder. The builder must have had the dictionary from this decoder
+  /// inserted already.
+  ///
+  /// Remember to reset the builder each time the dict decoder is initialized
+  /// with a new dictionary page
+  virtual int DecodeIndicesSpaced(int num_values, int null_count,
+                                  const uint8_t* valid_bits, int64_t valid_bits_offset,
+                                  ::arrow::ArrayBuilder* builder) = 0;
+
+  /// \brief Decode only dictionary indices (no nulls)
+  ///
+  /// Remember to reset the builder each time the dict decoder is initialized
+  /// with a new dictionary page
+  virtual int DecodeIndices(int num_values, ::arrow::ArrayBuilder* builder) = 0;
 };
 
 // ----------------------------------------------------------------------
@@ -191,60 +223,19 @@ class ByteArrayDecoder : virtual public TypedDecoder<ByteArrayType> {
  public:
   using TypedDecoder<ByteArrayType>::DecodeSpaced;
 
-  class WrappedBuilderInterface {
-   public:
-    virtual void Reserve(int64_t values) = 0;
-    virtual void Append(const uint8_t* value, uint32_t length) = 0;
-    virtual void AppendNull() = 0;
-    virtual ~WrappedBuilderInterface() = default;
-  };
-
-  template <typename Builder>
-  class WrappedBuilder : public WrappedBuilderInterface {
-   public:
-    explicit WrappedBuilder(Builder* builder) : builder_(builder) {}
-
-    void Reserve(int64_t values) override {
-      PARQUET_THROW_NOT_OK(builder_->Reserve(values));
-    }
-    void Append(const uint8_t* value, uint32_t length) override {
-      PARQUET_THROW_NOT_OK(builder_->Append(value, length));
-    }
-
-    void AppendNull() override { PARQUET_THROW_NOT_OK(builder_->AppendNull()); }
-
-   private:
-    Builder* builder_;
-  };
+  virtual int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
+                          int64_t valid_bits_offset,
+                          ::arrow::BinaryDictionaryBuilder* builder) = 0;
 
-  template <typename Builder>
-  int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
-                  int64_t valid_bits_offset, Builder* builder) {
-    int result = 0;
-    WrappedBuilder<Builder> wrapped_builder(builder);
-    PARQUET_THROW_NOT_OK(DecodeArrow(num_values, null_count, valid_bits,
-                                     valid_bits_offset, &wrapped_builder, &result));
-    return result;
-  }
-
-  template <typename Builder>
-  int DecodeArrowNonNull(int num_values, Builder* builder) {
-    int result = 0;
-    WrappedBuilder<Builder> wrapped_builder(builder);
-    PARQUET_THROW_NOT_OK(DecodeArrowNonNull(num_values, &wrapped_builder, &result));
-    return result;
-  }
+  virtual int DecodeArrowNonNull(int num_values,
+                                 ::arrow::BinaryDictionaryBuilder* builder) = 0;
 
- private:
-  virtual ::arrow::Status DecodeArrow(int num_values, int null_count,
-                                      const uint8_t* valid_bits,
-                                      int64_t valid_bits_offset,
-                                      WrappedBuilderInterface* builder,
-                                      int* values_decoded) = 0;
+  virtual int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
+                          int64_t valid_bits_offset,
+                          ::arrow::internal::ChunkedBinaryBuilder* builder) = 0;
 
-  virtual ::arrow::Status DecodeArrowNonNull(int num_values,
-                                             WrappedBuilderInterface* builder,
-                                             int* values_decoded) = 0;
+  virtual int DecodeArrowNonNull(int num_values,
+                                 ::arrow::internal::ChunkedBinaryBuilder* builder) = 0;
 };
 
 class FLBADecoder : virtual public TypedDecoder<FLBAType> {
diff --git a/cpp/src/parquet/types.h b/cpp/src/parquet/types.h
index dfa056e..f24ff85 100644
--- a/cpp/src/parquet/types.h
+++ b/cpp/src/parquet/types.h
@@ -452,7 +452,8 @@ struct Encoding {
     DELTA_BINARY_PACKED = 5,
     DELTA_LENGTH_BYTE_ARRAY = 6,
     DELTA_BYTE_ARRAY = 7,
-    RLE_DICTIONARY = 8
+    RLE_DICTIONARY = 8,
+    UNKNOWN = 999
   };
 };
 
@@ -632,19 +633,19 @@ struct type_traits<Type::FIXED_LEN_BYTE_ARRAY> {
 };
 
 template <Type::type TYPE>
-struct DataType {
+struct PhysicalType {
   using c_type = typename type_traits<TYPE>::value_type;
   static constexpr Type::type type_num = TYPE;
 };
 
-using BooleanType = DataType<Type::BOOLEAN>;
-using Int32Type = DataType<Type::INT32>;
-using Int64Type = DataType<Type::INT64>;
-using Int96Type = DataType<Type::INT96>;
-using FloatType = DataType<Type::FLOAT>;
-using DoubleType = DataType<Type::DOUBLE>;
-using ByteArrayType = DataType<Type::BYTE_ARRAY>;
-using FLBAType = DataType<Type::FIXED_LEN_BYTE_ARRAY>;
+using BooleanType = PhysicalType<Type::BOOLEAN>;
+using Int32Type = PhysicalType<Type::INT32>;
+using Int64Type = PhysicalType<Type::INT64>;
+using Int96Type = PhysicalType<Type::INT96>;
+using FloatType = PhysicalType<Type::FLOAT>;
+using DoubleType = PhysicalType<Type::DOUBLE>;
+using ByteArrayType = PhysicalType<Type::BYTE_ARRAY>;
+using FLBAType = PhysicalType<Type::FIXED_LEN_BYTE_ARRAY>;
 
 template <typename Type>
 inline std::string format_fwf(int width) {
diff --git a/python/pyarrow/_parquet.pxd b/python/pyarrow/_parquet.pxd
index 6a11fc8..270bb61 100644
--- a/python/pyarrow/_parquet.pxd
+++ b/python/pyarrow/_parquet.pxd
@@ -358,6 +358,11 @@ cdef extern from "parquet/api/writer.h" namespace "parquet" nogil:
 
 
 cdef extern from "parquet/arrow/reader.h" namespace "parquet::arrow" nogil:
+    cdef cppclass ArrowReaderProperties:
+        pass
+
+    ArrowReaderProperties default_arrow_reader_properties()
+
     CStatus OpenFile(const shared_ptr[RandomAccessFile]& file,
                      CMemoryPool* allocator,
                      const ReaderProperties& properties,
@@ -389,11 +394,13 @@ cdef extern from "parquet/arrow/reader.h" namespace "parquet::arrow" nogil:
 cdef extern from "parquet/arrow/schema.h" namespace "parquet::arrow" nogil:
     CStatus FromParquetSchema(
         const SchemaDescriptor* parquet_schema,
+        const ArrowReaderProperties& properties,
         const shared_ptr[const CKeyValueMetadata]& key_value_metadata,
         shared_ptr[CSchema]* out)
 
     CStatus ToParquetSchema(
         const CSchema* arrow_schema,
+        const ArrowReaderProperties& properties,
         const shared_ptr[const CKeyValueMetadata]& key_value_metadata,
         shared_ptr[SchemaDescriptor]* out)
 
diff --git a/python/pyarrow/_parquet.pyx b/python/pyarrow/_parquet.pyx
index eb74dea..1944ab6 100644
--- a/python/pyarrow/_parquet.pyx
+++ b/python/pyarrow/_parquet.pyx
@@ -743,7 +743,8 @@ cdef class ParquetSchema:
 
         with nogil:
             check_status(FromParquetSchema(
-                self.schema, self.parent._metadata.key_value_metadata(),
+                self.schema, default_arrow_reader_properties(),
+                self.parent._metadata.key_value_metadata(),
                 &sp_arrow_schema))
 
         return pyarrow_wrap_schema(sp_arrow_schema)