You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2019/07/30 14:41:29 UTC

[arrow] branch master updated: ARROW-6065: [C++][Parquet] Clean up parquet/arrow/reader.cc, reduce code duplication, improve readability

This is an automated email from the ASF dual-hosted git repository.

wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new dbd93e3  ARROW-6065: [C++][Parquet] Clean up parquet/arrow/reader.cc, reduce code duplication, improve readability
dbd93e3 is described below

commit dbd93e3b8f8690681bd09a6fb596afd141318dbd
Author: Wes McKinney <we...@apache.org>
AuthorDate: Tue Jul 30 09:41:16 2019 -0500

    ARROW-6065: [C++][Parquet] Clean up parquet/arrow/reader.cc, reduce code duplication, improve readability
    
    This is strictly a refactoring PR. I'm going to start working (for a new PR) on some refactoring of the handling of schemas and nested types (which is also pretty messy in my opinion). The motivation for this is to be able to more cleanly reason about direct dictionary-decoding without having to resort to such hacks as the current `FixSchemas` function
    
    Also cleans up Parquet includes using IWYU
    
    Closes #4963 from wesm/parquet-arrow-read-refactor and squashes the following commits:
    
    cf9f5e303 <Wes McKinney> Add missing API include, fix Python bindings
    9d9a30b04 <Wes McKinney> More cleaning
    174f046b4 <Wes McKinney> Refactor complete, tests passing again. IWYU pass
    02153bf56 <Wes McKinney> More refactoring progress
    
    Authored-by: Wes McKinney <we...@apache.org>
    Signed-off-by: Wes McKinney <we...@apache.org>
---
 cpp/build-support/iwyu/mappings/arrow-misc.imp     |    3 +
 cpp/src/parquet/CMakeLists.txt                     |    1 +
 cpp/src/parquet/api/reader.h                       |    1 +
 cpp/src/parquet/api/writer.h                       |    1 +
 cpp/src/parquet/arrow/arrow-reader-writer-test.cc  |    6 +-
 cpp/src/parquet/arrow/reader.cc                    | 2003 ++++++--------------
 cpp/src/parquet/arrow/reader.h                     |  163 +-
 cpp/src/parquet/arrow/reader_internal.cc           |  616 ++++++
 .../{api/reader.h => arrow/reader_internal.h}      |   53 +-
 cpp/src/parquet/arrow/schema.cc                    |    4 -
 cpp/src/parquet/arrow/schema.h                     |    1 -
 cpp/src/parquet/arrow/writer.cc                    |    5 +-
 cpp/src/parquet/arrow/writer.h                     |    9 +-
 cpp/src/parquet/bloom_filter.cc                    |    2 -
 cpp/src/parquet/bloom_filter.h                     |    6 -
 cpp/src/parquet/column_reader.cc                   |   15 +-
 cpp/src/parquet/column_reader.h                    |   10 +-
 cpp/src/parquet/column_writer.cc                   |    8 +-
 cpp/src/parquet/column_writer.h                    |    9 +-
 cpp/src/parquet/deprecated_io.cc                   |    6 -
 cpp/src/parquet/deprecated_io.h                    |   11 -
 cpp/src/parquet/encoding.cc                        |    3 +-
 cpp/src/parquet/file_reader.cc                     |    3 +-
 cpp/src/parquet/file_reader.h                      |    2 +-
 cpp/src/parquet/file_writer.cc                     |    3 +
 cpp/src/parquet/file_writer.h                      |   13 -
 cpp/src/parquet/metadata.cc                        |    1 -
 cpp/src/parquet/metadata.h                         |    1 -
 cpp/src/parquet/platform.h                         |    1 +
 cpp/src/parquet/properties.cc                      |    1 -
 cpp/src/parquet/schema.cc                          |    4 +
 cpp/src/parquet/schema.h                           |    4 -
 cpp/src/parquet/statistics.cc                      |    1 +
 cpp/src/parquet/statistics.h                       |    4 +-
 cpp/src/parquet/types.cc                           |    3 -
 cpp/src/parquet/types.h                            |    1 -
 36 files changed, 1327 insertions(+), 1651 deletions(-)

diff --git a/cpp/build-support/iwyu/mappings/arrow-misc.imp b/cpp/build-support/iwyu/mappings/arrow-misc.imp
index 7ff9910..a251006 100644
--- a/cpp/build-support/iwyu/mappings/arrow-misc.imp
+++ b/cpp/build-support/iwyu/mappings/arrow-misc.imp
@@ -27,6 +27,7 @@
   { include: ["<ext/alloc_traits.h>", private, "<unordered_map>", public ] },
   { include: ["<ext/alloc_traits.h>", private, "<unordered_set>", public ] },
   { include: ["<ext/alloc_traits.h>", private, "<vector>", public ] },
+  { include: ["<bits/exception.h>", private, "<exception>", public ] },
   { include: ["<bits/stdint-intn.h>", private, "<cstdint>", public ] },
   { include: ["<bits/stdint-uintn.h>", private, "<cstdint>", public ] },
   { include: ["<bits/shared_ptr.h>", private, "<memory>", public ] },
@@ -49,7 +50,9 @@
   { symbol: ["shared_ptr", private, "<memory>", public ] },
   { symbol: ["_Node_const_iterator", private, "<flatbuffers/flatbuffers.h>", public ] },
   { symbol: ["unordered_map<>::mapped_type", private, "<flatbuffers/flatbuffers.h>", public ] },
+  { symbol: ["std::copy", private, "<algorithm>", public ] },
   { symbol: ["std::move", private, "<utility>", public ] },
+  { symbol: ["std::transform", private, "<algorithm>", public ] },
   { symbol: ["pair", private, "<utility>", public ] },
   { symbol: ["errno", private, "<cerrno>", public ] },
   { symbol: ["posix_memalign", private, "<cstdlib>", public ] }
diff --git a/cpp/src/parquet/CMakeLists.txt b/cpp/src/parquet/CMakeLists.txt
index 8a07798..db00ab4 100644
--- a/cpp/src/parquet/CMakeLists.txt
+++ b/cpp/src/parquet/CMakeLists.txt
@@ -157,6 +157,7 @@ add_custom_command(OUTPUT ${THRIFT_OUTPUT_FILES}
 
 set(PARQUET_SRCS
     arrow/reader.cc
+    arrow/reader_internal.cc
     arrow/schema.cc
     arrow/writer.cc
     bloom_filter.cc
diff --git a/cpp/src/parquet/api/reader.h b/cpp/src/parquet/api/reader.h
index b29ca72..49f9ac6 100644
--- a/cpp/src/parquet/api/reader.h
+++ b/cpp/src/parquet/api/reader.h
@@ -27,6 +27,7 @@
 #include "parquet/platform.h"
 #include "parquet/printer.h"
 #include "parquet/properties.h"
+#include "parquet/statistics.h"
 
 // Schemas
 #include "parquet/api/schema.h"
diff --git a/cpp/src/parquet/api/writer.h b/cpp/src/parquet/api/writer.h
index 3b4e42f..54ac08f 100644
--- a/cpp/src/parquet/api/writer.h
+++ b/cpp/src/parquet/api/writer.h
@@ -23,5 +23,6 @@
 #include "parquet/column_writer.h"
 #include "parquet/exception.h"
 #include "parquet/file_writer.h"
+#include "parquet/statistics.h"
 
 #endif  // PARQUET_API_WRITER_H
diff --git a/cpp/src/parquet/arrow/arrow-reader-writer-test.cc b/cpp/src/parquet/arrow/arrow-reader-writer-test.cc
index f733ea5..d0d2536 100644
--- a/cpp/src/parquet/arrow/arrow-reader-writer-test.cc
+++ b/cpp/src/parquet/arrow/arrow-reader-writer-test.cc
@@ -1845,8 +1845,8 @@ TEST(TestArrowReadWrite, MultithreadedRead) {
 }
 
 TEST(TestArrowReadWrite, ReadSingleRowGroup) {
-  const int num_columns = 20;
-  const int num_rows = 1000;
+  const int num_columns = 10;
+  const int num_rows = 100;
 
   std::shared_ptr<Table> table;
   ASSERT_NO_FATAL_FAILURE(MakeDoubleTable(num_columns, num_rows, 1, &table));
@@ -1872,7 +1872,7 @@ TEST(TestArrowReadWrite, ReadSingleRowGroup) {
   std::shared_ptr<Table> concatenated;
 
   ASSERT_OK(ConcatenateTables({r1, r2}, &concatenated));
-  ASSERT_TRUE(table->Equals(*concatenated));
+  AssertTablesEqual(*concatenated, *table, /*same_chunk_layout=*/false);
 
   ASSERT_TRUE(table->Equals(*r3));
   ASSERT_TRUE(r2->Equals(*r4));
diff --git a/cpp/src/parquet/arrow/reader.cc b/cpp/src/parquet/arrow/reader.cc
index fc953da..e710e3b 100644
--- a/cpp/src/parquet/arrow/reader.cc
+++ b/cpp/src/parquet/arrow/reader.cc
@@ -18,31 +18,23 @@
 #include "parquet/arrow/reader.h"
 
 #include <algorithm>
-#include <climits>
 #include <cstring>
 #include <deque>
+#include <functional>
 #include <future>
 #include <numeric>
-#include <type_traits>
 #include <utility>
 #include <vector>
 
 #include "arrow/array.h"
-#include "arrow/buffer.h"
 #include "arrow/builder.h"
 #include "arrow/record_batch.h"
-#include "arrow/status.h"
 #include "arrow/table.h"
 #include "arrow/type.h"
-#include "arrow/type_traits.h"
-#include "arrow/util/int-util.h"
 #include "arrow/util/logging.h"
 #include "arrow/util/thread-pool.h"
-#include "arrow/util/ubsan.h"
-
-// For arrow::compute::Datum. This should perhaps be promoted. See ARROW-4022
-#include "arrow/compute/kernel.h"
 
+#include "parquet/arrow/reader_internal.h"
 #include "parquet/arrow/schema.h"
 #include "parquet/column_reader.h"
 #include "parquet/exception.h"
@@ -51,7 +43,6 @@
 #include "parquet/properties.h"
 #include "parquet/schema-internal.h"
 #include "parquet/schema.h"
-#include "parquet/types.h"
 
 using arrow::Array;
 using arrow::BooleanArray;
@@ -67,13 +58,6 @@ using arrow::StructArray;
 using arrow::Table;
 using arrow::TimestampArray;
 
-using ::arrow::BitUtil::FromBigEndian;
-using ::arrow::internal::SafeLeftShift;
-using ::arrow::util::SafeLoadAs;
-
-// For Array/ChunkedArray variant
-using arrow::compute::Datum;
-
 using parquet::schema::Node;
 
 // Help reduce verbosity
@@ -82,24 +66,18 @@ using arrow::RecordBatchReader;
 
 using parquet::internal::RecordReader;
 
+#define BEGIN_PARQUET_CATCH_EXCEPTIONS try {
+#define END_PARQUET_CATCH_EXCEPTIONS             \
+  }                                              \
+  catch (const ::parquet::ParquetException& e) { \
+    return ::arrow::Status::IOError(e.what());   \
+  }
+
 namespace parquet {
 namespace arrow {
 
-template <typename ArrowType>
-using ArrayType = typename ::arrow::TypeTraits<ArrowType>::ArrayType;
-
-namespace {
-
-Status GetSingleChunk(const ChunkedArray& chunked, std::shared_ptr<Array>* out) {
-  DCHECK_GT(chunked.num_chunks(), 0);
-  if (chunked.num_chunks() > 1) {
-    return Status::Invalid("Function call returned a chunked array");
-  }
-  *out = chunked.chunk(0);
-  return Status::OK();
-}
-
-}  // namespace
+class ColumnChunkReaderImpl;
+class ColumnReaderImpl;
 
 ArrowReaderProperties default_arrow_reader_properties() {
   static ArrowReaderProperties default_reader_props;
@@ -132,19 +110,6 @@ class FileColumnIterator {
     return row_group_reader->GetColumnPageReader(column_index_);
   }
 
-  static FileColumnIterator* MakeAllRowGroupsIterator(int column_index,
-                                                      ParquetFileReader* reader) {
-    std::vector<int> row_groups(reader->metadata()->num_row_groups());
-    std::iota(row_groups.begin(), row_groups.end(), 0);
-    return new FileColumnIterator(column_index, reader, row_groups);
-  }
-
-  static FileColumnIterator* MakeSingleRowGroupIterator(int column_index,
-                                                        ParquetFileReader* reader,
-                                                        int row_group) {
-    return new FileColumnIterator(column_index, reader, {row_group});
-  }
-
   const SchemaDescriptor* schema() const { return schema_; }
 
   const ColumnDescriptor* descr() const { return schema_->Column(column_index_); }
@@ -160,6 +125,9 @@ class FileColumnIterator {
   std::deque<int> row_groups_;
 };
 
+using FileColumnIteratorFactory =
+    std::function<FileColumnIterator*(int, ParquetFileReader*)>;
+
 class RowGroupRecordBatchReader : public ::arrow::RecordBatchReader {
  public:
   explicit RowGroupRecordBatchReader(const std::vector<int>& row_group_indices,
@@ -216,118 +184,316 @@ class RowGroupRecordBatchReader : public ::arrow::RecordBatchReader {
 };
 
 // ----------------------------------------------------------------------
-// File reader implementation
+// FileReaderImpl forward declaration
 
-using FileColumnIteratorFactory =
-    std::function<FileColumnIterator*(int, ParquetFileReader*)>;
-
-class FileReader::Impl {
+class FileReaderImpl : public FileReader {
  public:
-  Impl(MemoryPool* pool, std::unique_ptr<ParquetFileReader> reader,
-       const ArrowReaderProperties& properties)
+  FileReaderImpl(MemoryPool* pool, std::unique_ptr<ParquetFileReader> reader,
+                 const ArrowReaderProperties& properties)
       : pool_(pool), reader_(std::move(reader)), reader_properties_(properties) {}
 
-  virtual ~Impl() {}
-
   Status Init() {
     // TODO(wesm): Smarter schema/column-reader initialization for nested data
     return Status::OK();
   }
 
+  FileColumnIteratorFactory SomeRowGroupsFactory(std::vector<int> row_groups) {
+    return [row_groups](int i, ParquetFileReader* reader) {
+      return new FileColumnIterator(i, reader, row_groups);
+    };
+  }
+
+  std::vector<int> AllRowGroups() {
+    std::vector<int> row_groups(reader_->metadata()->num_row_groups());
+    std::iota(row_groups.begin(), row_groups.end(), 0);
+    return row_groups;
+  }
+
+  std::vector<int> AllColumnIndices() {
+    std::vector<int> indices(reader_->metadata()->num_columns());
+    std::iota(indices.begin(), indices.end(), 0);
+    return indices;
+  }
+
+  FileColumnIteratorFactory AllRowGroupsFactory() {
+    return SomeRowGroupsFactory(AllRowGroups());
+  }
+
+  int64_t GetTotalRecords(const std::vector<int>& row_groups, int column_chunk = 0) {
+    // Can throw exception
+    int64_t records = 0;
+    for (int j = 0; j < static_cast<int>(row_groups.size()); j++) {
+      records += reader_->metadata()
+                     ->RowGroup(row_groups[j])
+                     ->ColumnChunk(column_chunk)
+                     ->num_values();
+    }
+    return records;
+  }
+
+  std::shared_ptr<RowGroupReader> RowGroup(int row_group_index) override;
+
+  Status GetReaderForNode(int index, const Node* node, const std::vector<int>& indices,
+                          int16_t def_level, FileColumnIteratorFactory iterator_factory,
+                          std::unique_ptr<ColumnReaderImpl>* out);
+
+  Status ReadTable(const std::vector<int>& indices,
+                   std::shared_ptr<Table>* out) override {
+    return ReadRowGroups(AllRowGroups(), indices, out);
+  }
+
   Status GetColumn(int i, FileColumnIteratorFactory iterator_factory,
                    std::unique_ptr<ColumnReader>* out);
 
-  Status ReadSchemaField(int i, std::shared_ptr<ChunkedArray>* out);
+  Status GetColumn(int i, std::unique_ptr<ColumnReader>* out) override {
+    return GetColumn(i, AllRowGroupsFactory(), out);
+  }
+
+  Status GetSchema(std::shared_ptr<::arrow::Schema>* out) override {
+    return FromParquetSchema(reader_->metadata()->schema(), reader_properties_,
+                             reader_->metadata()->key_value_metadata(), out);
+  }
+
+  Status GetSchema(const std::vector<int>& indices,
+                   std::shared_ptr<::arrow::Schema>* out) override {
+    return FromParquetSchema(reader_->metadata()->schema(), indices, reader_properties_,
+                             reader_->metadata()->key_value_metadata(), out);
+  }
+
   Status ReadSchemaField(int i, const std::vector<int>& indices,
+                         const std::vector<int>& row_groups,
                          std::shared_ptr<ChunkedArray>* out);
-  Status ReadColumn(int i, std::shared_ptr<ChunkedArray>* out);
-  Status ReadColumnChunk(int column_index, int row_group_index,
-                         std::shared_ptr<ChunkedArray>* out);
-  Status ReadColumnChunk(int column_index, const std::vector<int>& indices,
-                         int row_group_index, std::shared_ptr<ChunkedArray>* out);
 
-  Status GetReaderForNode(int index, const Node* node, const std::vector<int>& indices,
-                          int16_t def_level, FileColumnIteratorFactory iterator_factory,
-                          std::unique_ptr<ColumnReader::ColumnReaderImpl>* out);
+  Status ReadSchemaField(int i, const std::vector<int>& indices,
+                         std::shared_ptr<ChunkedArray>* out) {
+    return ReadSchemaField(i, indices, AllRowGroups(), out);
+  }
 
-  Status GetSchema(std::shared_ptr<::arrow::Schema>* out);
-  Status GetSchema(const std::vector<int>& indices,
-                   std::shared_ptr<::arrow::Schema>* out);
-
-  Status ReadRowGroup(int row_group_index, std::shared_ptr<Table>* table);
-  Status ReadRowGroup(int row_group_index, const std::vector<int>& indices,
-                      std::shared_ptr<::arrow::Table>* out);
-  Status ReadTable(const std::vector<int>& indices, std::shared_ptr<Table>* table);
-  Status ReadTable(std::shared_ptr<Table>* table);
-  Status ReadRowGroups(const std::vector<int>& row_groups, std::shared_ptr<Table>* table);
-  Status ReadRowGroups(const std::vector<int>& row_groups,
-                       const std::vector<int>& indices,
-                       std::shared_ptr<::arrow::Table>* out);
+  Status ReadSchemaField(int i, std::shared_ptr<ChunkedArray>* out) override {
+    return ReadSchemaField(i, AllColumnIndices(), AllRowGroups(), out);
+  }
 
-  bool CheckForFlatColumn(const ColumnDescriptor* descr);
-  bool CheckForFlatListColumn(const ColumnDescriptor* descr);
+  Status ReadColumn(int i, const std::vector<int>& row_groups,
+                    std::shared_ptr<ChunkedArray>* out) {
+    std::unique_ptr<ColumnReader> flat_column_reader;
+    RETURN_NOT_OK(GetColumn(i, SomeRowGroupsFactory(row_groups), &flat_column_reader));
+    BEGIN_PARQUET_CATCH_EXCEPTIONS
+    int64_t records_to_read = GetTotalRecords(row_groups, i);
+    return flat_column_reader->NextBatch(records_to_read, out);
+    END_PARQUET_CATCH_EXCEPTIONS
+  }
 
-  const ParquetFileReader* parquet_reader() const { return reader_.get(); }
+  Status ReadColumn(int i, std::shared_ptr<ChunkedArray>* out) override {
+    return ReadColumn(i, AllRowGroups(), out);
+  }
 
-  int num_row_groups() const { return reader_->metadata()->num_row_groups(); }
+  Status ReadTable(std::shared_ptr<Table>* table) override {
+    std::vector<int> indices(reader_->metadata()->num_columns());
+    for (size_t i = 0; i < indices.size(); ++i) {
+      indices[i] = static_cast<int>(i);
+    }
+    return ReadTable(indices, table);
+  }
 
-  int num_columns() const { return reader_->metadata()->num_columns(); }
+  Status ReadRowGroups(const std::vector<int>& row_groups,
+                       const std::vector<int>& indices,
+                       std::shared_ptr<Table>* table) override;
 
-  void set_use_threads(bool use_threads) {
-    reader_properties_.set_use_threads(use_threads);
+  Status ReadRowGroups(const std::vector<int>& row_groups,
+                       std::shared_ptr<Table>* table) override {
+    return ReadRowGroups(row_groups, AllColumnIndices(), table);
+  }
+
+  Status ReadRowGroup(int row_group_index, const std::vector<int>& column_indices,
+                      std::shared_ptr<Table>* out) override {
+    return ReadRowGroups({row_group_index}, column_indices, out);
+  }
+
+  Status ReadRowGroup(int i, std::shared_ptr<Table>* table) override {
+    return ReadRowGroup(i, AllColumnIndices(), table);
   }
 
-  ParquetFileReader* reader() { return reader_.get(); }
+  std::vector<int> GetDictionaryIndices(const std::vector<int>& indices) {
+    // Select the column indices that were read as DictionaryArray
+    std::vector<int> dict_indices(indices);
+    auto remove_func = [this](int i) { return !reader_properties_.read_dictionary(i); };
+    auto it = std::remove_if(dict_indices.begin(), dict_indices.end(), remove_func);
+    dict_indices.erase(it, dict_indices.end());
+    return dict_indices;
+  }
 
-  std::vector<int> GetDictionaryIndices(const std::vector<int>& indices);
   std::shared_ptr<::arrow::Schema> FixSchema(
       const ::arrow::Schema& old_schema, const std::vector<int>& dict_indices,
-      const std::vector<std::shared_ptr<::arrow::ChunkedArray>>& columns);
+      const std::vector<std::shared_ptr<::arrow::ChunkedArray>>& columns) {
+    // Fix the schema with the actual DictionaryType that was read
+    auto fields = old_schema.fields();
+
+    for (int idx : dict_indices) {
+      fields[idx] = old_schema.field(idx)->WithType(columns[idx]->type());
+    }
+    return std::make_shared<::arrow::Schema>(fields, old_schema.metadata());
+  }
+
+  Status GetRecordBatchReader(const std::vector<int>& row_group_indices,
+                              std::shared_ptr<RecordBatchReader>* out) override {
+    return GetRecordBatchReader(row_group_indices, AllColumnIndices(), out);
+  }
+
+  Status BoundsCheckRowGroup(int row_group) {
+    // row group indices check
+    if (row_group < 0 || row_group >= num_row_groups()) {
+      return Status::Invalid("Some index in row_group_indices is ", row_group,
+                             ", which is either < 0 or >= num_row_groups(",
+                             num_row_groups(), ")");
+    }
+    return Status::OK();
+  }
+
+  Status GetRecordBatchReader(const std::vector<int>& row_group_indices,
+                              const std::vector<int>& column_indices,
+                              std::shared_ptr<RecordBatchReader>* out) override {
+    // column indices check
+    std::shared_ptr<::arrow::Schema> schema;
+    RETURN_NOT_OK(GetSchema(column_indices, &schema));
+    for (auto row_group_index : row_group_indices) {
+      RETURN_NOT_OK(BoundsCheckRowGroup(row_group_index));
+    }
+    *out = std::make_shared<RowGroupRecordBatchReader>(row_group_indices, column_indices,
+                                                       schema, this, batch_size());
+    return Status::OK();
+  }
+
+  int num_columns() const { return reader_->metadata()->num_columns(); }
+
+  ParquetFileReader* parquet_reader() const override { return reader_.get(); }
+
+  int num_row_groups() const override { return reader_->metadata()->num_row_groups(); }
+
+  void set_use_threads(bool use_threads) override {
+    reader_properties_.set_use_threads(use_threads);
+  }
 
   int64_t batch_size() const { return reader_properties_.batch_size(); }
 
+  Status ScanContents(std::vector<int> columns, const int32_t column_batch_size,
+                      int64_t* num_rows) override {
+    BEGIN_PARQUET_CATCH_EXCEPTIONS
+    *num_rows = ScanFileContents(columns, column_batch_size, reader_.get());
+    return Status::OK();
+    END_PARQUET_CATCH_EXCEPTIONS
+  }
+
  private:
   MemoryPool* pool_;
   std::unique_ptr<ParquetFileReader> reader_;
   ArrowReaderProperties reader_properties_;
 };
 
-class ColumnReader::ColumnReaderImpl {
+class ColumnChunkReaderImpl : public ColumnChunkReader {
+ public:
+  ColumnChunkReaderImpl(FileReaderImpl* impl, int row_group_index, int column_index)
+      : impl_(impl), column_index_(column_index), row_group_index_(row_group_index) {}
+
+  Status Read(std::shared_ptr<::arrow::ChunkedArray>* out) override {
+    return impl_->ReadColumn(column_index_, {row_group_index_}, out);
+  }
+
+ private:
+  FileReaderImpl* impl_;
+  int column_index_;
+  int row_group_index_;
+};
+
+class RowGroupReaderImpl : public RowGroupReader {
+ public:
+  RowGroupReaderImpl(FileReaderImpl* impl, int row_group_index)
+      : impl_(impl), row_group_index_(row_group_index) {}
+
+  std::shared_ptr<ColumnChunkReader> Column(int column_index) override {
+    return std::shared_ptr<ColumnChunkReader>(
+        new ColumnChunkReaderImpl(impl_, row_group_index_, column_index));
+  }
+
+  Status ReadTable(const std::vector<int>& column_indices,
+                   std::shared_ptr<::arrow::Table>* out) override {
+    return impl_->ReadRowGroup(row_group_index_, column_indices, out);
+  }
+
+  Status ReadTable(std::shared_ptr<::arrow::Table>* out) override {
+    return impl_->ReadRowGroup(row_group_index_, out);
+  }
+
+ private:
+  FileReaderImpl* impl_;
+  int row_group_index_;
+};
+
+class ColumnReaderImpl : public ColumnReader {
  public:
-  virtual ~ColumnReaderImpl() {}
-  virtual Status NextBatch(int64_t records_to_read,
-                           std::shared_ptr<ChunkedArray>* out) = 0;
   virtual Status GetDefLevels(const int16_t** data, size_t* length) = 0;
   virtual Status GetRepLevels(const int16_t** data, size_t* length) = 0;
   virtual const std::shared_ptr<Field> field() = 0;
 };
 
 // Reader implementation for primitive arrays
-class PARQUET_NO_EXPORT PrimitiveImpl : public ColumnReader::ColumnReaderImpl {
+class PARQUET_NO_EXPORT PrimitiveImpl : public ColumnReaderImpl {
  public:
   PrimitiveImpl(MemoryPool* pool, std::unique_ptr<FileColumnIterator> input,
                 bool read_dictionary)
-      : pool_(pool),
-        input_(std::move(input)),
-        descr_(input_->descr()),
-        read_dictionary_(read_dictionary) {
+      : pool_(pool), input_(std::move(input)), descr_(input_->descr()) {
     record_reader_ = RecordReader::Make(descr_, pool_, read_dictionary);
     Status s = NodeToField(*input_->descr()->schema_node(), &field_);
     DCHECK_OK(s);
     NextRowGroup();
   }
 
-  Status NextBatch(int64_t records_to_read, std::shared_ptr<ChunkedArray>* out) override;
+  Status GetDefLevels(const int16_t** data, size_t* length) override {
+    *data = record_reader_->def_levels();
+    *length = record_reader_->levels_written();
+    return Status::OK();
+  }
 
-  Status WrapIntoListArray(Datum* inout_array);
+  Status GetRepLevels(const int16_t** data, size_t* length) override {
+    *data = record_reader_->rep_levels();
+    *length = record_reader_->levels_written();
+    return Status::OK();
+  }
 
-  Status GetDefLevels(const int16_t** data, size_t* length) override;
-  Status GetRepLevels(const int16_t** data, size_t* length) override;
+  Status NextBatch(int64_t records_to_read, std::shared_ptr<ChunkedArray>* out) override {
+    BEGIN_PARQUET_CATCH_EXCEPTIONS
+
+    // Pre-allocation gives much better performance for flat columns
+    record_reader_->Reserve(records_to_read);
+
+    record_reader_->Reset();
+    while (records_to_read > 0) {
+      if (!record_reader_->HasMoreData()) {
+        break;
+      }
+      int64_t records_read = record_reader_->ReadRecords(records_to_read);
+      records_to_read -= records_read;
+      if (records_read == 0) {
+        NextRowGroup();
+      }
+    }
+    RETURN_NOT_OK(
+        TransferColumnData(record_reader_.get(), field_->type(), descr_, pool_, out));
+
+    // Nest nested types, if not nested returns unmodifed
+    RETURN_NOT_OK(WrapIntoListArray(out));
+    return Status::OK();
+    END_PARQUET_CATCH_EXCEPTIONS
+  }
+
+  Status WrapIntoListArray(std::shared_ptr<ChunkedArray>* inout_array);
 
   const std::shared_ptr<Field> field() override { return field_; }
 
  private:
-  void NextRowGroup();
+  void NextRowGroup() {
+    std::unique_ptr<PageReader> page_reader = input_->NextChunk();
+    record_reader_->SetPageReader(std::move(page_reader));
+  }
 
   MemoryPool* pool_;
   std::unique_ptr<FileColumnIterator> input_;
@@ -335,1343 +501,171 @@ class PARQUET_NO_EXPORT PrimitiveImpl : public ColumnReader::ColumnReaderImpl {
 
   std::shared_ptr<RecordReader> record_reader_;
   std::shared_ptr<Field> field_;
-
-  // Are we reading directly to dictionary-encoded?
-  bool read_dictionary_;
 };
 
-// Reader implementation for struct array
-class PARQUET_NO_EXPORT StructImpl : public ColumnReader::ColumnReaderImpl {
- public:
-  explicit StructImpl(const std::vector<std::shared_ptr<ColumnReaderImpl>>& children,
-                      int16_t struct_def_level, MemoryPool* pool, const Node* node)
-      : children_(children), struct_def_level_(struct_def_level), pool_(pool) {
-    InitField(node, children);
+Status PrimitiveImpl::WrapIntoListArray(std::shared_ptr<ChunkedArray>* inout_array) {
+  if (descr_->max_repetition_level() == 0) {
+    // Flat, no action
+    return Status::OK();
   }
 
-  Status NextBatch(int64_t records_to_read, std::shared_ptr<ChunkedArray>* out) override;
-  Status GetDefLevels(const int16_t** data, size_t* length) override;
-  Status GetRepLevels(const int16_t** data, size_t* length) override;
-  const std::shared_ptr<Field> field() override { return field_; }
-
- private:
-  std::vector<std::shared_ptr<ColumnReaderImpl>> children_;
-  int16_t struct_def_level_;
-  MemoryPool* pool_;
-  std::shared_ptr<Field> field_;
-  std::shared_ptr<ResizableBuffer> def_levels_buffer_;
-
-  Status DefLevelsToNullArray(std::shared_ptr<Buffer>* null_bitmap, int64_t* null_count);
-  void InitField(const Node* node,
-                 const std::vector<std::shared_ptr<ColumnReaderImpl>>& children);
-};
-
-FileReader::FileReader(MemoryPool* pool, std::unique_ptr<ParquetFileReader> reader,
-                       const ArrowReaderProperties& properties)
-    : impl_(new FileReader::Impl(pool, std::move(reader), properties)) {}
-
-Status FileReader::Make(::arrow::MemoryPool* pool,
-                        std::unique_ptr<ParquetFileReader> reader,
-                        const ArrowReaderProperties& properties,
-                        std::unique_ptr<FileReader>* out) {
-  out->reset(new FileReader(pool, std::move(reader), properties));
-  return (*out)->impl_->Init();
-}
-
-Status FileReader::Make(::arrow::MemoryPool* pool,
-                        std::unique_ptr<ParquetFileReader> reader,
-                        std::unique_ptr<FileReader>* out) {
-  return Make(pool, std::move(reader), default_arrow_reader_properties(), out);
-}
-
-FileReader::~FileReader() {}
+  std::shared_ptr<Array> flat_array;
 
-Status FileReader::Impl::GetColumn(int i, FileColumnIteratorFactory iterator_factory,
-                                   std::unique_ptr<ColumnReader>* out) {
-  if (i < 0 || i >= this->num_columns()) {
-    return Status::Invalid("Column index out of bounds (got ", i,
-                           ", should be "
-                           "between 0 and ",
-                           this->num_columns() - 1, ")");
+  // ARROW-3762(wesm): If inout_array is a chunked array, we reject as this is
+  // not yet implemented
+  if ((*inout_array)->num_chunks() > 1) {
+    return Status::NotImplemented(
+        "Nested data conversions not implemented for "
+        "chunked array outputs");
   }
+  flat_array = (*inout_array)->chunk(0);
 
-  std::unique_ptr<FileColumnIterator> input(iterator_factory(i, reader_.get()));
+  const int16_t* def_levels = record_reader_->def_levels();
+  const int16_t* rep_levels = record_reader_->rep_levels();
+  const int64_t total_levels_read = record_reader_->levels_position();
 
-  std::unique_ptr<ColumnReader::ColumnReaderImpl> impl(
-      new PrimitiveImpl(pool_, std::move(input), reader_properties_.read_dictionary(i)));
-  *out = std::unique_ptr<ColumnReader>(new ColumnReader(std::move(impl)));
-  return Status::OK();
-}
+  std::shared_ptr<::arrow::Schema> arrow_schema;
+  RETURN_NOT_OK(FromParquetSchema(
+      input_->schema(), {input_->column_index()}, default_arrow_reader_properties(),
+      input_->metadata()->key_value_metadata(), &arrow_schema));
+  std::shared_ptr<Field> current_field = arrow_schema->field(0);
 
-Status FileReader::Impl::GetReaderForNode(
-    int index, const Node* node, const std::vector<int>& indices, int16_t def_level,
-    FileColumnIteratorFactory iterator_factory,
-    std::unique_ptr<ColumnReader::ColumnReaderImpl>* out) {
-  *out = nullptr;
+  if (current_field->type()->num_children() > 0 &&
+      flat_array->type_id() == ::arrow::Type::DICTIONARY) {
+    // XXX(wesm): Handling of nested types and dictionary encoding needs to be
+    // significantly refactored
+    return Status::Invalid("Cannot have nested types containing dictionary arrays yet");
+  }
 
-  if (schema::IsSimpleStruct(node)) {
-    const schema::GroupNode* group = static_cast<const schema::GroupNode*>(node);
-    std::vector<std::shared_ptr<ColumnReader::ColumnReaderImpl>> children;
-    for (int i = 0; i < group->field_count(); i++) {
-      std::unique_ptr<ColumnReader::ColumnReaderImpl> child_reader;
-      // TODO(itaiin): Remove the -1 index hack when all types of nested reads
-      // are supported. This currently just signals the lower level reader resolution
-      // to abort
-      RETURN_NOT_OK(GetReaderForNode(index, group->field(i).get(), indices,
-                                     static_cast<int16_t>(def_level + 1),
-                                     iterator_factory, &child_reader));
-      if (child_reader != nullptr) {
-        children.push_back(std::move(child_reader));
+  // Walk downwards to extract nullability
+  std::vector<bool> nullable;
+  std::vector<std::shared_ptr<::arrow::Int32Builder>> offset_builders;
+  std::vector<std::shared_ptr<::arrow::BooleanBuilder>> valid_bits_builders;
+  nullable.push_back(current_field->nullable());
+  while (current_field->type()->num_children() > 0) {
+    if (current_field->type()->num_children() > 1) {
+      return Status::NotImplemented("Fields with more than one child are not supported.");
+    } else {
+      if (current_field->type()->id() != ::arrow::Type::LIST) {
+        return Status::NotImplemented("Currently only nesting with Lists is supported.");
       }
+      current_field = current_field->type()->child(0);
     }
+    offset_builders.emplace_back(
+        std::make_shared<::arrow::Int32Builder>(::arrow::int32(), pool_));
+    valid_bits_builders.emplace_back(
+        std::make_shared<::arrow::BooleanBuilder>(::arrow::boolean(), pool_));
+    nullable.push_back(current_field->nullable());
+  }
 
-    if (children.size() > 0) {
-      *out = std::unique_ptr<ColumnReader::ColumnReaderImpl>(
-          new StructImpl(children, def_level, pool_, node));
+  int64_t list_depth = offset_builders.size();
+  // This describes the minimal definition that describes a level that
+  // reflects a value in the primitive values array.
+  int16_t values_def_level = descr_->max_definition_level();
+  if (nullable[nullable.size() - 1]) {
+    values_def_level--;
+  }
+
+  // The definition levels that are needed so that a list is declared
+  // as empty and not null.
+  std::vector<int16_t> empty_def_level(list_depth);
+  int def_level = 0;
+  for (int i = 0; i < list_depth; i++) {
+    if (nullable[i]) {
+      def_level++;
     }
-  } else {
-    // This should be a flat field case - translate the field index to
-    // the correct column index by walking down to the leaf node
-    const Node* walker = node;
-    while (!walker->is_primitive()) {
-      DCHECK(walker->is_group());
-      auto group = static_cast<const schema::GroupNode*>(walker);
-      if (group->field_count() != 1) {
-        return Status::NotImplemented("lists with structs are not supported.");
-      }
-      walker = group->field(0).get();
-    }
-    auto column_index = reader_->metadata()->schema()->ColumnIndex(*walker);
-
-    // If the index of the column is found then a reader for the column is needed.
-    // Otherwise *out keeps the nullptr value.
-    if (std::find(indices.begin(), indices.end(), column_index) != indices.end()) {
-      std::unique_ptr<ColumnReader> reader;
-      RETURN_NOT_OK(GetColumn(column_index, iterator_factory, &reader));
-      *out = std::move(reader->impl_);
-    }
-  }
-
-  return Status::OK();
-}
-
-Status FileReader::Impl::ReadSchemaField(int i, std::shared_ptr<ChunkedArray>* out) {
-  std::vector<int> indices(reader_->metadata()->num_columns());
-
-  for (size_t j = 0; j < indices.size(); ++j) {
-    indices[j] = static_cast<int>(j);
-  }
-
-  return ReadSchemaField(i, indices, out);
-}
-
-Status FileReader::Impl::ReadSchemaField(int i, const std::vector<int>& indices,
-                                         std::shared_ptr<ChunkedArray>* out) {
-  auto iterator_factory = FileColumnIterator::MakeAllRowGroupsIterator;
-  auto parquet_schema = reader_->metadata()->schema();
-  auto node = parquet_schema->group_node()->field(i).get();
-  std::unique_ptr<ColumnReader::ColumnReaderImpl> reader_impl;
-
-  RETURN_NOT_OK(GetReaderForNode(i, node, indices, 1, iterator_factory, &reader_impl));
-  if (reader_impl == nullptr) {
-    *out = nullptr;
-    return Status::OK();
-  }
-
-  std::unique_ptr<ColumnReader> reader(new ColumnReader(std::move(reader_impl)));
-
-  // TODO(wesm): This calculation doesn't make much sense when we have repeated
-  // schema nodes
-  int64_t records_to_read = 0;
-
-  const FileMetaData& metadata = *reader_->metadata();
-  for (int j = 0; j < metadata.num_row_groups(); j++) {
-    records_to_read += metadata.RowGroup(j)->ColumnChunk(i)->num_values();
-  }
-
-  return reader->NextBatch(records_to_read, out);
-}
-
-Status FileReader::Impl::ReadColumn(int i, std::shared_ptr<ChunkedArray>* out) {
-  auto iterator_factory = FileColumnIterator::MakeAllRowGroupsIterator;
-  std::unique_ptr<ColumnReader> flat_column_reader;
-  RETURN_NOT_OK(GetColumn(i, iterator_factory, &flat_column_reader));
-
-  int64_t records_to_read = 0;
-  for (int j = 0; j < reader_->metadata()->num_row_groups(); j++) {
-    records_to_read += reader_->metadata()->RowGroup(j)->ColumnChunk(i)->num_values();
-  }
-
-  return flat_column_reader->NextBatch(records_to_read, out);
-}
-
-Status FileReader::Impl::GetSchema(std::shared_ptr<::arrow::Schema>* out) {
-  return FromParquetSchema(reader_->metadata()->schema(), reader_properties_,
-                           reader_->metadata()->key_value_metadata(), out);
-}
-
-Status FileReader::Impl::GetSchema(const std::vector<int>& indices,
-                                   std::shared_ptr<::arrow::Schema>* out) {
-  return FromParquetSchema(reader_->metadata()->schema(), indices, reader_properties_,
-                           reader_->metadata()->key_value_metadata(), out);
-}
-
-Status FileReader::Impl::ReadColumnChunk(int column_index, int row_group_index,
-                                         std::shared_ptr<ChunkedArray>* out) {
-  std::vector<int> indices(reader_->metadata()->num_columns());
-
-  for (size_t i = 0; i < indices.size(); ++i) {
-    indices[i] = static_cast<int>(i);
-  }
-
-  return ReadColumnChunk(column_index, indices, row_group_index, out);
-}
-
-Status FileReader::Impl::ReadColumnChunk(int column_index,
-                                         const std::vector<int>& indices,
-                                         int row_group_index,
-                                         std::shared_ptr<ChunkedArray>* out) {
-  auto rg_metadata = reader_->metadata()->RowGroup(row_group_index);
-  int64_t records_to_read = rg_metadata->ColumnChunk(column_index)->num_values();
-
-  auto parquet_schema = reader_->metadata()->schema();
-  auto node = parquet_schema->group_node()->field(column_index).get();
-  std::unique_ptr<ColumnReader::ColumnReaderImpl> reader_impl;
-
-  FileColumnIteratorFactory iterator_factory = [row_group_index](
-                                                   int i, ParquetFileReader* reader) {
-    return FileColumnIterator::MakeSingleRowGroupIterator(i, reader, row_group_index);
-  };
-  RETURN_NOT_OK(
-      GetReaderForNode(column_index, node, indices, 1, iterator_factory, &reader_impl));
-  if (reader_impl == nullptr) {
-    *out = nullptr;
-    return Status::OK();
-  }
-  ColumnReader reader(std::move(reader_impl));
-
-  return reader.NextBatch(records_to_read, out);
-}
-
-Status FileReader::Impl::ReadRowGroup(int row_group_index,
-                                      const std::vector<int>& indices,
-                                      std::shared_ptr<Table>* out) {
-  std::shared_ptr<::arrow::Schema> schema;
-  RETURN_NOT_OK(GetSchema(indices, &schema));
-
-  auto rg_metadata = reader_->metadata()->RowGroup(row_group_index);
-
-  // We only need to read schema fields which have columns indicated
-  // in the indices vector
-  std::vector<int> field_indices;
-  if (!schema::ColumnIndicesToFieldIndices(*reader_->metadata()->schema(), indices,
-                                           &field_indices)) {
-    return Status::Invalid("Invalid column index");
-  }
-  int num_fields = static_cast<int>(field_indices.size());
-  std::vector<std::shared_ptr<ChunkedArray>> columns(num_fields);
-
-  // TODO(wesm): Refactor to share more code with ReadTable
-
-  auto ReadColumnFunc = [&indices, &field_indices, &row_group_index, &columns,
-                         this](int i) {
-    RETURN_NOT_OK(
-        ReadColumnChunk(field_indices[i], indices, row_group_index, &columns[i]));
-    return Status::OK();
-  };
-
-  if (reader_properties_.use_threads()) {
-    std::vector<std::future<Status>> futures;
-    auto pool = ::arrow::internal::GetCpuThreadPool();
-    for (int i = 0; i < num_fields; i++) {
-      futures.push_back(pool->Submit(ReadColumnFunc, i));
-    }
-    Status final_status = Status::OK();
-    for (auto& fut : futures) {
-      Status st = fut.get();
-      if (!st.ok()) {
-        final_status = std::move(st);
-      }
-    }
-    RETURN_NOT_OK(final_status);
-  } else {
-    for (int i = 0; i < num_fields; i++) {
-      RETURN_NOT_OK(ReadColumnFunc(i));
-    }
-  }
-
-  auto dict_indices = GetDictionaryIndices(indices);
-  if (!dict_indices.empty()) {
-    schema = FixSchema(*schema, dict_indices, columns);
-  }
-
-  std::shared_ptr<Table> table = Table::Make(schema, columns);
-  RETURN_NOT_OK(table->Validate());
-  *out = table;
-  return Status::OK();
-}
-
-Status FileReader::Impl::ReadTable(const std::vector<int>& indices,
-                                   std::shared_ptr<Table>* out) {
-  std::shared_ptr<::arrow::Schema> schema;
-  RETURN_NOT_OK(GetSchema(indices, &schema));
-
-  // We only need to read schema fields which have columns indicated
-  // in the indices vector
-  std::vector<int> field_indices;
-  if (!schema::ColumnIndicesToFieldIndices(*reader_->metadata()->schema(), indices,
-                                           &field_indices)) {
-    return Status::Invalid("Invalid column index");
-  }
-
-  int num_fields = static_cast<int>(field_indices.size());
-  std::vector<std::shared_ptr<ChunkedArray>> columns(num_fields);
-
-  auto ReadColumnFunc = [&indices, &field_indices, &columns, this](int i) {
-    return ReadSchemaField(field_indices[i], indices, &columns[i]);
-  };
-
-  if (reader_properties_.use_threads()) {
-    std::vector<std::future<Status>> futures;
-    auto pool = ::arrow::internal::GetCpuThreadPool();
-    for (int i = 0; i < num_fields; i++) {
-      futures.push_back(pool->Submit(ReadColumnFunc, i));
-    }
-    Status final_status = Status::OK();
-    for (auto& fut : futures) {
-      Status st = fut.get();
-      if (!st.ok()) {
-        final_status = std::move(st);
-      }
-    }
-    RETURN_NOT_OK(final_status);
-  } else {
-    for (int i = 0; i < num_fields; i++) {
-      RETURN_NOT_OK(ReadColumnFunc(i));
-    }
-  }
-
-  auto dict_indices = GetDictionaryIndices(indices);
-  if (!dict_indices.empty()) {
-    schema = FixSchema(*schema, dict_indices, columns);
-  }
-
-  std::shared_ptr<Table> table = Table::Make(schema, columns);
-  RETURN_NOT_OK(table->Validate());
-  *out = table;
-  return Status::OK();
-}
-
-Status FileReader::Impl::ReadTable(std::shared_ptr<Table>* table) {
-  std::vector<int> indices(reader_->metadata()->num_columns());
-
-  for (size_t i = 0; i < indices.size(); ++i) {
-    indices[i] = static_cast<int>(i);
-  }
-  return ReadTable(indices, table);
-}
-
-Status FileReader::Impl::ReadRowGroups(const std::vector<int>& row_groups,
-                                       const std::vector<int>& indices,
-                                       std::shared_ptr<Table>* table) {
-  std::vector<std::shared_ptr<Table>> tables(row_groups.size(), nullptr);
-
-  for (size_t i = 0; i < row_groups.size(); ++i) {
-    RETURN_NOT_OK(ReadRowGroup(row_groups[i], indices, &tables[i]));
-  }
-  return ConcatenateTables(tables, table);
-}
-
-Status FileReader::Impl::ReadRowGroups(const std::vector<int>& row_groups,
-                                       std::shared_ptr<Table>* table) {
-  std::vector<int> indices(reader_->metadata()->num_columns());
-
-  for (size_t i = 0; i < indices.size(); ++i) {
-    indices[i] = static_cast<int>(i);
-  }
-  return ReadRowGroups(row_groups, indices, table);
-}
-
-Status FileReader::Impl::ReadRowGroup(int i, std::shared_ptr<Table>* table) {
-  std::vector<int> indices(reader_->metadata()->num_columns());
-
-  for (size_t i = 0; i < indices.size(); ++i) {
-    indices[i] = static_cast<int>(i);
-  }
-  return ReadRowGroup(i, indices, table);
-}
-
-std::vector<int> FileReader::Impl::GetDictionaryIndices(const std::vector<int>& indices) {
-  // Select the column indices that were read as DictionaryArray
-  std::vector<int> dict_indices(indices);
-  auto remove_func = [this](int i) { return !reader_properties_.read_dictionary(i); };
-  auto it = std::remove_if(dict_indices.begin(), dict_indices.end(), remove_func);
-  dict_indices.erase(it, dict_indices.end());
-  return dict_indices;
-}
-
-std::shared_ptr<::arrow::Schema> FileReader::Impl::FixSchema(
-    const ::arrow::Schema& old_schema, const std::vector<int>& dict_indices,
-    const std::vector<std::shared_ptr<::arrow::ChunkedArray>>& columns) {
-  // Fix the schema with the actual DictionaryType that was read
-  auto fields = old_schema.fields();
-
-  for (int idx : dict_indices) {
-    fields[idx] = old_schema.field(idx)->WithType(columns[idx]->type());
-  }
-  return std::make_shared<::arrow::Schema>(fields, old_schema.metadata());
-}
-
-// Static ctor
-Status OpenFile(const std::shared_ptr<::arrow::io::RandomAccessFile>& file,
-                MemoryPool* pool, const ReaderProperties& props,
-                const std::shared_ptr<FileMetaData>& metadata,
-                std::unique_ptr<FileReader>* reader) {
-  std::unique_ptr<ParquetReader> pq_reader;
-  PARQUET_CATCH_NOT_OK(pq_reader = ParquetReader::Open(file, props, metadata));
-  return FileReader::Make(pool, std::move(pq_reader), default_arrow_reader_properties(),
-                          reader);
-}
-
-Status OpenFile(const std::shared_ptr<::arrow::io::RandomAccessFile>& file,
-                MemoryPool* pool, std::unique_ptr<FileReader>* reader) {
-  return OpenFile(file, pool, ::parquet::default_reader_properties(), nullptr, reader);
-}
-
-Status OpenFile(const std::shared_ptr<::arrow::io::RandomAccessFile>& file,
-                ::arrow::MemoryPool* pool, const ArrowReaderProperties& properties,
-                std::unique_ptr<FileReader>* reader) {
-  std::unique_ptr<ParquetReader> pq_reader;
-  PARQUET_CATCH_NOT_OK(pq_reader = ParquetReader::Open(
-                           file, ::parquet::default_reader_properties(), nullptr));
-  return FileReader::Make(pool, std::move(pq_reader), properties, reader);
-}
-
-Status FileReader::GetColumn(int i, std::unique_ptr<ColumnReader>* out) {
-  auto iterator_factory = FileColumnIterator::MakeAllRowGroupsIterator;
-  return impl_->GetColumn(i, iterator_factory, out);
-}
-
-Status FileReader::GetSchema(std::shared_ptr<::arrow::Schema>* out) {
-  return impl_->GetSchema(out);
-}
-
-Status FileReader::GetSchema(const std::vector<int>& indices,
-                             std::shared_ptr<::arrow::Schema>* out) {
-  return impl_->GetSchema(indices, out);
-}
-
-Status FileReader::ReadColumn(int i, std::shared_ptr<ChunkedArray>* out) {
-  try {
-    return impl_->ReadColumn(i, out);
-  } catch (const ::parquet::ParquetException& e) {
-    return ::arrow::Status::IOError(e.what());
-  }
-}
-
-Status FileReader::ReadSchemaField(int i, std::shared_ptr<ChunkedArray>* out) {
-  try {
-    return impl_->ReadSchemaField(i, out);
-  } catch (const ::parquet::ParquetException& e) {
-    return ::arrow::Status::IOError(e.what());
-  }
-}
-
-Status FileReader::ReadColumn(int i, std::shared_ptr<Array>* out) {
-  std::shared_ptr<ChunkedArray> chunked_out;
-  RETURN_NOT_OK(ReadColumn(i, &chunked_out));
-  return GetSingleChunk(*chunked_out, out);
-}
-
-Status FileReader::ReadSchemaField(int i, std::shared_ptr<Array>* out) {
-  std::shared_ptr<ChunkedArray> chunked_out;
-  RETURN_NOT_OK(ReadSchemaField(i, &chunked_out));
-  return GetSingleChunk(*chunked_out, out);
-}
-
-Status FileReader::GetRecordBatchReader(const std::vector<int>& row_group_indices,
-                                        std::shared_ptr<RecordBatchReader>* out) {
-  std::vector<int> indices(impl_->num_columns());
-
-  for (size_t j = 0; j < indices.size(); ++j) {
-    indices[j] = static_cast<int>(j);
-  }
-
-  return GetRecordBatchReader(row_group_indices, indices, out);
-}
-
-Status FileReader::GetRecordBatchReader(const std::vector<int>& row_group_indices,
-                                        const std::vector<int>& column_indices,
-                                        std::shared_ptr<RecordBatchReader>* out) {
-  // column indices check
-  std::shared_ptr<::arrow::Schema> schema;
-  RETURN_NOT_OK(GetSchema(column_indices, &schema));
-
-  // row group indices check
-  int max_num = num_row_groups();
-  for (auto row_group_index : row_group_indices) {
-    if (row_group_index < 0 || row_group_index >= max_num) {
-      return Status::Invalid("Some index in row_group_indices is ", row_group_index,
-                             ", which is either < 0 or >= num_row_groups(", max_num, ")");
-    }
-  }
-
-  *out = std::make_shared<RowGroupRecordBatchReader>(row_group_indices, column_indices,
-                                                     schema, this, impl_->batch_size());
-  return Status::OK();
-}
-
-Status FileReader::ReadTable(std::shared_ptr<Table>* out) {
-  try {
-    return impl_->ReadTable(out);
-  } catch (const ::parquet::ParquetException& e) {
-    return ::arrow::Status::IOError(e.what());
-  }
-}
-
-Status FileReader::ReadTable(const std::vector<int>& indices,
-                             std::shared_ptr<Table>* out) {
-  try {
-    return impl_->ReadTable(indices, out);
-  } catch (const ::parquet::ParquetException& e) {
-    return ::arrow::Status::IOError(e.what());
-  }
-}
-
-Status FileReader::ReadRowGroup(int i, std::shared_ptr<Table>* out) {
-  try {
-    return impl_->ReadRowGroup(i, out);
-  } catch (const ::parquet::ParquetException& e) {
-    return ::arrow::Status::IOError(e.what());
-  }
-}
-
-Status FileReader::ReadRowGroup(int i, const std::vector<int>& indices,
-                                std::shared_ptr<Table>* out) {
-  try {
-    return impl_->ReadRowGroup(i, indices, out);
-  } catch (const ::parquet::ParquetException& e) {
-    return ::arrow::Status::IOError(e.what());
-  }
-}
-
-Status FileReader::ReadRowGroups(const std::vector<int>& row_groups,
-                                 std::shared_ptr<Table>* out) {
-  try {
-    return impl_->ReadRowGroups(row_groups, out);
-  } catch (const ::parquet::ParquetException& e) {
-    return ::arrow::Status::IOError(e.what());
-  }
-}
-
-Status FileReader::ReadRowGroups(const std::vector<int>& row_groups,
-                                 const std::vector<int>& indices,
-                                 std::shared_ptr<Table>* out) {
-  try {
-    return impl_->ReadRowGroups(row_groups, indices, out);
-  } catch (const ::parquet::ParquetException& e) {
-    return ::arrow::Status::IOError(e.what());
-  }
-}
-
-std::shared_ptr<RowGroupReader> FileReader::RowGroup(int row_group_index) {
-  return std::shared_ptr<RowGroupReader>(
-      new RowGroupReader(impl_.get(), row_group_index));
-}
-
-int FileReader::num_row_groups() const { return impl_->num_row_groups(); }
-
-void FileReader::set_num_threads(int num_threads) {}
-
-void FileReader::set_use_threads(bool use_threads) {
-  impl_->set_use_threads(use_threads);
-}
-
-Status FileReader::ScanContents(std::vector<int> columns, const int32_t column_batch_size,
-                                int64_t* num_rows) {
-  try {
-    *num_rows = ScanFileContents(columns, column_batch_size, impl_->reader());
-    return Status::OK();
-  } catch (const ::parquet::ParquetException& e) {
-    return Status::IOError(e.what());
-  }
-}
-
-const ParquetFileReader* FileReader::parquet_reader() const {
-  return impl_->parquet_reader();
-}
-
-Status PrimitiveImpl::WrapIntoListArray(Datum* inout_array) {
-  if (descr_->max_repetition_level() == 0) {
-    // Flat, no action
-    return Status::OK();
-  }
-
-  std::shared_ptr<Array> flat_array;
-
-  // ARROW-3762(wesm): If inout_array is a chunked array, we reject as this is
-  // not yet implemented
-  if (inout_array->kind() == Datum::CHUNKED_ARRAY) {
-    if (inout_array->chunked_array()->num_chunks() > 1) {
-      return Status::NotImplemented(
-          "Nested data conversions not implemented for "
-          "chunked array outputs");
-    }
-    flat_array = inout_array->chunked_array()->chunk(0);
-  } else {
-    DCHECK_EQ(Datum::ARRAY, inout_array->kind());
-    flat_array = inout_array->make_array();
-  }
-
-  const int16_t* def_levels = record_reader_->def_levels();
-  const int16_t* rep_levels = record_reader_->rep_levels();
-  const int64_t total_levels_read = record_reader_->levels_position();
-
-  std::shared_ptr<::arrow::Schema> arrow_schema;
-  RETURN_NOT_OK(FromParquetSchema(
-      input_->schema(), {input_->column_index()}, default_arrow_reader_properties(),
-      input_->metadata()->key_value_metadata(), &arrow_schema));
-  std::shared_ptr<Field> current_field = arrow_schema->field(0);
-
-  if (current_field->type()->num_children() > 0 &&
-      flat_array->type_id() == ::arrow::Type::DICTIONARY) {
-    // XXX(wesm): Handling of nested types and dictionary encoding needs to be
-    // significantly refactored
-    return Status::Invalid("Cannot have nested types containing dictionary arrays yet");
-  }
-
-  // Walk downwards to extract nullability
-  std::vector<bool> nullable;
-  std::vector<std::shared_ptr<::arrow::Int32Builder>> offset_builders;
-  std::vector<std::shared_ptr<::arrow::BooleanBuilder>> valid_bits_builders;
-  nullable.push_back(current_field->nullable());
-  while (current_field->type()->num_children() > 0) {
-    if (current_field->type()->num_children() > 1) {
-      return Status::NotImplemented("Fields with more than one child are not supported.");
-    } else {
-      if (current_field->type()->id() != ::arrow::Type::LIST) {
-        return Status::NotImplemented("Currently only nesting with Lists is supported.");
-      }
-      current_field = current_field->type()->child(0);
-    }
-    offset_builders.emplace_back(
-        std::make_shared<::arrow::Int32Builder>(::arrow::int32(), pool_));
-    valid_bits_builders.emplace_back(
-        std::make_shared<::arrow::BooleanBuilder>(::arrow::boolean(), pool_));
-    nullable.push_back(current_field->nullable());
-  }
-
-  int64_t list_depth = offset_builders.size();
-  // This describes the minimal definition that describes a level that
-  // reflects a value in the primitive values array.
-  int16_t values_def_level = descr_->max_definition_level();
-  if (nullable[nullable.size() - 1]) {
-    values_def_level--;
-  }
-
-  // The definition levels that are needed so that a list is declared
-  // as empty and not null.
-  std::vector<int16_t> empty_def_level(list_depth);
-  int def_level = 0;
-  for (int i = 0; i < list_depth; i++) {
-    if (nullable[i]) {
-      def_level++;
-    }
-    empty_def_level[i] = static_cast<int16_t>(def_level);
-    def_level++;
-  }
-
-  int32_t values_offset = 0;
-  std::vector<int64_t> null_counts(list_depth, 0);
-  for (int64_t i = 0; i < total_levels_read; i++) {
-    int16_t rep_level = rep_levels[i];
-    if (rep_level < descr_->max_repetition_level()) {
-      for (int64_t j = rep_level; j < list_depth; j++) {
-        if (j == (list_depth - 1)) {
-          RETURN_NOT_OK(offset_builders[j]->Append(values_offset));
-        } else {
-          RETURN_NOT_OK(offset_builders[j]->Append(
-              static_cast<int32_t>(offset_builders[j + 1]->length())));
-        }
-
-        if (((empty_def_level[j] - 1) == def_levels[i]) && (nullable[j])) {
-          RETURN_NOT_OK(valid_bits_builders[j]->Append(false));
-          null_counts[j]++;
-          break;
-        } else {
-          RETURN_NOT_OK(valid_bits_builders[j]->Append(true));
-          if (empty_def_level[j] == def_levels[i]) {
-            break;
-          }
-        }
-      }
-    }
-    if (def_levels[i] >= values_def_level) {
-      values_offset++;
-    }
-  }
-  // Add the final offset to all lists
-  for (int64_t j = 0; j < list_depth; j++) {
-    if (j == (list_depth - 1)) {
-      RETURN_NOT_OK(offset_builders[j]->Append(values_offset));
-    } else {
-      RETURN_NOT_OK(offset_builders[j]->Append(
-          static_cast<int32_t>(offset_builders[j + 1]->length())));
-    }
-  }
-
-  std::vector<std::shared_ptr<Buffer>> offsets;
-  std::vector<std::shared_ptr<Buffer>> valid_bits;
-  std::vector<int64_t> list_lengths;
-  for (int64_t j = 0; j < list_depth; j++) {
-    list_lengths.push_back(offset_builders[j]->length() - 1);
-    std::shared_ptr<Array> array;
-    RETURN_NOT_OK(offset_builders[j]->Finish(&array));
-    offsets.emplace_back(std::static_pointer_cast<Int32Array>(array)->values());
-    RETURN_NOT_OK(valid_bits_builders[j]->Finish(&array));
-    valid_bits.emplace_back(std::static_pointer_cast<BooleanArray>(array)->values());
-  }
-
-  std::shared_ptr<Array> output = flat_array;
-  for (int64_t j = list_depth - 1; j >= 0; j--) {
-    auto list_type =
-        ::arrow::list(::arrow::field("item", output->type(), nullable[j + 1]));
-    output = std::make_shared<::arrow::ListArray>(list_type, list_lengths[j], offsets[j],
-                                                  output, valid_bits[j], null_counts[j]);
-  }
-  *inout_array = output;
-  return Status::OK();
-}
-
-// ----------------------------------------------------------------------
-// Primitive types
-
-template <typename ArrowType, typename ParquetType, typename Enable = void>
-struct TransferFunctor {};
-
-template <typename ArrowType, typename ParquetType>
-Status TransferInt(RecordReader* reader, MemoryPool* pool,
-                   const std::shared_ptr<DataType>& type, Datum* out) {
-  using ArrowCType = typename ArrowType::c_type;
-  using ParquetCType = typename ParquetType::c_type;
-  int64_t length = reader->values_written();
-  std::shared_ptr<Buffer> data;
-  RETURN_NOT_OK(::arrow::AllocateBuffer(pool, length * sizeof(ArrowCType), &data));
-
-  auto values = reinterpret_cast<const ParquetCType*>(reader->values());
-  auto out_ptr = reinterpret_cast<ArrowCType*>(data->mutable_data());
-  std::copy(values, values + length, out_ptr);
-  *out = std::make_shared<ArrayType<ArrowType>>(
-      type, length, data, reader->ReleaseIsValid(), reader->null_count());
-  return Status::OK();
-}
-
-std::shared_ptr<Array> TransferZeroCopy(RecordReader* reader,
-                                        const std::shared_ptr<DataType>& type) {
-  std::vector<std::shared_ptr<Buffer>> buffers = {reader->ReleaseIsValid(),
-                                                  reader->ReleaseValues()};
-  auto data = std::make_shared<::arrow::ArrayData>(type, reader->values_written(),
-                                                   buffers, reader->null_count());
-  return MakeArray(data);
-}
-
-Status TransferBool(RecordReader* reader, MemoryPool* pool, Datum* out) {
-  int64_t length = reader->values_written();
-  std::shared_ptr<Buffer> data;
-
-  const int64_t buffer_size = BitUtil::BytesForBits(length);
-  RETURN_NOT_OK(::arrow::AllocateBuffer(pool, buffer_size, &data));
-
-  // Transfer boolean values to packed bitmap
-  auto values = reinterpret_cast<const bool*>(reader->values());
-  uint8_t* data_ptr = data->mutable_data();
-  memset(data_ptr, 0, buffer_size);
-
-  for (int64_t i = 0; i < length; i++) {
-    if (values[i]) {
-      ::arrow::BitUtil::SetBit(data_ptr, i);
-    }
-  }
-
-  *out = std::make_shared<BooleanArray>(length, data, reader->ReleaseIsValid(),
-                                        reader->null_count());
-  return Status::OK();
-}
-
-template <>
-struct TransferFunctor<::arrow::TimestampType, Int96Type> {
-  Status operator()(RecordReader* reader, MemoryPool* pool,
-                    const std::shared_ptr<DataType>& type, Datum* out) {
-    int64_t length = reader->values_written();
-    auto values = reinterpret_cast<const Int96*>(reader->values());
-
-    std::shared_ptr<Buffer> data;
-    RETURN_NOT_OK(::arrow::AllocateBuffer(pool, length * sizeof(int64_t), &data));
-
-    auto data_ptr = reinterpret_cast<int64_t*>(data->mutable_data());
-    for (int64_t i = 0; i < length; i++) {
-      *data_ptr++ = Int96GetNanoSeconds(values[i]);
-    }
-
-    *out = std::make_shared<TimestampArray>(type, length, data, reader->ReleaseIsValid(),
-                                            reader->null_count());
-    return Status::OK();
-  }
-};
-
-Status TransferDate64(RecordReader* reader, MemoryPool* pool,
-                      const std::shared_ptr<DataType>& type, Datum* out) {
-  int64_t length = reader->values_written();
-  auto values = reinterpret_cast<const int32_t*>(reader->values());
-
-  std::shared_ptr<Buffer> data;
-  RETURN_NOT_OK(::arrow::AllocateBuffer(pool, length * sizeof(int64_t), &data));
-  auto out_ptr = reinterpret_cast<int64_t*>(data->mutable_data());
-
-  for (int64_t i = 0; i < length; i++) {
-    *out_ptr++ = static_cast<int64_t>(values[i]) * kMillisecondsPerDay;
-  }
-
-  *out = std::make_shared<::arrow::Date64Array>(
-      type, length, data, reader->ReleaseIsValid(), reader->null_count());
-  return Status::OK();
-}
-
-// ----------------------------------------------------------------------
-// Binary, direct to dictionary-encoded
-
-// Some ugly hacks here for now to handle binary dictionaries casted to their
-// logical type
-std::shared_ptr<Array> ShallowCast(const Array& arr,
-                                   const std::shared_ptr<DataType>& new_type) {
-  std::shared_ptr<::arrow::ArrayData> new_data = arr.data()->Copy();
-  new_data->type = new_type;
-  if (new_type->id() == ::arrow::Type::DICTIONARY) {
-    // Cast dictionary, too
-    const auto& dict_type = static_cast<const ::arrow::DictionaryType&>(*new_type);
-    new_data->dictionary = ShallowCast(*new_data->dictionary, dict_type.value_type());
-  }
-  return MakeArray(new_data);
-}
-
-std::shared_ptr<ChunkedArray> CastChunksTo(
-    const ChunkedArray& data, const std::shared_ptr<DataType>& logical_value_type) {
-  std::vector<std::shared_ptr<Array>> string_chunks;
-  for (int i = 0; i < data.num_chunks(); ++i) {
-    string_chunks.push_back(ShallowCast(*data.chunk(i), logical_value_type));
-  }
-  return std::make_shared<ChunkedArray>(string_chunks);
-}
-
-Status TransferDictionary(RecordReader* reader,
-                          const std::shared_ptr<DataType>& logical_value_type,
-                          std::shared_ptr<ChunkedArray>* out) {
-  auto dict_reader = dynamic_cast<internal::DictionaryRecordReader*>(reader);
-  DCHECK(dict_reader);
-  *out = dict_reader->GetResult();
-
-  const auto& dict_type = static_cast<const ::arrow::DictionaryType&>(*(*out)->type());
-  if (!logical_value_type->Equals(*dict_type.value_type())) {
-    *out = CastChunksTo(**out,
-                        ::arrow::dictionary(dict_type.index_type(), logical_value_type));
-  }
-  return Status::OK();
-}
-
-Status TransferBinary(RecordReader* reader,
-                      const std::shared_ptr<DataType>& logical_value_type,
-                      bool read_dictionary, std::shared_ptr<ChunkedArray>* out) {
-  if (read_dictionary) {
-    return TransferDictionary(reader, logical_value_type, out);
-  }
-  auto binary_reader = dynamic_cast<internal::BinaryRecordReader*>(reader);
-  DCHECK(binary_reader);
-  *out = std::make_shared<ChunkedArray>(binary_reader->GetBuilderChunks());
-  if (!logical_value_type->Equals(*(*out)->type())) {
-    *out = CastChunksTo(**out, logical_value_type);
-  }
-  return Status::OK();
-}
-
-// ----------------------------------------------------------------------
-// INT32 / INT64 / BYTE_ARRAY / FIXED_LEN_BYTE_ARRAY -> Decimal128
-
-static uint64_t BytesToInteger(const uint8_t* bytes, int32_t start, int32_t stop) {
-  const int32_t length = stop - start;
-
-  DCHECK_GE(length, 0);
-  DCHECK_LE(length, 8);
-
-  switch (length) {
-    case 0:
-      return 0;
-    case 1:
-      return bytes[start];
-    case 2:
-      return FromBigEndian(SafeLoadAs<uint16_t>(bytes + start));
-    case 3: {
-      const uint64_t first_two_bytes = FromBigEndian(SafeLoadAs<uint16_t>(bytes + start));
-      const uint64_t last_byte = bytes[stop - 1];
-      return first_two_bytes << 8 | last_byte;
-    }
-    case 4:
-      return FromBigEndian(SafeLoadAs<uint32_t>(bytes + start));
-    case 5: {
-      const uint64_t first_four_bytes =
-          FromBigEndian(SafeLoadAs<uint32_t>(bytes + start));
-      const uint64_t last_byte = bytes[stop - 1];
-      return first_four_bytes << 8 | last_byte;
-    }
-    case 6: {
-      const uint64_t first_four_bytes =
-          FromBigEndian(SafeLoadAs<uint32_t>(bytes + start));
-      const uint64_t last_two_bytes =
-          FromBigEndian(SafeLoadAs<uint16_t>(bytes + start + 4));
-      return first_four_bytes << 16 | last_two_bytes;
-    }
-    case 7: {
-      const uint64_t first_four_bytes =
-          FromBigEndian(SafeLoadAs<uint32_t>(bytes + start));
-      const uint64_t second_two_bytes =
-          FromBigEndian(SafeLoadAs<uint16_t>(bytes + start + 4));
-      const uint64_t last_byte = bytes[stop - 1];
-      return first_four_bytes << 24 | second_two_bytes << 8 | last_byte;
-    }
-    case 8:
-      return FromBigEndian(SafeLoadAs<uint64_t>(bytes + start));
-    default: {
-      DCHECK(false);
-      return UINT64_MAX;
-    }
-  }
-}
-
-static constexpr int32_t kMinDecimalBytes = 1;
-static constexpr int32_t kMaxDecimalBytes = 16;
-
-/// \brief Convert a sequence of big-endian bytes to one int64_t (high bits) and one
-/// uint64_t (low bits).
-static void BytesToIntegerPair(const uint8_t* bytes, const int32_t length,
-                               int64_t* out_high, uint64_t* out_low) {
-  DCHECK_GE(length, kMinDecimalBytes);
-  DCHECK_LE(length, kMaxDecimalBytes);
-
-  // XXX This code is copied from Decimal::FromBigEndian
-
-  int64_t high, low;
-
-  // Bytes are coming in big-endian, so the first byte is the MSB and therefore holds the
-  // sign bit.
-  const bool is_negative = static_cast<int8_t>(bytes[0]) < 0;
-
-  // 1. Extract the high bytes
-  // Stop byte of the high bytes
-  const int32_t high_bits_offset = std::max(0, length - 8);
-  const auto high_bits = BytesToInteger(bytes, 0, high_bits_offset);
-
-  if (high_bits_offset == 8) {
-    // Avoid undefined shift by 64 below
-    high = high_bits;
-  } else {
-    high = -1 * (is_negative && length < kMaxDecimalBytes);
-    // Shift left enough bits to make room for the incoming int64_t
-    high = SafeLeftShift(high, high_bits_offset * CHAR_BIT);
-    // Preserve the upper bits by inplace OR-ing the int64_t
-    high |= high_bits;
-  }
-
-  // 2. Extract the low bytes
-  // Stop byte of the low bytes
-  const int32_t low_bits_offset = std::min(length, 8);
-  const auto low_bits = BytesToInteger(bytes, high_bits_offset, length);
-
-  if (low_bits_offset == 8) {
-    // Avoid undefined shift by 64 below
-    low = low_bits;
-  } else {
-    // Sign extend the low bits if necessary
-    low = -1 * (is_negative && length < 8);
-    // Shift left enough bits to make room for the incoming int64_t
-    low = SafeLeftShift(low, low_bits_offset * CHAR_BIT);
-    // Preserve the upper bits by inplace OR-ing the int64_t
-    low |= low_bits;
-  }
-
-  *out_high = high;
-  *out_low = static_cast<uint64_t>(low);
-}
-
-static inline void RawBytesToDecimalBytes(const uint8_t* value, int32_t byte_width,
-                                          uint8_t* out_buf) {
-  // view the first 8 bytes as an unsigned 64-bit integer
-  auto low = reinterpret_cast<uint64_t*>(out_buf);
-
-  // view the second 8 bytes as a signed 64-bit integer
-  auto high = reinterpret_cast<int64_t*>(out_buf + sizeof(uint64_t));
-
-  // Convert the fixed size binary array bytes into a Decimal128 compatible layout
-  BytesToIntegerPair(value, byte_width, high, low);
-}
-
-template <typename T>
-Status ConvertToDecimal128(const Array& array, const std::shared_ptr<DataType>&,
-                           MemoryPool* pool, std::shared_ptr<Array>*) {
-  return Status::NotImplemented("not implemented");
-}
-
-template <>
-Status ConvertToDecimal128<FLBAType>(const Array& array,
-                                     const std::shared_ptr<DataType>& type,
-                                     MemoryPool* pool, std::shared_ptr<Array>* out) {
-  const auto& fixed_size_binary_array =
-      static_cast<const ::arrow::FixedSizeBinaryArray&>(array);
-
-  // The byte width of each decimal value
-  const int32_t type_length =
-      static_cast<const ::arrow::Decimal128Type&>(*type).byte_width();
-
-  // number of elements in the entire array
-  const int64_t length = fixed_size_binary_array.length();
-
-  // Get the byte width of the values in the FixedSizeBinaryArray. Most of the time
-  // this will be different from the decimal array width because we write the minimum
-  // number of bytes necessary to represent a given precision
-  const int32_t byte_width =
-      static_cast<const ::arrow::FixedSizeBinaryType&>(*fixed_size_binary_array.type())
-          .byte_width();
-
-  // allocate memory for the decimal array
-  std::shared_ptr<Buffer> data;
-  RETURN_NOT_OK(::arrow::AllocateBuffer(pool, length * type_length, &data));
-
-  // raw bytes that we can write to
-  uint8_t* out_ptr = data->mutable_data();
-
-  // convert each FixedSizeBinary value to valid decimal bytes
-  const int64_t null_count = fixed_size_binary_array.null_count();
-  if (null_count > 0) {
-    for (int64_t i = 0; i < length; ++i, out_ptr += type_length) {
-      if (!fixed_size_binary_array.IsNull(i)) {
-        RawBytesToDecimalBytes(fixed_size_binary_array.GetValue(i), byte_width, out_ptr);
-      }
-    }
-  } else {
-    for (int64_t i = 0; i < length; ++i, out_ptr += type_length) {
-      RawBytesToDecimalBytes(fixed_size_binary_array.GetValue(i), byte_width, out_ptr);
-    }
-  }
-
-  *out = std::make_shared<::arrow::Decimal128Array>(
-      type, length, data, fixed_size_binary_array.null_bitmap(), null_count);
-
-  return Status::OK();
-}
-
-template <>
-Status ConvertToDecimal128<ByteArrayType>(const Array& array,
-                                          const std::shared_ptr<DataType>& type,
-                                          MemoryPool* pool, std::shared_ptr<Array>* out) {
-  const auto& binary_array = static_cast<const ::arrow::BinaryArray&>(array);
-  const int64_t length = binary_array.length();
-
-  const auto& decimal_type = static_cast<const ::arrow::Decimal128Type&>(*type);
-  const int64_t type_length = decimal_type.byte_width();
-
-  std::shared_ptr<Buffer> data;
-  RETURN_NOT_OK(::arrow::AllocateBuffer(pool, length * type_length, &data));
-
-  // raw bytes that we can write to
-  uint8_t* out_ptr = data->mutable_data();
-
-  const int64_t null_count = binary_array.null_count();
-
-  // convert each BinaryArray value to valid decimal bytes
-  for (int64_t i = 0; i < length; i++, out_ptr += type_length) {
-    int32_t record_len = 0;
-    const uint8_t* record_loc = binary_array.GetValue(i, &record_len);
-
-    if ((record_len < 0) || (record_len > type_length)) {
-      return Status::Invalid("Invalid BYTE_ARRAY size");
-    }
-
-    auto out_ptr_view = reinterpret_cast<uint64_t*>(out_ptr);
-    out_ptr_view[0] = 0;
-    out_ptr_view[1] = 0;
-
-    // only convert rows that are not null if there are nulls, or
-    // all rows, if there are not
-    if (((null_count > 0) && !binary_array.IsNull(i)) || (null_count <= 0)) {
-      RawBytesToDecimalBytes(record_loc, record_len, out_ptr);
-    }
-  }
-
-  *out = std::make_shared<::arrow::Decimal128Array>(
-      type, length, data, binary_array.null_bitmap(), null_count);
-  return Status::OK();
-}
-
-/// \brief Convert an Int32 or Int64 array into a Decimal128Array
-/// The parquet spec allows systems to write decimals in int32, int64 if the values are
-/// small enough to fit in less 4 bytes or less than 8 bytes, respectively.
-/// This function implements the conversion from int32 and int64 arrays to decimal arrays.
-template <typename ParquetIntegerType,
-          typename = typename std::enable_if<
-              std::is_same<ParquetIntegerType, Int32Type>::value ||
-              std::is_same<ParquetIntegerType, Int64Type>::value>::type>
-static Status DecimalIntegerTransfer(RecordReader* reader, MemoryPool* pool,
-                                     const std::shared_ptr<DataType>& type, Datum* out) {
-  DCHECK_EQ(type->id(), ::arrow::Type::DECIMAL);
-
-  const int64_t length = reader->values_written();
-
-  using ElementType = typename ParquetIntegerType::c_type;
-  static_assert(std::is_same<ElementType, int32_t>::value ||
-                    std::is_same<ElementType, int64_t>::value,
-                "ElementType must be int32_t or int64_t");
-
-  const auto values = reinterpret_cast<const ElementType*>(reader->values());
-
-  const auto& decimal_type = static_cast<const ::arrow::Decimal128Type&>(*type);
-  const int64_t type_length = decimal_type.byte_width();
-
-  std::shared_ptr<Buffer> data;
-  RETURN_NOT_OK(::arrow::AllocateBuffer(pool, length * type_length, &data));
-  uint8_t* out_ptr = data->mutable_data();
-
-  using ::arrow::BitUtil::FromLittleEndian;
-
-  for (int64_t i = 0; i < length; ++i, out_ptr += type_length) {
-    // sign/zero extend int32_t values, otherwise a no-op
-    const auto value = static_cast<int64_t>(values[i]);
-
-    auto out_ptr_view = reinterpret_cast<uint64_t*>(out_ptr);
-
-    // No-op on little endian machines, byteswap on big endian
-    out_ptr_view[0] = FromLittleEndian(static_cast<uint64_t>(value));
-
-    // no need to byteswap here because we're sign/zero extending exactly 8 bytes
-    out_ptr_view[1] = static_cast<uint64_t>(value < 0 ? -1 : 0);
-  }
-
-  if (reader->nullable_values()) {
-    std::shared_ptr<ResizableBuffer> is_valid = reader->ReleaseIsValid();
-    *out = std::make_shared<::arrow::Decimal128Array>(type, length, data, is_valid,
-                                                      reader->null_count());
-  } else {
-    *out = std::make_shared<::arrow::Decimal128Array>(type, length, data);
-  }
-  return Status::OK();
-}
-
-/// \brief Convert an arrow::BinaryArray to an arrow::Decimal128Array
-/// We do this by:
-/// 1. Creating an arrow::BinaryArray from the RecordReader's builder
-/// 2. Allocating a buffer for the arrow::Decimal128Array
-/// 3. Converting the big-endian bytes in each BinaryArray entry to two integers
-///    representing the high and low bits of each decimal value.
-template <typename ArrowType, typename ParquetType>
-struct TransferFunctor<
-    ArrowType, ParquetType,
-    typename std::enable_if<std::is_same<ArrowType, ::arrow::Decimal128Type>::value &&
-                            (std::is_same<ParquetType, ByteArrayType>::value ||
-                             std::is_same<ParquetType, FLBAType>::value)>::type> {
-  Status operator()(RecordReader* reader, MemoryPool* pool,
-                    const std::shared_ptr<DataType>& type, Datum* out) {
-    DCHECK_EQ(type->id(), ::arrow::Type::DECIMAL);
-
-    auto binary_reader = dynamic_cast<internal::BinaryRecordReader*>(reader);
-    DCHECK(binary_reader);
-    ::arrow::ArrayVector chunks = binary_reader->GetBuilderChunks();
-
-    for (size_t i = 0; i < chunks.size(); ++i) {
-      std::shared_ptr<Array> chunk_as_decimal;
-      RETURN_NOT_OK(
-          ConvertToDecimal128<ParquetType>(*chunks[i], type, pool, &chunk_as_decimal));
-
-      // Replace the chunk, which will hopefully also free memory as we go
-      chunks[i] = chunk_as_decimal;
-    }
-    *out = std::make_shared<ChunkedArray>(chunks);
-    return Status::OK();
+    empty_def_level[i] = static_cast<int16_t>(def_level);
+    def_level++;
   }
-};
 
-#define TRANSFER_DATA(ArrowType, ParquetType)   \
-  TransferFunctor<ArrowType, ParquetType> func; \
-  RETURN_NOT_OK(func(record_reader_.get(), pool_, value_type, &result));
-
-#define TRANSFER_CASE(ENUM, ArrowType, ParquetType) \
-  case ::arrow::Type::ENUM: {                       \
-    TRANSFER_DATA(ArrowType, ParquetType);          \
-  } break;
-
-#define TRANSFER_INT32(ENUM, ArrowType)                                       \
-  case ::arrow::Type::ENUM: {                                                 \
-    Status s = TransferInt<ArrowType, Int32Type>(record_reader_.get(), pool_, \
-                                                 value_type, &result);        \
-    RETURN_NOT_OK(s);                                                         \
-  } break;
-
-#define TRANSFER_INT64(ENUM, ArrowType)                                       \
-  case ::arrow::Type::ENUM: {                                                 \
-    Status s = TransferInt<ArrowType, Int64Type>(record_reader_.get(), pool_, \
-                                                 value_type, &result);        \
-    RETURN_NOT_OK(s);                                                         \
-  } break;
-
-Status PrimitiveImpl::NextBatch(int64_t records_to_read,
-                                std::shared_ptr<ChunkedArray>* out) {
-  try {
-    // Pre-allocation gives much better performance for flat columns
-    record_reader_->Reserve(records_to_read);
+  int32_t values_offset = 0;
+  std::vector<int64_t> null_counts(list_depth, 0);
+  for (int64_t i = 0; i < total_levels_read; i++) {
+    int16_t rep_level = rep_levels[i];
+    if (rep_level < descr_->max_repetition_level()) {
+      for (int64_t j = rep_level; j < list_depth; j++) {
+        if (j == (list_depth - 1)) {
+          RETURN_NOT_OK(offset_builders[j]->Append(values_offset));
+        } else {
+          RETURN_NOT_OK(offset_builders[j]->Append(
+              static_cast<int32_t>(offset_builders[j + 1]->length())));
+        }
 
-    record_reader_->Reset();
-    while (records_to_read > 0) {
-      if (!record_reader_->HasMoreData()) {
-        break;
-      }
-      int64_t records_read = record_reader_->ReadRecords(records_to_read);
-      records_to_read -= records_read;
-      if (records_read == 0) {
-        NextRowGroup();
+        if (((empty_def_level[j] - 1) == def_levels[i]) && (nullable[j])) {
+          RETURN_NOT_OK(valid_bits_builders[j]->Append(false));
+          null_counts[j]++;
+          break;
+        } else {
+          RETURN_NOT_OK(valid_bits_builders[j]->Append(true));
+          if (empty_def_level[j] == def_levels[i]) {
+            break;
+          }
+        }
       }
     }
-  } catch (const ::parquet::ParquetException& e) {
-    return ::arrow::Status::IOError(e.what());
+    if (def_levels[i] >= values_def_level) {
+      values_offset++;
+    }
   }
-
-  std::shared_ptr<DataType> value_type = field_->type();
-
-  Datum result;
-  switch (value_type->id()) {
-    case ::arrow::Type::NA: {
-      result = std::make_shared<::arrow::NullArray>(record_reader_->values_written());
-      break;
+  // Add the final offset to all lists
+  for (int64_t j = 0; j < list_depth; j++) {
+    if (j == (list_depth - 1)) {
+      RETURN_NOT_OK(offset_builders[j]->Append(values_offset));
+    } else {
+      RETURN_NOT_OK(offset_builders[j]->Append(
+          static_cast<int32_t>(offset_builders[j + 1]->length())));
     }
-    case ::arrow::Type::INT32:
-    case ::arrow::Type::INT64:
-    case ::arrow::Type::FLOAT:
-    case ::arrow::Type::DOUBLE:
-      result = TransferZeroCopy(record_reader_.get(), value_type);
-      break;
-    case ::arrow::Type::BOOL:
-      RETURN_NOT_OK(TransferBool(record_reader_.get(), pool_, &result));
-      break;
-      TRANSFER_INT32(UINT8, ::arrow::UInt8Type);
-      TRANSFER_INT32(INT8, ::arrow::Int8Type);
-      TRANSFER_INT32(UINT16, ::arrow::UInt16Type);
-      TRANSFER_INT32(INT16, ::arrow::Int16Type);
-      TRANSFER_INT32(UINT32, ::arrow::UInt32Type);
-      TRANSFER_INT64(UINT64, ::arrow::UInt64Type);
-      TRANSFER_INT32(DATE32, ::arrow::Date32Type);
-      TRANSFER_INT32(TIME32, ::arrow::Time32Type);
-      TRANSFER_INT64(TIME64, ::arrow::Time64Type);
-    case ::arrow::Type::DATE64:
-      RETURN_NOT_OK(TransferDate64(record_reader_.get(), pool_, value_type, &result));
-      break;
-    case ::arrow::Type::FIXED_SIZE_BINARY:
-    case ::arrow::Type::BINARY:
-    case ::arrow::Type::STRING: {
-      std::shared_ptr<ChunkedArray> out;
-      RETURN_NOT_OK(
-          TransferBinary(record_reader_.get(), value_type, read_dictionary_, &out));
-      result = out;
-    } break;
-    case ::arrow::Type::DECIMAL: {
-      switch (descr_->physical_type()) {
-        case ::parquet::Type::INT32: {
-          RETURN_NOT_OK(DecimalIntegerTransfer<Int32Type>(record_reader_.get(), pool_,
-                                                          value_type, &result));
-        } break;
-        case ::parquet::Type::INT64: {
-          RETURN_NOT_OK(DecimalIntegerTransfer<Int64Type>(record_reader_.get(), pool_,
-                                                          value_type, &result));
-        } break;
-        case ::parquet::Type::BYTE_ARRAY: {
-          TRANSFER_DATA(::arrow::Decimal128Type, ByteArrayType);
-        } break;
-        case ::parquet::Type::FIXED_LEN_BYTE_ARRAY: {
-          TRANSFER_DATA(::arrow::Decimal128Type, FLBAType);
-        } break;
-        default:
-          return Status::Invalid(
-              "Physical type for decimal must be int32, int64, byte array, or fixed "
-              "length binary");
-      }
-    } break;
-    case ::arrow::Type::TIMESTAMP: {
-      const ::arrow::TimestampType& timestamp_type =
-          static_cast<::arrow::TimestampType&>(*value_type);
-      switch (timestamp_type.unit()) {
-        case ::arrow::TimeUnit::MILLI:
-        case ::arrow::TimeUnit::MICRO: {
-          result = TransferZeroCopy(record_reader_.get(), value_type);
-        } break;
-        case ::arrow::TimeUnit::NANO: {
-          if (descr_->physical_type() == ::parquet::Type::INT96) {
-            TRANSFER_DATA(::arrow::TimestampType, Int96Type);
-          } else {
-            result = TransferZeroCopy(record_reader_.get(), value_type);
-          }
-        } break;
-        default:
-          return Status::NotImplemented("TimeUnit not supported");
-      }
-    } break;
-    default:
-      return Status::NotImplemented("No support for reading columns of type ",
-                                    value_type->ToString());
   }
 
-  // Nest nested types
-  RETURN_NOT_OK(WrapIntoListArray(&result));
-
-  DCHECK_NE(result.kind(), Datum::NONE);
-
-  if (result.kind() == Datum::ARRAY) {
-    *out = std::make_shared<ChunkedArray>(result.make_array());
-  } else if (result.kind() == Datum::CHUNKED_ARRAY) {
-    *out = result.chunked_array();
-  } else {
-    DCHECK(false) << "Should be impossible";
+  std::vector<std::shared_ptr<Buffer>> offsets;
+  std::vector<std::shared_ptr<Buffer>> valid_bits;
+  std::vector<int64_t> list_lengths;
+  for (int64_t j = 0; j < list_depth; j++) {
+    list_lengths.push_back(offset_builders[j]->length() - 1);
+    std::shared_ptr<Array> array;
+    RETURN_NOT_OK(offset_builders[j]->Finish(&array));
+    offsets.emplace_back(std::static_pointer_cast<Int32Array>(array)->values());
+    RETURN_NOT_OK(valid_bits_builders[j]->Finish(&array));
+    valid_bits.emplace_back(std::static_pointer_cast<BooleanArray>(array)->values());
   }
-  return Status::OK();
-}
 
-void PrimitiveImpl::NextRowGroup() {
-  std::unique_ptr<PageReader> page_reader = input_->NextChunk();
-  record_reader_->SetPageReader(std::move(page_reader));
-}
-
-Status PrimitiveImpl::GetDefLevels(const int16_t** data, size_t* length) {
-  *data = record_reader_->def_levels();
-  *length = record_reader_->levels_written();
-  return Status::OK();
-}
-
-Status PrimitiveImpl::GetRepLevels(const int16_t** data, size_t* length) {
-  *data = record_reader_->rep_levels();
-  *length = record_reader_->levels_written();
+  std::shared_ptr<Array> output = flat_array;
+  for (int64_t j = list_depth - 1; j >= 0; j--) {
+    auto list_type =
+        ::arrow::list(::arrow::field("item", output->type(), nullable[j + 1]));
+    output = std::make_shared<::arrow::ListArray>(list_type, list_lengths[j], offsets[j],
+                                                  output, valid_bits[j], null_counts[j]);
+  }
+  *inout_array = std::make_shared<ChunkedArray>(output);
   return Status::OK();
 }
 
-ColumnReader::ColumnReader(std::unique_ptr<ColumnReaderImpl> impl)
-    : impl_(std::move(impl)) {}
+// Reader implementation for struct array
 
-ColumnReader::~ColumnReader() {}
+class PARQUET_NO_EXPORT StructImpl : public ColumnReaderImpl {
+ public:
+  explicit StructImpl(const std::vector<std::shared_ptr<ColumnReaderImpl>>& children,
+                      int16_t struct_def_level, MemoryPool* pool, const Node* node)
+      : children_(children), struct_def_level_(struct_def_level), pool_(pool) {
+    InitField(node, children);
+  }
 
-Status ColumnReader::NextBatch(int64_t records_to_read,
-                               std::shared_ptr<ChunkedArray>* out) {
-  return impl_->NextBatch(records_to_read, out);
-}
+  Status NextBatch(int64_t records_to_read, std::shared_ptr<ChunkedArray>* out) override;
+  Status GetDefLevels(const int16_t** data, size_t* length) override;
+  Status GetRepLevels(const int16_t** data, size_t* length) override;
+  const std::shared_ptr<Field> field() override { return field_; }
 
-Status ColumnReader::NextBatch(int64_t records_to_read, std::shared_ptr<Array>* out) {
-  std::shared_ptr<ChunkedArray> chunked_out;
-  RETURN_NOT_OK(impl_->NextBatch(records_to_read, &chunked_out));
-  return GetSingleChunk(*chunked_out, out);
-}
+ private:
+  std::vector<std::shared_ptr<ColumnReaderImpl>> children_;
+  int16_t struct_def_level_;
+  MemoryPool* pool_;
+  std::shared_ptr<Field> field_;
+  std::shared_ptr<ResizableBuffer> def_levels_buffer_;
 
-// StructImpl methods
+  Status DefLevelsToNullArray(std::shared_ptr<Buffer>* null_bitmap, int64_t* null_count);
+  void InitField(const Node* node,
+                 const std::vector<std::shared_ptr<ColumnReaderImpl>>& children);
+};
 
 Status StructImpl::DefLevelsToNullArray(std::shared_ptr<Buffer>* null_bitmap_out,
                                         int64_t* null_count_out) {
@@ -1790,40 +784,195 @@ Status StructImpl::NextBatch(int64_t records_to_read,
   return Status::OK();
 }
 
-std::shared_ptr<ColumnChunkReader> RowGroupReader::Column(int column_index) {
-  return std::shared_ptr<ColumnChunkReader>(
-      new ColumnChunkReader(impl_, row_group_index_, column_index));
+// ----------------------------------------------------------------------
+// File reader implementation
+
+Status FileReaderImpl::GetReaderForNode(int index, const Node* node,
+                                        const std::vector<int>& indices,
+                                        int16_t def_level,
+                                        FileColumnIteratorFactory iterator_factory,
+                                        std::unique_ptr<ColumnReaderImpl>* out) {
+  *out = nullptr;
+
+  if (schema::IsSimpleStruct(node)) {
+    const schema::GroupNode* group = static_cast<const schema::GroupNode*>(node);
+    std::vector<std::shared_ptr<ColumnReaderImpl>> children;
+    for (int i = 0; i < group->field_count(); i++) {
+      std::unique_ptr<ColumnReaderImpl> child_reader;
+      // TODO(itaiin): Remove the -1 index hack when all types of nested reads
+      // are supported. This currently just signals the lower level reader resolution
+      // to abort
+      RETURN_NOT_OK(GetReaderForNode(index, group->field(i).get(), indices,
+                                     static_cast<int16_t>(def_level + 1),
+                                     iterator_factory, &child_reader));
+      if (child_reader != nullptr) {
+        children.push_back(std::move(child_reader));
+      }
+    }
+
+    if (children.size() > 0) {
+      *out = std::unique_ptr<ColumnReaderImpl>(
+          new StructImpl(children, def_level, pool_, node));
+    }
+  } else {
+    // This should be a flat field case - translate the field index to
+    // the correct column index by walking down to the leaf node
+    const Node* walker = node;
+    while (!walker->is_primitive()) {
+      DCHECK(walker->is_group());
+      auto group = static_cast<const schema::GroupNode*>(walker);
+      if (group->field_count() != 1) {
+        return Status::NotImplemented("lists with structs are not supported.");
+      }
+      walker = group->field(0).get();
+    }
+    auto column_index = reader_->metadata()->schema()->ColumnIndex(*walker);
+
+    // If the index of the column is found then a reader for the column is needed.
+    // Otherwise *out keeps the nullptr value.
+    if (std::find(indices.begin(), indices.end(), column_index) != indices.end()) {
+      std::unique_ptr<ColumnReader> reader;
+      RETURN_NOT_OK(GetColumn(column_index, iterator_factory, &reader));
+      *out = std::unique_ptr<ColumnReaderImpl>(
+          static_cast<ColumnReaderImpl*>(reader.release()));
+    }
+  }
+
+  return Status::OK();
+}
+
+Status FileReaderImpl::ReadRowGroups(const std::vector<int>& row_groups,
+                                     const std::vector<int>& indices,
+                                     std::shared_ptr<Table>* out) {
+  BEGIN_PARQUET_CATCH_EXCEPTIONS
+
+  std::shared_ptr<::arrow::Schema> schema;
+  RETURN_NOT_OK(GetSchema(indices, &schema));
+
+  // We only need to read schema fields which have columns indicated
+  // in the indices vector
+  std::vector<int> field_indices;
+  if (!schema::ColumnIndicesToFieldIndices(*reader_->metadata()->schema(), indices,
+                                           &field_indices)) {
+    return Status::Invalid("Invalid column index");
+  }
+  int num_fields = static_cast<int>(field_indices.size());
+  std::vector<std::shared_ptr<ChunkedArray>> columns(num_fields);
+
+  auto ReadColumnFunc = [&](int i) {
+    return ReadSchemaField(field_indices[i], indices, row_groups, &columns[i]);
+  };
+
+  if (reader_properties_.use_threads()) {
+    std::vector<std::future<Status>> futures;
+    auto pool = ::arrow::internal::GetCpuThreadPool();
+    for (int i = 0; i < num_fields; i++) {
+      futures.push_back(pool->Submit(ReadColumnFunc, i));
+    }
+    Status final_status = Status::OK();
+    for (auto& fut : futures) {
+      Status st = fut.get();
+      if (!st.ok()) {
+        final_status = std::move(st);
+      }
+    }
+    RETURN_NOT_OK(final_status);
+  } else {
+    for (int i = 0; i < num_fields; i++) {
+      RETURN_NOT_OK(ReadColumnFunc(i));
+    }
+  }
+
+  auto dict_indices = GetDictionaryIndices(indices);
+  if (!dict_indices.empty()) {
+    schema = FixSchema(*schema, dict_indices, columns);
+  }
+  std::shared_ptr<Table> table = Table::Make(schema, columns);
+  RETURN_NOT_OK(table->Validate());
+  *out = table;
+  return Status::OK();
+  END_PARQUET_CATCH_EXCEPTIONS
+}
+
+Status FileReaderImpl::GetColumn(int i, FileColumnIteratorFactory iterator_factory,
+                                 std::unique_ptr<ColumnReader>* out) {
+  if (i < 0 || i >= this->num_columns()) {
+    return Status::Invalid("Column index out of bounds (got ", i,
+                           ", should be "
+                           "between 0 and ",
+                           this->num_columns() - 1, ")");
+  }
+
+  std::unique_ptr<FileColumnIterator> input(iterator_factory(i, reader_.get()));
+  *out = std::unique_ptr<ColumnReader>(
+      new PrimitiveImpl(pool_, std::move(input), reader_properties_.read_dictionary(i)));
+  return Status::OK();
 }
 
-Status RowGroupReader::ReadTable(const std::vector<int>& column_indices,
-                                 std::shared_ptr<::arrow::Table>* out) {
-  return impl_->ReadRowGroup(row_group_index_, column_indices, out);
+Status FileReaderImpl::ReadSchemaField(int i, const std::vector<int>& indices,
+                                       const std::vector<int>& row_groups,
+                                       std::shared_ptr<ChunkedArray>* out) {
+  BEGIN_PARQUET_CATCH_EXCEPTIONS
+  auto parquet_schema = reader_->metadata()->schema();
+  auto node = parquet_schema->group_node()->field(i).get();
+  std::unique_ptr<ColumnReaderImpl> reader_impl;
+  RETURN_NOT_OK(GetReaderForNode(i, node, indices, 1, SomeRowGroupsFactory(row_groups),
+                                 &reader_impl));
+  if (reader_impl == nullptr) {
+    *out = nullptr;
+    return Status::OK();
+  }
+  // TODO(wesm): This calculation doesn't make much sense when we have repeated
+  // schema nodes
+  int64_t records_to_read = GetTotalRecords(row_groups, i);
+  return reader_impl->NextBatch(records_to_read, out);
+  END_PARQUET_CATCH_EXCEPTIONS
 }
 
-Status RowGroupReader::ReadTable(std::shared_ptr<::arrow::Table>* out) {
-  return impl_->ReadRowGroup(row_group_index_, out);
+std::shared_ptr<RowGroupReader> FileReaderImpl::RowGroup(int row_group_index) {
+  return std::make_shared<RowGroupReaderImpl>(this, row_group_index);
 }
 
-RowGroupReader::~RowGroupReader() {}
+// ----------------------------------------------------------------------
+// Public factory functions
 
-RowGroupReader::RowGroupReader(FileReader::Impl* impl, int row_group_index)
-    : impl_(impl), row_group_index_(row_group_index) {}
+Status FileReader::Make(::arrow::MemoryPool* pool,
+                        std::unique_ptr<ParquetFileReader> reader,
+                        const ArrowReaderProperties& properties,
+                        std::unique_ptr<FileReader>* out) {
+  out->reset(new FileReaderImpl(pool, std::move(reader), properties));
+  return static_cast<FileReaderImpl*>(out->get())->Init();
+}
 
-Status ColumnChunkReader::Read(std::shared_ptr<::arrow::ChunkedArray>* out) {
-  return impl_->ReadColumnChunk(column_index_, row_group_index_, out);
+Status FileReader::Make(::arrow::MemoryPool* pool,
+                        std::unique_ptr<ParquetFileReader> reader,
+                        std::unique_ptr<FileReader>* out) {
+  return Make(pool, std::move(reader), default_arrow_reader_properties(), out);
 }
 
-Status ColumnChunkReader::Read(std::shared_ptr<::arrow::Array>* out) {
-  std::shared_ptr<ChunkedArray> chunked_out;
-  RETURN_NOT_OK(Read(&chunked_out));
-  return GetSingleChunk(*chunked_out, out);
+Status OpenFile(const std::shared_ptr<::arrow::io::RandomAccessFile>& file,
+                MemoryPool* pool, const ReaderProperties& props,
+                const std::shared_ptr<FileMetaData>& metadata,
+                std::unique_ptr<FileReader>* reader) {
+  std::unique_ptr<ParquetReader> pq_reader;
+  PARQUET_CATCH_NOT_OK(pq_reader = ParquetReader::Open(file, props, metadata));
+  return FileReader::Make(pool, std::move(pq_reader), default_arrow_reader_properties(),
+                          reader);
 }
 
-ColumnChunkReader::~ColumnChunkReader() {}
+Status OpenFile(const std::shared_ptr<::arrow::io::RandomAccessFile>& file,
+                MemoryPool* pool, std::unique_ptr<FileReader>* reader) {
+  return OpenFile(file, pool, ::parquet::default_reader_properties(), nullptr, reader);
+}
 
-ColumnChunkReader::ColumnChunkReader(FileReader::Impl* impl, int row_group_index,
-                                     int column_index)
-    : impl_(impl), column_index_(column_index), row_group_index_(row_group_index) {}
+Status OpenFile(const std::shared_ptr<::arrow::io::RandomAccessFile>& file,
+                ::arrow::MemoryPool* pool, const ArrowReaderProperties& properties,
+                std::unique_ptr<FileReader>* reader) {
+  std::unique_ptr<ParquetReader> pq_reader;
+  PARQUET_CATCH_NOT_OK(pq_reader = ParquetReader::Open(
+                           file, ::parquet::default_reader_properties(), nullptr));
+  return FileReader::Make(pool, std::move(pq_reader), properties, reader);
+}
 
 }  // namespace arrow
 }  // namespace parquet
diff --git a/cpp/src/parquet/arrow/reader.h b/cpp/src/parquet/arrow/reader.h
index 3491314..5b2f2c7 100644
--- a/cpp/src/parquet/arrow/reader.h
+++ b/cpp/src/parquet/arrow/reader.h
@@ -25,17 +25,11 @@
 
 #include "parquet/platform.h"
 
-#include "arrow/io/interfaces.h"
-#include "arrow/util/macros.h"
-
 namespace arrow {
 
-class Array;
 class ChunkedArray;
-class MemoryPool;
 class RecordBatchReader;
 class Schema;
-class Status;
 class Table;
 
 }  // namespace arrow
@@ -172,22 +166,19 @@ class PARQUET_EXPORT FileReader {
   // fully-materialized arrow::Array instances
   //
   // Returns error status if the column of interest is not flat.
-  ::arrow::Status GetColumn(int i, std::unique_ptr<ColumnReader>* out);
+  virtual ::arrow::Status GetColumn(int i, std::unique_ptr<ColumnReader>* out) = 0;
 
   /// \brief Return arrow schema for all the columns.
-  ::arrow::Status GetSchema(std::shared_ptr<::arrow::Schema>* out);
+  virtual ::arrow::Status GetSchema(std::shared_ptr<::arrow::Schema>* out) = 0;
 
   /// \brief Return arrow schema by apply selection of column indices.
   /// \returns error status if passed wrong indices.
-  ::arrow::Status GetSchema(const std::vector<int>& indices,
-                            std::shared_ptr<::arrow::Schema>* out);
+  virtual ::arrow::Status GetSchema(const std::vector<int>& indices,
+                                    std::shared_ptr<::arrow::Schema>* out) = 0;
 
   // Read column as a whole into an Array.
-  ::arrow::Status ReadColumn(int i, std::shared_ptr<::arrow::ChunkedArray>* out);
-
-  /// \note Deprecated since 0.12
-  ARROW_DEPRECATED("Use version with ChunkedArray output")
-  ::arrow::Status ReadColumn(int i, std::shared_ptr<::arrow::Array>* out);
+  virtual ::arrow::Status ReadColumn(int i,
+                                     std::shared_ptr<::arrow::ChunkedArray>* out) = 0;
 
   // NOTE: Experimental API
   // Reads a specific top level schema field into an Array
@@ -201,117 +192,78 @@ class PARQUET_EXPORT FileReader {
   // 2 foo3
   //
   // i=0 will read the entire foo struct, i=1 the foo2 primitive column etc
-  ::arrow::Status ReadSchemaField(int i, std::shared_ptr<::arrow::ChunkedArray>* out);
-
-  /// \note Deprecated since 0.12
-  ARROW_DEPRECATED("Use version with ChunkedArray output")
-  ::arrow::Status ReadSchemaField(int i, std::shared_ptr<::arrow::Array>* out);
+  virtual ::arrow::Status ReadSchemaField(
+      int i, std::shared_ptr<::arrow::ChunkedArray>* out) = 0;
 
   /// \brief Return a RecordBatchReader of row groups selected from row_group_indices, the
   ///    ordering in row_group_indices matters.
   /// \returns error Status if row_group_indices contains invalid index
-  ::arrow::Status GetRecordBatchReader(const std::vector<int>& row_group_indices,
-                                       std::shared_ptr<::arrow::RecordBatchReader>* out);
-
-  /// \brief Return a RecordBatchReader of row groups selected from row_group_indices,
-  ///     whose columns are selected by column_indices. The ordering in row_group_indices
-  ///     and column_indices matter.
-  /// \returns error Status if either row_group_indices or column_indices contains invalid
-  ///    index
-  ::arrow::Status GetRecordBatchReader(const std::vector<int>& row_group_indices,
-                                       const std::vector<int>& column_indices,
-                                       std::shared_ptr<::arrow::RecordBatchReader>* out);
+  virtual ::arrow::Status GetRecordBatchReader(
+      const std::vector<int>& row_group_indices,
+      std::shared_ptr<::arrow::RecordBatchReader>* out) = 0;
+
+  /// \brief Return a RecordBatchReader of row groups selected from
+  ///     row_group_indices, whose columns are selected by column_indices. The
+  ///     ordering in row_group_indices and column_indices matter.
+  /// \returns error Status if either row_group_indices or column_indices
+  ///    contains invalid index
+  virtual ::arrow::Status GetRecordBatchReader(
+      const std::vector<int>& row_group_indices, const std::vector<int>& column_indices,
+      std::shared_ptr<::arrow::RecordBatchReader>* out) = 0;
 
   // Read a table of columns into a Table
-  ::arrow::Status ReadTable(std::shared_ptr<::arrow::Table>* out);
+  virtual ::arrow::Status ReadTable(std::shared_ptr<::arrow::Table>* out) = 0;
 
   // Read a table of columns into a Table. Read only the indicated column
   // indices (relative to the schema)
-  ::arrow::Status ReadTable(const std::vector<int>& column_indices,
-                            std::shared_ptr<::arrow::Table>* out);
+  virtual ::arrow::Status ReadTable(const std::vector<int>& column_indices,
+                                    std::shared_ptr<::arrow::Table>* out) = 0;
 
-  ::arrow::Status ReadRowGroup(int i, const std::vector<int>& column_indices,
-                               std::shared_ptr<::arrow::Table>* out);
+  virtual ::arrow::Status ReadRowGroup(int i, const std::vector<int>& column_indices,
+                                       std::shared_ptr<::arrow::Table>* out) = 0;
 
-  ::arrow::Status ReadRowGroup(int i, std::shared_ptr<::arrow::Table>* out);
+  virtual ::arrow::Status ReadRowGroup(int i, std::shared_ptr<::arrow::Table>* out) = 0;
 
-  ::arrow::Status ReadRowGroups(const std::vector<int>& row_groups,
-                                const std::vector<int>& column_indices,
-                                std::shared_ptr<::arrow::Table>* out);
+  virtual ::arrow::Status ReadRowGroups(const std::vector<int>& row_groups,
+                                        const std::vector<int>& column_indices,
+                                        std::shared_ptr<::arrow::Table>* out) = 0;
 
-  ::arrow::Status ReadRowGroups(const std::vector<int>& row_groups,
-                                std::shared_ptr<::arrow::Table>* out);
+  virtual ::arrow::Status ReadRowGroups(const std::vector<int>& row_groups,
+                                        std::shared_ptr<::arrow::Table>* out) = 0;
 
   /// \brief Scan file contents with one thread, return number of rows
-  ::arrow::Status ScanContents(std::vector<int> columns, const int32_t column_batch_size,
-                               int64_t* num_rows);
+  virtual ::arrow::Status ScanContents(std::vector<int> columns,
+                                       const int32_t column_batch_size,
+                                       int64_t* num_rows) = 0;
 
   /// \brief Return a reader for the RowGroup, this object must not outlive the
   ///   FileReader.
-  std::shared_ptr<RowGroupReader> RowGroup(int row_group_index);
+  virtual std::shared_ptr<RowGroupReader> RowGroup(int row_group_index) = 0;
 
-  int num_row_groups() const;
+  virtual int num_row_groups() const = 0;
 
-  const ParquetFileReader* parquet_reader() const;
-
-  /// Set the number of threads to use during reads of multiple columns. By
-  /// default only 1 thread is used
-  /// \deprecated Use set_use_threads instead.
-  ARROW_DEPRECATED("Use set_use_threads instead")
-  void set_num_threads(int num_threads);
+  virtual ParquetFileReader* parquet_reader() const = 0;
 
   /// Set whether to use multiple threads during reads of multiple columns.
   /// By default only one thread is used.
-  void set_use_threads(bool use_threads);
-
-  virtual ~FileReader();
+  virtual void set_use_threads(bool use_threads) = 0;
 
- private:
-  FileReader(::arrow::MemoryPool* pool, std::unique_ptr<ParquetFileReader> reader,
-             const ArrowReaderProperties& properties);
-
-  friend ColumnChunkReader;
-  friend RowGroupReader;
-
-  class PARQUET_NO_EXPORT Impl;
-  std::unique_ptr<Impl> impl_;
+  virtual ~FileReader() = default;
 };
 
-class PARQUET_EXPORT RowGroupReader {
+class RowGroupReader {
  public:
-  std::shared_ptr<ColumnChunkReader> Column(int column_index);
-
-  ::arrow::Status ReadTable(const std::vector<int>& column_indices,
-                            std::shared_ptr<::arrow::Table>* out);
-  ::arrow::Status ReadTable(std::shared_ptr<::arrow::Table>* out);
-
-  virtual ~RowGroupReader();
-
- private:
-  friend FileReader;
-  RowGroupReader(FileReader::Impl* reader, int row_group_index);
-
-  FileReader::Impl* impl_;
-  int row_group_index_;
+  virtual ~RowGroupReader() = default;
+  virtual std::shared_ptr<ColumnChunkReader> Column(int column_index) = 0;
+  virtual ::arrow::Status ReadTable(const std::vector<int>& column_indices,
+                                    std::shared_ptr<::arrow::Table>* out) = 0;
+  virtual ::arrow::Status ReadTable(std::shared_ptr<::arrow::Table>* out) = 0;
 };
 
-class PARQUET_EXPORT ColumnChunkReader {
+class ColumnChunkReader {
  public:
-  ::arrow::Status Read(std::shared_ptr<::arrow::ChunkedArray>* out);
-
-  /// \note Deprecated since 0.12
-  ARROW_DEPRECATED("Use version with ChunkedArray output")
-  ::arrow::Status Read(std::shared_ptr<::arrow::Array>* out);
-
-  virtual ~ColumnChunkReader();
-
- private:
-  friend RowGroupReader;
-  ColumnChunkReader(FileReader::Impl* impl, int row_group_index, int column_index);
-
-  FileReader::Impl* impl_;
-  int column_index_;
-  int row_group_index_;
+  virtual ~ColumnChunkReader() = default;
+  virtual ::arrow::Status Read(std::shared_ptr<::arrow::ChunkedArray>* out) = 0;
 };
 
 // At this point, the column reader is a stream iterator. It only knows how to
@@ -322,8 +274,7 @@ class PARQUET_EXPORT ColumnChunkReader {
 // might change in the future.
 class PARQUET_EXPORT ColumnReader {
  public:
-  class PARQUET_NO_EXPORT ColumnReaderImpl;
-  virtual ~ColumnReader();
+  virtual ~ColumnReader() = default;
 
   // Scan the next array of the indicated size. The actual size of the
   // returned array may be less than the passed size depending how much data is
@@ -334,20 +285,8 @@ class PARQUET_EXPORT ColumnReader {
   //
   // Returns Status::OK on a successful read, including if you have exhausted
   // the data available in the file.
-  ::arrow::Status NextBatch(int64_t batch_size,
-                            std::shared_ptr<::arrow::ChunkedArray>* out);
-
-  /// \note Deprecated since 0.12
-  ARROW_DEPRECATED("Use version with ChunkedArray output")
-  ::arrow::Status NextBatch(int64_t batch_size, std::shared_ptr<::arrow::Array>* out);
-
- private:
-  std::unique_ptr<ColumnReaderImpl> impl_;
-  explicit ColumnReader(std::unique_ptr<ColumnReaderImpl> impl);
-
-  friend class FileReader;
-  friend class PrimitiveImpl;
-  friend class StructImpl;
+  virtual ::arrow::Status NextBatch(int64_t batch_size,
+                                    std::shared_ptr<::arrow::ChunkedArray>* out) = 0;
 };
 
 // Helper function to create a file reader from an implementation of an Arrow
diff --git a/cpp/src/parquet/arrow/reader_internal.cc b/cpp/src/parquet/arrow/reader_internal.cc
new file mode 100644
index 0000000..de00173
--- /dev/null
+++ b/cpp/src/parquet/arrow/reader_internal.cc
@@ -0,0 +1,616 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "parquet/arrow/reader_internal.h"
+
+#include <algorithm>
+#include <climits>
+#include <cstdint>
+#include <cstring>
+#include <memory>
+#include <type_traits>
+#include <vector>
+
+#include "arrow/array.h"
+#include "arrow/compute/kernel.h"
+#include "arrow/table.h"
+#include "arrow/type.h"
+#include "arrow/type_traits.h"
+#include "arrow/util/int-util.h"
+#include "arrow/util/logging.h"
+#include "arrow/util/ubsan.h"
+
+#include "parquet/column_reader.h"
+#include "parquet/platform.h"
+#include "parquet/schema.h"
+#include "parquet/types.h"
+
+using arrow::Array;
+using arrow::BooleanArray;
+using arrow::ChunkedArray;
+using arrow::DataType;
+using arrow::Field;
+using arrow::Int32Array;
+using arrow::ListArray;
+using arrow::MemoryPool;
+using arrow::ResizableBuffer;
+using arrow::Status;
+using arrow::StructArray;
+using arrow::Table;
+using arrow::TimestampArray;
+using arrow::compute::Datum;
+
+using ::arrow::BitUtil::FromBigEndian;
+using ::arrow::internal::SafeLeftShift;
+using ::arrow::util::SafeLoadAs;
+
+using parquet::internal::RecordReader;
+
+namespace parquet {
+namespace arrow {
+
+template <typename ArrowType>
+using ArrayType = typename ::arrow::TypeTraits<ArrowType>::ArrayType;
+
+// ----------------------------------------------------------------------
+// Primitive types
+
+template <typename ArrowType, typename ParquetType>
+Status TransferInt(RecordReader* reader, MemoryPool* pool,
+                   const std::shared_ptr<DataType>& type, Datum* out) {
+  using ArrowCType = typename ArrowType::c_type;
+  using ParquetCType = typename ParquetType::c_type;
+  int64_t length = reader->values_written();
+  std::shared_ptr<Buffer> data;
+  RETURN_NOT_OK(::arrow::AllocateBuffer(pool, length * sizeof(ArrowCType), &data));
+
+  auto values = reinterpret_cast<const ParquetCType*>(reader->values());
+  auto out_ptr = reinterpret_cast<ArrowCType*>(data->mutable_data());
+  std::copy(values, values + length, out_ptr);
+  *out = std::make_shared<ArrayType<ArrowType>>(
+      type, length, data, reader->ReleaseIsValid(), reader->null_count());
+  return Status::OK();
+}
+
+std::shared_ptr<Array> TransferZeroCopy(RecordReader* reader,
+                                        const std::shared_ptr<DataType>& type) {
+  std::vector<std::shared_ptr<Buffer>> buffers = {reader->ReleaseIsValid(),
+                                                  reader->ReleaseValues()};
+  auto data = std::make_shared<::arrow::ArrayData>(type, reader->values_written(),
+                                                   buffers, reader->null_count());
+  return ::arrow::MakeArray(data);
+}
+
+Status TransferBool(RecordReader* reader, MemoryPool* pool, Datum* out) {
+  int64_t length = reader->values_written();
+  std::shared_ptr<Buffer> data;
+
+  const int64_t buffer_size = BitUtil::BytesForBits(length);
+  RETURN_NOT_OK(::arrow::AllocateBuffer(pool, buffer_size, &data));
+
+  // Transfer boolean values to packed bitmap
+  auto values = reinterpret_cast<const bool*>(reader->values());
+  uint8_t* data_ptr = data->mutable_data();
+  memset(data_ptr, 0, buffer_size);
+
+  for (int64_t i = 0; i < length; i++) {
+    if (values[i]) {
+      ::arrow::BitUtil::SetBit(data_ptr, i);
+    }
+  }
+
+  *out = std::make_shared<BooleanArray>(length, data, reader->ReleaseIsValid(),
+                                        reader->null_count());
+  return Status::OK();
+}
+
+Status TransferInt96(RecordReader* reader, MemoryPool* pool,
+                     const std::shared_ptr<DataType>& type, Datum* out) {
+  int64_t length = reader->values_written();
+  auto values = reinterpret_cast<const Int96*>(reader->values());
+  std::shared_ptr<Buffer> data;
+  RETURN_NOT_OK(::arrow::AllocateBuffer(pool, length * sizeof(int64_t), &data));
+  auto data_ptr = reinterpret_cast<int64_t*>(data->mutable_data());
+  for (int64_t i = 0; i < length; i++) {
+    *data_ptr++ = Int96GetNanoSeconds(values[i]);
+  }
+  *out = std::make_shared<TimestampArray>(type, length, data, reader->ReleaseIsValid(),
+                                          reader->null_count());
+  return Status::OK();
+}
+
+Status TransferDate64(RecordReader* reader, MemoryPool* pool,
+                      const std::shared_ptr<DataType>& type, Datum* out) {
+  int64_t length = reader->values_written();
+  auto values = reinterpret_cast<const int32_t*>(reader->values());
+
+  std::shared_ptr<Buffer> data;
+  RETURN_NOT_OK(::arrow::AllocateBuffer(pool, length * sizeof(int64_t), &data));
+  auto out_ptr = reinterpret_cast<int64_t*>(data->mutable_data());
+
+  for (int64_t i = 0; i < length; i++) {
+    *out_ptr++ = static_cast<int64_t>(values[i]) * kMillisecondsPerDay;
+  }
+
+  *out = std::make_shared<::arrow::Date64Array>(
+      type, length, data, reader->ReleaseIsValid(), reader->null_count());
+  return Status::OK();
+}
+
+// ----------------------------------------------------------------------
+// Binary, direct to dictionary-encoded
+
+// Some ugly hacks here for now to handle binary dictionaries casted to their
+// logical type
+std::shared_ptr<Array> ShallowCast(const Array& arr,
+                                   const std::shared_ptr<DataType>& new_type) {
+  std::shared_ptr<::arrow::ArrayData> new_data = arr.data()->Copy();
+  new_data->type = new_type;
+  if (new_type->id() == ::arrow::Type::DICTIONARY) {
+    // Cast dictionary, too
+    const auto& dict_type = static_cast<const ::arrow::DictionaryType&>(*new_type);
+    new_data->dictionary = ShallowCast(*new_data->dictionary, dict_type.value_type());
+  }
+  return MakeArray(new_data);
+}
+
+std::shared_ptr<ChunkedArray> CastChunksTo(
+    const ChunkedArray& data, const std::shared_ptr<DataType>& logical_value_type) {
+  std::vector<std::shared_ptr<Array>> string_chunks;
+  for (int i = 0; i < data.num_chunks(); ++i) {
+    string_chunks.push_back(ShallowCast(*data.chunk(i), logical_value_type));
+  }
+  return std::make_shared<ChunkedArray>(string_chunks);
+}
+
+Status TransferDictionary(RecordReader* reader,
+                          const std::shared_ptr<DataType>& logical_value_type,
+                          std::shared_ptr<ChunkedArray>* out) {
+  auto dict_reader = dynamic_cast<internal::DictionaryRecordReader*>(reader);
+  DCHECK(dict_reader);
+  *out = dict_reader->GetResult();
+
+  const auto& dict_type = static_cast<const ::arrow::DictionaryType&>(*(*out)->type());
+  if (!logical_value_type->Equals(*dict_type.value_type())) {
+    *out = CastChunksTo(**out,
+                        ::arrow::dictionary(dict_type.index_type(), logical_value_type));
+  }
+  return Status::OK();
+}
+
+Status TransferBinary(RecordReader* reader,
+                      const std::shared_ptr<DataType>& logical_value_type,
+                      std::shared_ptr<ChunkedArray>* out) {
+  if (reader->read_dictionary()) {
+    return TransferDictionary(reader, logical_value_type, out);
+  }
+  auto binary_reader = dynamic_cast<internal::BinaryRecordReader*>(reader);
+  DCHECK(binary_reader);
+  *out = std::make_shared<ChunkedArray>(binary_reader->GetBuilderChunks());
+  if (!logical_value_type->Equals(*(*out)->type())) {
+    *out = CastChunksTo(**out, logical_value_type);
+  }
+  return Status::OK();
+}
+
+// ----------------------------------------------------------------------
+// INT32 / INT64 / BYTE_ARRAY / FIXED_LEN_BYTE_ARRAY -> Decimal128
+
+static uint64_t BytesToInteger(const uint8_t* bytes, int32_t start, int32_t stop) {
+  const int32_t length = stop - start;
+
+  DCHECK_GE(length, 0);
+  DCHECK_LE(length, 8);
+
+  switch (length) {
+    case 0:
+      return 0;
+    case 1:
+      return bytes[start];
+    case 2:
+      return FromBigEndian(SafeLoadAs<uint16_t>(bytes + start));
+    case 3: {
+      const uint64_t first_two_bytes = FromBigEndian(SafeLoadAs<uint16_t>(bytes + start));
+      const uint64_t last_byte = bytes[stop - 1];
+      return first_two_bytes << 8 | last_byte;
+    }
+    case 4:
+      return FromBigEndian(SafeLoadAs<uint32_t>(bytes + start));
+    case 5: {
+      const uint64_t first_four_bytes =
+          FromBigEndian(SafeLoadAs<uint32_t>(bytes + start));
+      const uint64_t last_byte = bytes[stop - 1];
+      return first_four_bytes << 8 | last_byte;
+    }
+    case 6: {
+      const uint64_t first_four_bytes =
+          FromBigEndian(SafeLoadAs<uint32_t>(bytes + start));
+      const uint64_t last_two_bytes =
+          FromBigEndian(SafeLoadAs<uint16_t>(bytes + start + 4));
+      return first_four_bytes << 16 | last_two_bytes;
+    }
+    case 7: {
+      const uint64_t first_four_bytes =
+          FromBigEndian(SafeLoadAs<uint32_t>(bytes + start));
+      const uint64_t second_two_bytes =
+          FromBigEndian(SafeLoadAs<uint16_t>(bytes + start + 4));
+      const uint64_t last_byte = bytes[stop - 1];
+      return first_four_bytes << 24 | second_two_bytes << 8 | last_byte;
+    }
+    case 8:
+      return FromBigEndian(SafeLoadAs<uint64_t>(bytes + start));
+    default: {
+      DCHECK(false);
+      return UINT64_MAX;
+    }
+  }
+}
+
+static constexpr int32_t kMinDecimalBytes = 1;
+static constexpr int32_t kMaxDecimalBytes = 16;
+
+/// \brief Convert a sequence of big-endian bytes to one int64_t (high bits) and one
+/// uint64_t (low bits).
+static void BytesToIntegerPair(const uint8_t* bytes, const int32_t length,
+                               int64_t* out_high, uint64_t* out_low) {
+  DCHECK_GE(length, kMinDecimalBytes);
+  DCHECK_LE(length, kMaxDecimalBytes);
+
+  // XXX This code is copied from Decimal::FromBigEndian
+
+  int64_t high, low;
+
+  // Bytes are coming in big-endian, so the first byte is the MSB and therefore holds the
+  // sign bit.
+  const bool is_negative = static_cast<int8_t>(bytes[0]) < 0;
+
+  // 1. Extract the high bytes
+  // Stop byte of the high bytes
+  const int32_t high_bits_offset = std::max(0, length - 8);
+  const auto high_bits = BytesToInteger(bytes, 0, high_bits_offset);
+
+  if (high_bits_offset == 8) {
+    // Avoid undefined shift by 64 below
+    high = high_bits;
+  } else {
+    high = -1 * (is_negative && length < kMaxDecimalBytes);
+    // Shift left enough bits to make room for the incoming int64_t
+    high = SafeLeftShift(high, high_bits_offset * CHAR_BIT);
+    // Preserve the upper bits by inplace OR-ing the int64_t
+    high |= high_bits;
+  }
+
+  // 2. Extract the low bytes
+  // Stop byte of the low bytes
+  const int32_t low_bits_offset = std::min(length, 8);
+  const auto low_bits = BytesToInteger(bytes, high_bits_offset, length);
+
+  if (low_bits_offset == 8) {
+    // Avoid undefined shift by 64 below
+    low = low_bits;
+  } else {
+    // Sign extend the low bits if necessary
+    low = -1 * (is_negative && length < 8);
+    // Shift left enough bits to make room for the incoming int64_t
+    low = SafeLeftShift(low, low_bits_offset * CHAR_BIT);
+    // Preserve the upper bits by inplace OR-ing the int64_t
+    low |= low_bits;
+  }
+
+  *out_high = high;
+  *out_low = static_cast<uint64_t>(low);
+}
+
+static inline void RawBytesToDecimalBytes(const uint8_t* value, int32_t byte_width,
+                                          uint8_t* out_buf) {
+  // view the first 8 bytes as an unsigned 64-bit integer
+  auto low = reinterpret_cast<uint64_t*>(out_buf);
+
+  // view the second 8 bytes as a signed 64-bit integer
+  auto high = reinterpret_cast<int64_t*>(out_buf + sizeof(uint64_t));
+
+  // Convert the fixed size binary array bytes into a Decimal128 compatible layout
+  BytesToIntegerPair(value, byte_width, high, low);
+}
+
+template <typename T>
+Status ConvertToDecimal128(const Array& array, const std::shared_ptr<DataType>&,
+                           MemoryPool* pool, std::shared_ptr<Array>*) {
+  return Status::NotImplemented("not implemented");
+}
+
+template <>
+Status ConvertToDecimal128<FLBAType>(const Array& array,
+                                     const std::shared_ptr<DataType>& type,
+                                     MemoryPool* pool, std::shared_ptr<Array>* out) {
+  const auto& fixed_size_binary_array =
+      static_cast<const ::arrow::FixedSizeBinaryArray&>(array);
+
+  // The byte width of each decimal value
+  const int32_t type_length =
+      static_cast<const ::arrow::Decimal128Type&>(*type).byte_width();
+
+  // number of elements in the entire array
+  const int64_t length = fixed_size_binary_array.length();
+
+  // Get the byte width of the values in the FixedSizeBinaryArray. Most of the time
+  // this will be different from the decimal array width because we write the minimum
+  // number of bytes necessary to represent a given precision
+  const int32_t byte_width =
+      static_cast<const ::arrow::FixedSizeBinaryType&>(*fixed_size_binary_array.type())
+          .byte_width();
+
+  // allocate memory for the decimal array
+  std::shared_ptr<Buffer> data;
+  RETURN_NOT_OK(::arrow::AllocateBuffer(pool, length * type_length, &data));
+
+  // raw bytes that we can write to
+  uint8_t* out_ptr = data->mutable_data();
+
+  // convert each FixedSizeBinary value to valid decimal bytes
+  const int64_t null_count = fixed_size_binary_array.null_count();
+  if (null_count > 0) {
+    for (int64_t i = 0; i < length; ++i, out_ptr += type_length) {
+      if (!fixed_size_binary_array.IsNull(i)) {
+        RawBytesToDecimalBytes(fixed_size_binary_array.GetValue(i), byte_width, out_ptr);
+      }
+    }
+  } else {
+    for (int64_t i = 0; i < length; ++i, out_ptr += type_length) {
+      RawBytesToDecimalBytes(fixed_size_binary_array.GetValue(i), byte_width, out_ptr);
+    }
+  }
+
+  *out = std::make_shared<::arrow::Decimal128Array>(
+      type, length, data, fixed_size_binary_array.null_bitmap(), null_count);
+
+  return Status::OK();
+}
+
+template <>
+Status ConvertToDecimal128<ByteArrayType>(const Array& array,
+                                          const std::shared_ptr<DataType>& type,
+                                          MemoryPool* pool, std::shared_ptr<Array>* out) {
+  const auto& binary_array = static_cast<const ::arrow::BinaryArray&>(array);
+  const int64_t length = binary_array.length();
+
+  const auto& decimal_type = static_cast<const ::arrow::Decimal128Type&>(*type);
+  const int64_t type_length = decimal_type.byte_width();
+
+  std::shared_ptr<Buffer> data;
+  RETURN_NOT_OK(::arrow::AllocateBuffer(pool, length * type_length, &data));
+
+  // raw bytes that we can write to
+  uint8_t* out_ptr = data->mutable_data();
+
+  const int64_t null_count = binary_array.null_count();
+
+  // convert each BinaryArray value to valid decimal bytes
+  for (int64_t i = 0; i < length; i++, out_ptr += type_length) {
+    int32_t record_len = 0;
+    const uint8_t* record_loc = binary_array.GetValue(i, &record_len);
+
+    if ((record_len < 0) || (record_len > type_length)) {
+      return Status::Invalid("Invalid BYTE_ARRAY size");
+    }
+
+    auto out_ptr_view = reinterpret_cast<uint64_t*>(out_ptr);
+    out_ptr_view[0] = 0;
+    out_ptr_view[1] = 0;
+
+    // only convert rows that are not null if there are nulls, or
+    // all rows, if there are not
+    if (((null_count > 0) && !binary_array.IsNull(i)) || (null_count <= 0)) {
+      RawBytesToDecimalBytes(record_loc, record_len, out_ptr);
+    }
+  }
+
+  *out = std::make_shared<::arrow::Decimal128Array>(
+      type, length, data, binary_array.null_bitmap(), null_count);
+  return Status::OK();
+}
+
+/// \brief Convert an Int32 or Int64 array into a Decimal128Array
+/// The parquet spec allows systems to write decimals in int32, int64 if the values are
+/// small enough to fit in less 4 bytes or less than 8 bytes, respectively.
+/// This function implements the conversion from int32 and int64 arrays to decimal arrays.
+template <typename ParquetIntegerType,
+          typename = typename std::enable_if<
+              std::is_same<ParquetIntegerType, Int32Type>::value ||
+              std::is_same<ParquetIntegerType, Int64Type>::value>::type>
+static Status DecimalIntegerTransfer(RecordReader* reader, MemoryPool* pool,
+                                     const std::shared_ptr<DataType>& type, Datum* out) {
+  DCHECK_EQ(type->id(), ::arrow::Type::DECIMAL);
+
+  const int64_t length = reader->values_written();
+
+  using ElementType = typename ParquetIntegerType::c_type;
+  static_assert(std::is_same<ElementType, int32_t>::value ||
+                    std::is_same<ElementType, int64_t>::value,
+                "ElementType must be int32_t or int64_t");
+
+  const auto values = reinterpret_cast<const ElementType*>(reader->values());
+
+  const auto& decimal_type = static_cast<const ::arrow::Decimal128Type&>(*type);
+  const int64_t type_length = decimal_type.byte_width();
+
+  std::shared_ptr<Buffer> data;
+  RETURN_NOT_OK(::arrow::AllocateBuffer(pool, length * type_length, &data));
+  uint8_t* out_ptr = data->mutable_data();
+
+  using ::arrow::BitUtil::FromLittleEndian;
+
+  for (int64_t i = 0; i < length; ++i, out_ptr += type_length) {
+    // sign/zero extend int32_t values, otherwise a no-op
+    const auto value = static_cast<int64_t>(values[i]);
+
+    auto out_ptr_view = reinterpret_cast<uint64_t*>(out_ptr);
+
+    // No-op on little endian machines, byteswap on big endian
+    out_ptr_view[0] = FromLittleEndian(static_cast<uint64_t>(value));
+
+    // no need to byteswap here because we're sign/zero extending exactly 8 bytes
+    out_ptr_view[1] = static_cast<uint64_t>(value < 0 ? -1 : 0);
+  }
+
+  if (reader->nullable_values()) {
+    std::shared_ptr<ResizableBuffer> is_valid = reader->ReleaseIsValid();
+    *out = std::make_shared<::arrow::Decimal128Array>(type, length, data, is_valid,
+                                                      reader->null_count());
+  } else {
+    *out = std::make_shared<::arrow::Decimal128Array>(type, length, data);
+  }
+  return Status::OK();
+}
+
+/// \brief Convert an arrow::BinaryArray to an arrow::Decimal128Array
+/// We do this by:
+/// 1. Creating an arrow::BinaryArray from the RecordReader's builder
+/// 2. Allocating a buffer for the arrow::Decimal128Array
+/// 3. Converting the big-endian bytes in each BinaryArray entry to two integers
+///    representing the high and low bits of each decimal value.
+template <typename ParquetType>
+Status TransferDecimal(RecordReader* reader, MemoryPool* pool,
+                       const std::shared_ptr<DataType>& type, Datum* out) {
+  DCHECK_EQ(type->id(), ::arrow::Type::DECIMAL);
+
+  auto binary_reader = dynamic_cast<internal::BinaryRecordReader*>(reader);
+  DCHECK(binary_reader);
+  ::arrow::ArrayVector chunks = binary_reader->GetBuilderChunks();
+  for (size_t i = 0; i < chunks.size(); ++i) {
+    std::shared_ptr<Array> chunk_as_decimal;
+    RETURN_NOT_OK(
+        ConvertToDecimal128<ParquetType>(*chunks[i], type, pool, &chunk_as_decimal));
+    // Replace the chunk, which will hopefully also free memory as we go
+    chunks[i] = chunk_as_decimal;
+  }
+  *out = std::make_shared<ChunkedArray>(chunks);
+  return Status::OK();
+}
+
+#define TRANSFER_INT32(ENUM, ArrowType)                                              \
+  case ::arrow::Type::ENUM: {                                                        \
+    Status s = TransferInt<ArrowType, Int32Type>(reader, pool, value_type, &result); \
+    RETURN_NOT_OK(s);                                                                \
+  } break;
+
+#define TRANSFER_INT64(ENUM, ArrowType)                                              \
+  case ::arrow::Type::ENUM: {                                                        \
+    Status s = TransferInt<ArrowType, Int64Type>(reader, pool, value_type, &result); \
+    RETURN_NOT_OK(s);                                                                \
+  } break;
+
+Status TransferColumnData(internal::RecordReader* reader,
+                          std::shared_ptr<DataType> value_type,
+                          const ColumnDescriptor* descr, MemoryPool* pool,
+                          std::shared_ptr<ChunkedArray>* out) {
+  Datum result;
+  switch (value_type->id()) {
+    case ::arrow::Type::NA: {
+      result = std::make_shared<::arrow::NullArray>(reader->values_written());
+      break;
+    }
+    case ::arrow::Type::INT32:
+    case ::arrow::Type::INT64:
+    case ::arrow::Type::FLOAT:
+    case ::arrow::Type::DOUBLE:
+      result = TransferZeroCopy(reader, value_type);
+      break;
+    case ::arrow::Type::BOOL:
+      RETURN_NOT_OK(TransferBool(reader, pool, &result));
+      break;
+      TRANSFER_INT32(UINT8, ::arrow::UInt8Type);
+      TRANSFER_INT32(INT8, ::arrow::Int8Type);
+      TRANSFER_INT32(UINT16, ::arrow::UInt16Type);
+      TRANSFER_INT32(INT16, ::arrow::Int16Type);
+      TRANSFER_INT32(UINT32, ::arrow::UInt32Type);
+      TRANSFER_INT64(UINT64, ::arrow::UInt64Type);
+      TRANSFER_INT32(DATE32, ::arrow::Date32Type);
+      TRANSFER_INT32(TIME32, ::arrow::Time32Type);
+      TRANSFER_INT64(TIME64, ::arrow::Time64Type);
+    case ::arrow::Type::DATE64:
+      RETURN_NOT_OK(TransferDate64(reader, pool, value_type, &result));
+      break;
+    case ::arrow::Type::FIXED_SIZE_BINARY:
+    case ::arrow::Type::BINARY:
+    case ::arrow::Type::STRING: {
+      std::shared_ptr<ChunkedArray> out;
+      RETURN_NOT_OK(TransferBinary(reader, value_type, &out));
+      result = out;
+    } break;
+    case ::arrow::Type::DECIMAL: {
+      switch (descr->physical_type()) {
+        case ::parquet::Type::INT32: {
+          RETURN_NOT_OK(
+              DecimalIntegerTransfer<Int32Type>(reader, pool, value_type, &result));
+        } break;
+        case ::parquet::Type::INT64: {
+          RETURN_NOT_OK(
+              DecimalIntegerTransfer<Int64Type>(reader, pool, value_type, &result));
+        } break;
+        case ::parquet::Type::BYTE_ARRAY: {
+          RETURN_NOT_OK(
+              TransferDecimal<ByteArrayType>(reader, pool, value_type, &result));
+        } break;
+        case ::parquet::Type::FIXED_LEN_BYTE_ARRAY: {
+          RETURN_NOT_OK(TransferDecimal<FLBAType>(reader, pool, value_type, &result));
+        } break;
+        default:
+          return Status::Invalid(
+              "Physical type for decimal must be int32, int64, byte array, or fixed "
+              "length binary");
+      }
+    } break;
+    case ::arrow::Type::TIMESTAMP: {
+      const ::arrow::TimestampType& timestamp_type =
+          static_cast<::arrow::TimestampType&>(*value_type);
+      switch (timestamp_type.unit()) {
+        case ::arrow::TimeUnit::MILLI:
+        case ::arrow::TimeUnit::MICRO: {
+          result = TransferZeroCopy(reader, value_type);
+        } break;
+        case ::arrow::TimeUnit::NANO: {
+          if (descr->physical_type() == ::parquet::Type::INT96) {
+            RETURN_NOT_OK(TransferInt96(reader, pool, value_type, &result));
+          } else {
+            result = TransferZeroCopy(reader, value_type);
+          }
+        } break;
+        default:
+          return Status::NotImplemented("TimeUnit not supported");
+      }
+    } break;
+    default:
+      return Status::NotImplemented("No support for reading columns of type ",
+                                    value_type->ToString());
+  }
+
+  DCHECK_NE(result.kind(), Datum::NONE);
+
+  if (result.kind() == Datum::ARRAY) {
+    *out = std::make_shared<ChunkedArray>(result.make_array());
+  } else if (result.kind() == Datum::CHUNKED_ARRAY) {
+    *out = result.chunked_array();
+  } else {
+    DCHECK(false) << "Should be impossible";
+  }
+
+  return Status::OK();
+}
+
+}  // namespace arrow
+}  // namespace parquet
diff --git a/cpp/src/parquet/api/reader.h b/cpp/src/parquet/arrow/reader_internal.h
similarity index 54%
copy from cpp/src/parquet/api/reader.h
copy to cpp/src/parquet/arrow/reader_internal.h
index b29ca72..5c0fa5e 100644
--- a/cpp/src/parquet/api/reader.h
+++ b/cpp/src/parquet/arrow/reader_internal.h
@@ -15,23 +15,36 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#ifndef PARQUET_API_READER_H
-#define PARQUET_API_READER_H
-
-// Column reader API
-#include "parquet/column_reader.h"
-#include "parquet/column_scanner.h"
-#include "parquet/exception.h"
-#include "parquet/file_reader.h"
-#include "parquet/metadata.h"
-#include "parquet/platform.h"
-#include "parquet/printer.h"
-#include "parquet/properties.h"
-
-// Schemas
-#include "parquet/api/schema.h"
-
-// IO
-#include "parquet/api/io.h"
-
-#endif  // PARQUET_API_READER_H
+#pragma once
+
+#include <memory>
+
+namespace arrow {
+
+class ChunkedArray;
+class DataType;
+class MemoryPool;
+class Status;
+
+}  // namespace arrow
+
+namespace parquet {
+
+class ColumnDescriptor;
+
+namespace internal {
+
+class RecordReader;
+
+}  // namespace internal
+
+namespace arrow {
+
+::arrow::Status TransferColumnData(internal::RecordReader* reader,
+                                   std::shared_ptr<::arrow::DataType> value_type,
+                                   const ColumnDescriptor* descr,
+                                   ::arrow::MemoryPool* pool,
+                                   std::shared_ptr<::arrow::ChunkedArray>* out);
+
+}  // namespace arrow
+}  // namespace parquet
diff --git a/cpp/src/parquet/arrow/schema.cc b/cpp/src/parquet/arrow/schema.cc
index 1648f0e..6404338 100644
--- a/cpp/src/parquet/arrow/schema.cc
+++ b/cpp/src/parquet/arrow/schema.cc
@@ -17,19 +17,15 @@
 
 #include "parquet/arrow/schema.h"
 
-#include <algorithm>
 #include <string>
 #include <unordered_set>
 #include <utility>
 #include <vector>
 
-#include "arrow/array.h"
-#include "arrow/status.h"
 #include "arrow/type.h"
 #include "arrow/util/checked_cast.h"
 #include "arrow/util/logging.h"
 
-#include "parquet/arrow/reader.h"
 #include "parquet/arrow/writer.h"
 #include "parquet/exception.h"
 #include "parquet/properties.h"
diff --git a/cpp/src/parquet/arrow/schema.h b/cpp/src/parquet/arrow/schema.h
index 477597e..1133431 100644
--- a/cpp/src/parquet/arrow/schema.h
+++ b/cpp/src/parquet/arrow/schema.h
@@ -30,7 +30,6 @@ namespace arrow {
 
 class Field;
 class Schema;
-class Status;
 
 }  // namespace arrow
 
diff --git a/cpp/src/parquet/arrow/writer.cc b/cpp/src/parquet/arrow/writer.cc
index f2096d7..a786065 100644
--- a/cpp/src/parquet/arrow/writer.cc
+++ b/cpp/src/parquet/arrow/writer.cc
@@ -24,10 +24,8 @@
 #include <vector>
 
 #include "arrow/array.h"
-#include "arrow/buffer.h"
-#include "arrow/builder.h"
+#include "arrow/buffer-builder.h"
 #include "arrow/compute/api.h"
-#include "arrow/status.h"
 #include "arrow/table.h"
 #include "arrow/util/checked_cast.h"
 #include "arrow/visitor_inline.h"
@@ -37,7 +35,6 @@
 #include "parquet/arrow/reader.h"
 #include "parquet/arrow/schema.h"
 #include "parquet/column_writer.h"
-#include "parquet/deprecated_io.h"
 #include "parquet/exception.h"
 #include "parquet/file_writer.h"
 #include "parquet/platform.h"
diff --git a/cpp/src/parquet/arrow/writer.h b/cpp/src/parquet/arrow/writer.h
index 6906e46..2b9d892 100644
--- a/cpp/src/parquet/arrow/writer.h
+++ b/cpp/src/parquet/arrow/writer.h
@@ -19,6 +19,7 @@
 #define PARQUET_ARROW_WRITER_H
 
 #include <cstdint>
+#include <cstring>
 #include <memory>
 
 #include "parquet/platform.h"
@@ -31,16 +32,8 @@ namespace arrow {
 
 class Array;
 class ChunkedArray;
-class MemoryPool;
-class Status;
 class Table;
 
-namespace io {
-
-class OutputStream;
-
-}  // namespace io
-
 }  // namespace arrow
 
 namespace parquet {
diff --git a/cpp/src/parquet/bloom_filter.cc b/cpp/src/parquet/bloom_filter.cc
index 5effa17..a4fabb0 100644
--- a/cpp/src/parquet/bloom_filter.cc
+++ b/cpp/src/parquet/bloom_filter.cc
@@ -18,8 +18,6 @@
 #include <cstdint>
 #include <cstring>
 
-#include "arrow/buffer.h"
-#include "arrow/memory_pool.h"
 #include "arrow/util/logging.h"
 #include "parquet/bloom_filter.h"
 #include "parquet/exception.h"
diff --git a/cpp/src/parquet/bloom_filter.h b/cpp/src/parquet/bloom_filter.h
index 0285b8f..215a470 100644
--- a/cpp/src/parquet/bloom_filter.h
+++ b/cpp/src/parquet/bloom_filter.h
@@ -27,12 +27,6 @@
 #include "parquet/platform.h"
 #include "parquet/types.h"
 
-namespace arrow {
-
-class MemoryPool;
-
-}  // namespace arrow
-
 namespace parquet {
 
 // A Bloom filter is a compact structure to indicate whether an item is not in a set or
diff --git a/cpp/src/parquet/column_reader.cc b/cpp/src/parquet/column_reader.cc
index 4626869..925a4ff 100644
--- a/cpp/src/parquet/column_reader.cc
+++ b/cpp/src/parquet/column_reader.cc
@@ -17,27 +17,30 @@
 
 #include "parquet/column_reader.h"
 
+#include <algorithm>
 #include <cstdint>
+#include <cstring>
 #include <exception>
 #include <iostream>
 #include <memory>
-#include <vector>
+#include <unordered_map>
+#include <utility>
 
-#include "arrow/buffer.h"
+#include "arrow/array.h"
 #include "arrow/builder.h"
 #include "arrow/table.h"
+#include "arrow/type.h"
 #include "arrow/util/bit-stream-utils.h"
 #include "arrow/util/checked_cast.h"
 #include "arrow/util/compression.h"
 #include "arrow/util/logging.h"
 #include "arrow/util/rle-encoding.h"
-#include "arrow/util/ubsan.h"
 
 #include "parquet/column_page.h"
 #include "parquet/encoding.h"
 #include "parquet/properties.h"
 #include "parquet/statistics.h"
-#include "parquet/thrift.h"
+#include "parquet/thrift.h"  // IWYU pragma: keep
 
 using arrow::MemoryPool;
 using arrow::internal::checked_cast;
@@ -1244,7 +1247,9 @@ class ByteArrayDictionaryRecordReader : public TypedRecordReader<ByteArrayType>,
  public:
   ByteArrayDictionaryRecordReader(const ColumnDescriptor* descr,
                                   ::arrow::MemoryPool* pool)
-      : TypedRecordReader<ByteArrayType>(descr, pool), builder_(pool) {}
+      : TypedRecordReader<ByteArrayType>(descr, pool), builder_(pool) {
+    this->read_dictionary_ = true;
+  }
 
   std::shared_ptr<::arrow::ChunkedArray> GetResult() override {
     FlushBuilder();
diff --git a/cpp/src/parquet/column_reader.h b/cpp/src/parquet/column_reader.h
index 8022be9..2b6ec9f 100644
--- a/cpp/src/parquet/column_reader.h
+++ b/cpp/src/parquet/column_reader.h
@@ -17,14 +17,10 @@
 
 #pragma once
 
-#include <algorithm>
 #include <cstdint>
 #include <memory>
-#include <unordered_map>
-#include <utility>
 #include <vector>
 
-#include "parquet/encoding.h"
 #include "parquet/exception.h"
 #include "parquet/platform.h"
 #include "parquet/schema.h"
@@ -47,7 +43,6 @@ class RleDecoder;
 
 namespace parquet {
 
-class DictionaryPage;
 class Page;
 
 // 16 MB is the default maximum page header size
@@ -256,6 +251,9 @@ class RecordReader {
   /// \brief True if the leaf values are nullable
   bool nullable_values() const { return nullable_values_; }
 
+  /// \brief True if reading directly as Arrow dictionary-encoded
+  bool read_dictionary() const { return read_dictionary_; }
+
  protected:
   bool nullable_values_;
 
@@ -278,6 +276,8 @@ class RecordReader {
   std::shared_ptr<::arrow::ResizableBuffer> valid_bits_;
   std::shared_ptr<::arrow::ResizableBuffer> def_levels_;
   std::shared_ptr<::arrow::ResizableBuffer> rep_levels_;
+
+  bool read_dictionary_ = false;
 };
 
 class BinaryRecordReader : virtual public RecordReader {
diff --git a/cpp/src/parquet/column_writer.cc b/cpp/src/parquet/column_writer.cc
index f2783d0..86e21c2 100644
--- a/cpp/src/parquet/column_writer.cc
+++ b/cpp/src/parquet/column_writer.cc
@@ -17,22 +17,26 @@
 
 #include "parquet/column_writer.h"
 
-#include <algorithm>
 #include <cstdint>
+#include <cstring>
 #include <memory>
 #include <utility>
+#include <vector>
 
 #include "arrow/buffer-builder.h"
-#include "arrow/status.h"
+#include "arrow/memory_pool.h"
 #include "arrow/util/bit-stream-utils.h"
 #include "arrow/util/checked_cast.h"
 #include "arrow/util/compression.h"
 #include "arrow/util/logging.h"
 #include "arrow/util/rle-encoding.h"
 
+#include "parquet/column_page.h"
+#include "parquet/encoding.h"
 #include "parquet/metadata.h"
 #include "parquet/platform.h"
 #include "parquet/properties.h"
+#include "parquet/schema.h"
 #include "parquet/statistics.h"
 #include "parquet/thrift.h"
 #include "parquet/types.h"
diff --git a/cpp/src/parquet/column_writer.h b/cpp/src/parquet/column_writer.h
index 023b965..0a67a86 100644
--- a/cpp/src/parquet/column_writer.h
+++ b/cpp/src/parquet/column_writer.h
@@ -19,15 +19,9 @@
 
 #include <cstdint>
 #include <memory>
-#include <vector>
 
-#include "arrow/memory_pool.h"
-
-#include "parquet/column_page.h"
-#include "parquet/encoding.h"
 #include "parquet/exception.h"
 #include "parquet/platform.h"
-#include "parquet/schema.h"
 #include "parquet/types.h"
 
 namespace arrow {
@@ -44,6 +38,9 @@ class RleEncoder;
 
 namespace parquet {
 
+class ColumnDescriptor;
+class CompressedDataPage;
+class DictionaryPage;
 class ColumnChunkMetaDataBuilder;
 class WriterProperties;
 
diff --git a/cpp/src/parquet/deprecated_io.cc b/cpp/src/parquet/deprecated_io.cc
index 893f88e..6e02281 100644
--- a/cpp/src/parquet/deprecated_io.cc
+++ b/cpp/src/parquet/deprecated_io.cc
@@ -17,16 +17,10 @@
 
 #include "parquet/deprecated_io.h"
 
-#include <algorithm>
 #include <cstdint>
-#include <cstdio>
-#include <string>
 #include <utility>
 
-#include "arrow/status.h"
-
 #include "parquet/exception.h"
-#include "parquet/types.h"
 
 namespace parquet {
 
diff --git a/cpp/src/parquet/deprecated_io.h b/cpp/src/parquet/deprecated_io.h
index 8dfdeda..ddaad72 100644
--- a/cpp/src/parquet/deprecated_io.h
+++ b/cpp/src/parquet/deprecated_io.h
@@ -23,20 +23,9 @@
 #pragma once
 
 #include <cstdint>
-#include <cstdlib>
-#include <cstring>
 #include <memory>
-#include <string>
-#include <vector>
 
-#include "arrow/buffer.h"
-#include "arrow/io/interfaces.h"
-#include "arrow/io/memory.h"
-#include "arrow/memory_pool.h"
-
-#include "parquet/exception.h"
 #include "parquet/platform.h"
-#include "parquet/types.h"
 
 namespace parquet {
 
diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc
index 26b9050..58432de 100644
--- a/cpp/src/parquet/encoding.cc
+++ b/cpp/src/parquet/encoding.cc
@@ -24,12 +24,13 @@
 #include <utility>
 #include <vector>
 
+#include "arrow/array.h"
+#include "arrow/builder.h"
 #include "arrow/util/bit-stream-utils.h"
 #include "arrow/util/checked_cast.h"
 #include "arrow/util/hashing.h"
 #include "arrow/util/logging.h"
 #include "arrow/util/rle-encoding.h"
-#include "arrow/util/string_view.h"
 #include "arrow/util/ubsan.h"
 
 #include "parquet/exception.h"
diff --git a/cpp/src/parquet/file_reader.cc b/cpp/src/parquet/file_reader.cc
index d0ca9ca..6a11bae 100644
--- a/cpp/src/parquet/file_reader.cc
+++ b/cpp/src/parquet/file_reader.cc
@@ -21,12 +21,11 @@
 #include <cstdint>
 #include <cstring>
 #include <memory>
+#include <ostream>
 #include <string>
 #include <utility>
 
-#include "arrow/buffer.h"
 #include "arrow/io/file.h"
-#include "arrow/status.h"
 #include "arrow/util/logging.h"
 #include "arrow/util/ubsan.h"
 
diff --git a/cpp/src/parquet/file_reader.h b/cpp/src/parquet/file_reader.h
index 214cf11..e5f2d60 100644
--- a/cpp/src/parquet/file_reader.h
+++ b/cpp/src/parquet/file_reader.h
@@ -23,7 +23,7 @@
 #include <string>
 #include <vector>
 
-#include "parquet/metadata.h"  // IWYU pragma:: keep
+#include "parquet/metadata.h"  // IWYU pragma: keep
 #include "parquet/platform.h"
 #include "parquet/properties.h"
 
diff --git a/cpp/src/parquet/file_writer.cc b/cpp/src/parquet/file_writer.cc
index f2f42f3..a741cfa 100644
--- a/cpp/src/parquet/file_writer.cc
+++ b/cpp/src/parquet/file_writer.cc
@@ -17,11 +17,14 @@
 
 #include "parquet/file_writer.h"
 
+#include <cstddef>
+#include <ostream>
 #include <utility>
 #include <vector>
 
 #include "parquet/column_writer.h"
 #include "parquet/deprecated_io.h"
+#include "parquet/exception.h"
 #include "parquet/platform.h"
 #include "parquet/schema.h"
 
diff --git a/cpp/src/parquet/file_writer.h b/cpp/src/parquet/file_writer.h
index cdc787f..585e744 100644
--- a/cpp/src/parquet/file_writer.h
+++ b/cpp/src/parquet/file_writer.h
@@ -20,25 +20,12 @@
 
 #include <cstdint>
 #include <memory>
-#include <ostream>
 
-#include "parquet/exception.h"
 #include "parquet/metadata.h"
 #include "parquet/platform.h"
 #include "parquet/properties.h"
 #include "parquet/schema.h"
 
-namespace arrow {
-
-class MemoryPool;
-
-namespace io {
-
-class OutputStream;
-
-}  // namespace io
-}  // namespace arrow
-
 namespace parquet {
 
 class ColumnWriter;
diff --git a/cpp/src/parquet/metadata.cc b/cpp/src/parquet/metadata.cc
index e676486..85073d1 100644
--- a/cpp/src/parquet/metadata.cc
+++ b/cpp/src/parquet/metadata.cc
@@ -24,7 +24,6 @@
 
 #include "parquet/exception.h"
 #include "parquet/metadata.h"
-#include "parquet/platform.h"
 #include "parquet/schema-internal.h"
 #include "parquet/schema.h"
 #include "parquet/statistics.h"
diff --git a/cpp/src/parquet/metadata.h b/cpp/src/parquet/metadata.h
index b3a5f7b..7816af4 100644
--- a/cpp/src/parquet/metadata.h
+++ b/cpp/src/parquet/metadata.h
@@ -24,7 +24,6 @@
 #include <vector>
 
 #include "arrow/util/key_value_metadata.h"
-#include "arrow/util/macros.h"
 
 #include "parquet/platform.h"
 #include "parquet/properties.h"
diff --git a/cpp/src/parquet/platform.h b/cpp/src/parquet/platform.h
index 25d8dd4..24387c4 100644
--- a/cpp/src/parquet/platform.h
+++ b/cpp/src/parquet/platform.h
@@ -17,6 +17,7 @@
 
 #pragma once
 
+#include <cstdint>
 #include <memory>
 
 #include "arrow/buffer.h"            // IWYU pragma: export
diff --git a/cpp/src/parquet/properties.cc b/cpp/src/parquet/properties.cc
index d0cb574..4d3c5d6 100644
--- a/cpp/src/parquet/properties.cc
+++ b/cpp/src/parquet/properties.cc
@@ -20,7 +20,6 @@
 #include "parquet/properties.h"
 
 #include "arrow/io/buffered.h"
-#include "arrow/io/memory.h"
 
 namespace parquet {
 
diff --git a/cpp/src/parquet/schema.cc b/cpp/src/parquet/schema.cc
index cd2303a..71aa919 100644
--- a/cpp/src/parquet/schema.cc
+++ b/cpp/src/parquet/schema.cc
@@ -21,8 +21,12 @@
 #include <cstring>
 #include <memory>
 #include <string>
+#include <type_traits>
 #include <utility>
+
 #include "arrow/util/logging.h"
+#include "arrow/util/macros.h"
+
 #include "parquet/exception.h"
 #include "parquet/schema-internal.h"
 #include "parquet/thrift.h"
diff --git a/cpp/src/parquet/schema.h b/cpp/src/parquet/schema.h
index 740edbc..566d5fb 100644
--- a/cpp/src/parquet/schema.h
+++ b/cpp/src/parquet/schema.h
@@ -28,8 +28,6 @@
 #include <unordered_map>
 #include <vector>
 
-#include "arrow/util/macros.h"
-
 #include "parquet/platform.h"
 #include "parquet/types.h"
 
@@ -94,8 +92,6 @@ class PARQUET_EXPORT ColumnPath {
   std::vector<std::string> path_;
 };
 
-class GroupNode;
-
 // Base class for logical schema types. A type has a name, repetition level,
 // and optionally a logical type (ConvertedType in Parquet metadata parlance)
 class PARQUET_EXPORT Node {
diff --git a/cpp/src/parquet/statistics.cc b/cpp/src/parquet/statistics.cc
index 54cad54..16abc15 100644
--- a/cpp/src/parquet/statistics.cc
+++ b/cpp/src/parquet/statistics.cc
@@ -25,6 +25,7 @@
 #include "parquet/encoding.h"
 #include "parquet/exception.h"
 #include "parquet/platform.h"
+#include "parquet/schema.h"
 #include "parquet/statistics.h"
 
 using arrow::default_memory_pool;
diff --git a/cpp/src/parquet/statistics.h b/cpp/src/parquet/statistics.h
index 7698224..402b3c3 100644
--- a/cpp/src/parquet/statistics.h
+++ b/cpp/src/parquet/statistics.h
@@ -18,16 +18,18 @@
 #pragma once
 
 #include <algorithm>
+#include <cstddef>
 #include <cstdint>
 #include <memory>
 #include <string>
 
 #include "parquet/platform.h"
-#include "parquet/schema.h"
 #include "parquet/types.h"
 
 namespace parquet {
 
+class ColumnDescriptor;
+
 // ----------------------------------------------------------------------
 // Value comparator interfaces
 
diff --git a/cpp/src/parquet/types.cc b/cpp/src/parquet/types.cc
index f89170d..822afcd 100644
--- a/cpp/src/parquet/types.cc
+++ b/cpp/src/parquet/types.cc
@@ -17,12 +17,9 @@
 
 #include <cmath>
 #include <cstdint>
-#include <cstring>
-#include <iomanip>
 #include <memory>
 #include <sstream>
 #include <string>
-#include <utility>
 
 #include "arrow/util/checked_cast.h"
 #include "arrow/util/compression.h"
diff --git a/cpp/src/parquet/types.h b/cpp/src/parquet/types.h
index f24ff85..b317784 100644
--- a/cpp/src/parquet/types.h
+++ b/cpp/src/parquet/types.h
@@ -21,7 +21,6 @@
 #include <algorithm>
 #include <cstdint>
 #include <cstring>
-#include <iterator>
 #include <memory>
 #include <sstream>
 #include <string>