You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2019/08/02 01:53:31 UTC

[arrow] branch master updated: ARROW-6077: [C++][Parquet] Build Arrow "schema tree" from Parquet schema to help with nested data implementation

This is an automated email from the ASF dual-hosted git repository.

wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 06fd2da  ARROW-6077: [C++][Parquet] Build Arrow "schema tree" from Parquet schema to help with nested data implementation
06fd2da is described below

commit 06fd2da5e8e71b660e6eea4b7702ca175e31f3f5
Author: Wes McKinney <we...@apache.org>
AuthorDate: Thu Aug 1 20:53:17 2019 -0500

    ARROW-6077: [C++][Parquet] Build Arrow "schema tree" from Parquet schema to help with nested data implementation
    
    Introduces auxiliary internal `SchemaManifest` and `SchemaField` data structures.
    
    This also permits dictionary-encoded subfields in a slightly more principled way (the dictionary type creation is resolved one time, so this removes the `FixSchema` hacks that were there before). I rewrote the nested schema conversion logic to hopefully be slightly easier to follow though it could still use some work. I added comments within to explain the 3 different styles of list encoding
    
    There are a couple of API changes:
    
    * The `FileReader::GetSchema(indices, &schema)` method has been removed. The way that "projected" schemas were being constructed was pretty hacky, and this function is non-essential to the operation of the class. I had to remove bindings in the GLib and R libraries for this function, but as far as I can tell these bindings were non-essential to operation, and were added only because the function was there to wrap.
    * Added `FileWriter::Make` factory method, making constructor private
    
    This patch was pretty unpleasant to do -- it removes some hacky functions used to create Arrow fields with leaf nodes trimmed. There is little functional change; it is an attempt to bring a cleaner structure for full-fledged nested data reading
    
    I'm going to get on with seeing through user-facing dictionary-encoding functionality in Python
    
    Closes #4971 from wesm/parquet-arrow-schema-tree and squashes the following commits:
    
    e1f19c06b <Wes McKinney> Code review feedback
    e2c117ad1 <Wes McKinney> Factor out list nesting into helper function
    
    Authored-by: Wes McKinney <we...@apache.org>
    Signed-off-by: Wes McKinney <we...@apache.org>
---
 c_glib/parquet-glib/arrow-file-reader.cpp         |  45 +-
 c_glib/parquet-glib/arrow-file-reader.h           |   5 -
 c_glib/test/parquet/test-arrow-file-reader.rb     |  13 -
 cpp/src/parquet/arrow/arrow-reader-writer-test.cc |  52 +-
 cpp/src/parquet/arrow/arrow-schema-test.cc        | 355 ++++------
 cpp/src/parquet/arrow/reader.cc                   | 794 +++++++++-------------
 cpp/src/parquet/arrow/reader.h                    |  18 +-
 cpp/src/parquet/arrow/reader_internal.cc          | 633 ++++++++++++++++-
 cpp/src/parquet/arrow/reader_internal.h           | 161 ++++-
 cpp/src/parquet/arrow/schema.cc                   | 442 +-----------
 cpp/src/parquet/arrow/schema.h                    |  41 --
 cpp/src/parquet/arrow/writer.cc                   |  98 ++-
 cpp/src/parquet/arrow/writer.h                    |  13 +-
 cpp/src/parquet/file_writer.cc                    |   8 +-
 cpp/src/parquet/metadata.cc                       |  18 +-
 cpp/src/parquet/schema-internal.h                 |  51 --
 cpp/src/parquet/schema.cc                         |  11 +
 cpp/src/parquet/schema.h                          |   6 +
 python/pyarrow/_parquet.pxd                       |   4 +-
 r/R/arrowExports.R                                |   8 +-
 r/R/parquet.R                                     |  11 +-
 r/src/arrowExports.cpp                            |  29 +-
 r/src/parquet.cpp                                 |  21 +-
 23 files changed, 1408 insertions(+), 1429 deletions(-)

diff --git a/c_glib/parquet-glib/arrow-file-reader.cpp b/c_glib/parquet-glib/arrow-file-reader.cpp
index 217bd19..db59436 100644
--- a/c_glib/parquet-glib/arrow-file-reader.cpp
+++ b/c_glib/parquet-glib/arrow-file-reader.cpp
@@ -231,15 +231,8 @@ gparquet_arrow_file_reader_get_schema(GParquetArrowFileReader *reader,
 {
   auto parquet_arrow_file_reader = gparquet_arrow_file_reader_get_raw(reader);
 
-  const auto n_columns =
-    parquet_arrow_file_reader->parquet_reader()->metadata()->num_columns();
-  std::vector<int> indices(n_columns);
-  for (int i = 0; i < n_columns; ++i) {
-    indices[i] = i;
-  }
-
   std::shared_ptr<arrow::Schema> arrow_schema;
-  auto status = parquet_arrow_file_reader->GetSchema(indices, &arrow_schema);
+  auto status = parquet_arrow_file_reader->GetSchema(&arrow_schema);
   if (garrow_error_check(error,
                          status,
                          "[parquet][arrow][file-reader][get-schema]")) {
@@ -250,42 +243,6 @@ gparquet_arrow_file_reader_get_schema(GParquetArrowFileReader *reader,
 }
 
 /**
- * gparquet_arrow_file_reader_select_schema:
- * @reader: A #GParquetArrowFileReader.
- * @column_indexes: (array length=n_column_indexes):
- *   The array of column indexes to be selected.
- * @n_column_indexes: The length of `column_indexes`.
- * @error: (nullable): Return locatipcn for a #GError or %NULL.
- *
- * Returns: (transfer full) (nullable): A selected #GArrowSchema.
- *
- * Since: 0.12.0
- */
-GArrowSchema *
-gparquet_arrow_file_reader_select_schema(GParquetArrowFileReader *reader,
-                                         gint *column_indexes,
-                                         gsize n_column_indexes,
-                                         GError **error)
-{
-  auto parquet_arrow_file_reader = gparquet_arrow_file_reader_get_raw(reader);
-
-  std::vector<int> indices(n_column_indexes);
-  for (gsize i = 0; i < n_column_indexes; ++i) {
-    indices[i] = column_indexes[i];
-  }
-
-  std::shared_ptr<arrow::Schema> arrow_schema;
-  auto status = parquet_arrow_file_reader->GetSchema(indices, &arrow_schema);
-  if (garrow_error_check(error,
-                         status,
-                         "[parquet][arrow][file-reader][select-schema]")) {
-    return garrow_schema_new_raw(&arrow_schema);
-  } else {
-    return NULL;
-  }
-}
-
-/**
  * gparquet_arrow_file_reader_read_column_data:
  * @reader: A #GParquetArrowFileReader.
  * @i: The index of the column to be read. If it's negative, index is
diff --git a/c_glib/parquet-glib/arrow-file-reader.h b/c_glib/parquet-glib/arrow-file-reader.h
index a0d1a8e..5a6ec96 100644
--- a/c_glib/parquet-glib/arrow-file-reader.h
+++ b/c_glib/parquet-glib/arrow-file-reader.h
@@ -48,11 +48,6 @@ gparquet_arrow_file_reader_read_table(GParquetArrowFileReader *reader,
 GArrowSchema *
 gparquet_arrow_file_reader_get_schema(GParquetArrowFileReader *reader,
                                       GError **error);
-GArrowSchema *
-gparquet_arrow_file_reader_select_schema(GParquetArrowFileReader *reader,
-                                         gint *column_indexes,
-                                         gsize n_column_indexes,
-                                         GError **error);
 
 GARROW_AVAILABLE_IN_1_0
 GArrowChunkedArray *
diff --git a/c_glib/test/parquet/test-arrow-file-reader.rb b/c_glib/test/parquet/test-arrow-file-reader.rb
index 7ff17c2..d30c8e9 100644
--- a/c_glib/test/parquet/test-arrow-file-reader.rb
+++ b/c_glib/test/parquet/test-arrow-file-reader.rb
@@ -39,19 +39,6 @@ b: int32
     SCHEMA
   end
 
-  def test_select_schema
-    assert_equal(<<-SCHEMA.chomp, @reader.select_schema([0]).to_s)
-a: string
-    SCHEMA
-    assert_equal(<<-SCHEMA.chomp, @reader.select_schema([1]).to_s)
-b: int32
-    SCHEMA
-    assert_equal(<<-SCHEMA.chomp, @reader.select_schema([0, 1]).to_s)
-a: string
-b: int32
-    SCHEMA
-  end
-
   def test_read_column
     assert_equal([
                    Arrow::ChunkedArray.new([@a_array]),
diff --git a/cpp/src/parquet/arrow/arrow-reader-writer-test.cc b/cpp/src/parquet/arrow/arrow-reader-writer-test.cc
index d0d2536..2e57ab8 100644
--- a/cpp/src/parquet/arrow/arrow-reader-writer-test.cc
+++ b/cpp/src/parquet/arrow/arrow-reader-writer-test.cc
@@ -40,6 +40,7 @@
 #include "parquet/api/writer.h"
 
 #include "parquet/arrow/reader.h"
+#include "parquet/arrow/reader_internal.h"
 #include "parquet/arrow/schema.h"
 #include "parquet/arrow/test-util.h"
 #include "parquet/arrow/writer.h"
@@ -597,12 +598,16 @@ class TestParquetIO : public ::testing::Test {
     std::shared_ptr<::arrow::Schema> arrow_schema;
     ArrowReaderProperties props;
     ASSERT_OK_NO_THROW(FromParquetSchema(&descriptor, props, &arrow_schema));
-    FileWriter writer(::arrow::default_memory_pool(), MakeWriter(schema), arrow_schema);
-    ASSERT_OK_NO_THROW(writer.NewRowGroup(values->length()));
-    ASSERT_OK_NO_THROW(writer.WriteColumnChunk(*values));
-    ASSERT_OK_NO_THROW(writer.Close());
-    // writer.Close() should be idempotent
-    ASSERT_OK_NO_THROW(writer.Close());
+
+    std::unique_ptr<FileWriter> writer;
+    ASSERT_OK_NO_THROW(FileWriter::Make(::arrow::default_memory_pool(),
+                                        MakeWriter(schema), arrow_schema,
+                                        default_arrow_writer_properties(), &writer));
+    ASSERT_OK_NO_THROW(writer->NewRowGroup(values->length()));
+    ASSERT_OK_NO_THROW(writer->WriteColumnChunk(*values));
+    ASSERT_OK_NO_THROW(writer->Close());
+    // writer->Close() should be idempotent
+    ASSERT_OK_NO_THROW(writer->Close());
   }
 
   void ResetSink() { sink_ = CreateOutputStream(); }
@@ -789,13 +794,17 @@ TYPED_TEST(TestParquetIO, SingleColumnRequiredChunkedWrite) {
   std::shared_ptr<::arrow::Schema> arrow_schema;
   ArrowReaderProperties props;
   ASSERT_OK_NO_THROW(FromParquetSchema(&descriptor, props, &arrow_schema));
-  FileWriter writer(default_memory_pool(), this->MakeWriter(schema), arrow_schema);
+
+  std::unique_ptr<FileWriter> writer;
+  ASSERT_OK_NO_THROW(FileWriter::Make(::arrow::default_memory_pool(),
+                                      this->MakeWriter(schema), arrow_schema,
+                                      default_arrow_writer_properties(), &writer));
   for (int i = 0; i < 4; i++) {
-    ASSERT_OK_NO_THROW(writer.NewRowGroup(chunk_size));
+    ASSERT_OK_NO_THROW(writer->NewRowGroup(chunk_size));
     std::shared_ptr<Array> sliced_array = values->Slice(i * chunk_size, chunk_size);
-    ASSERT_OK_NO_THROW(writer.WriteColumnChunk(*sliced_array));
+    ASSERT_OK_NO_THROW(writer->WriteColumnChunk(*sliced_array));
   }
-  ASSERT_OK_NO_THROW(writer.Close());
+  ASSERT_OK_NO_THROW(writer->Close());
 
   ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnFile(*values));
 }
@@ -859,14 +868,17 @@ TYPED_TEST(TestParquetIO, SingleColumnOptionalChunkedWrite) {
   std::shared_ptr<::arrow::Schema> arrow_schema;
   ArrowReaderProperties props;
   ASSERT_OK_NO_THROW(FromParquetSchema(&descriptor, props, &arrow_schema));
-  FileWriter writer(::arrow::default_memory_pool(), this->MakeWriter(schema),
-                    arrow_schema);
+
+  std::unique_ptr<FileWriter> writer;
+  ASSERT_OK_NO_THROW(FileWriter::Make(::arrow::default_memory_pool(),
+                                      this->MakeWriter(schema), arrow_schema,
+                                      default_arrow_writer_properties(), &writer));
   for (int i = 0; i < 4; i++) {
-    ASSERT_OK_NO_THROW(writer.NewRowGroup(chunk_size));
+    ASSERT_OK_NO_THROW(writer->NewRowGroup(chunk_size));
     std::shared_ptr<Array> sliced_array = values->Slice(i * chunk_size, chunk_size);
-    ASSERT_OK_NO_THROW(writer.WriteColumnChunk(*sliced_array));
+    ASSERT_OK_NO_THROW(writer->WriteColumnChunk(*sliced_array));
   }
-  ASSERT_OK_NO_THROW(writer.Close());
+  ASSERT_OK_NO_THROW(writer->Close());
 
   ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnFile(*values));
 }
@@ -2624,11 +2636,15 @@ TEST(TestArrowReaderAdHoc, DISABLED_LargeStringColumn) {
       GroupNode::Make("schema", Repetition::REQUIRED, {schm->group_node()->field(0)}));
 
   auto writer = ParquetFileWriter::Open(sink, schm_node);
-  FileWriter arrow_writer(default_memory_pool(), std::move(writer), table->schema());
+
+  std::unique_ptr<FileWriter> arrow_writer;
+  ASSERT_OK_NO_THROW(FileWriter::Make(::arrow::default_memory_pool(), std::move(writer),
+                                      table->schema(), default_arrow_writer_properties(),
+                                      &arrow_writer));
   for (int i : {0, 1}) {
-    ASSERT_OK_NO_THROW(arrow_writer.WriteTable(*table, table->num_rows())) << i;
+    ASSERT_OK_NO_THROW(arrow_writer->WriteTable(*table, table->num_rows())) << i;
   }
-  ASSERT_OK_NO_THROW(arrow_writer.Close());
+  ASSERT_OK_NO_THROW(arrow_writer->Close());
 
   std::shared_ptr<Buffer> tables_buffer;
   ASSERT_OK_NO_THROW(sink->Finish(&tables_buffer));
diff --git a/cpp/src/parquet/arrow/arrow-schema-test.cc b/cpp/src/parquet/arrow/arrow-schema-test.cc
index 06646b4..b9a8e81 100644
--- a/cpp/src/parquet/arrow/arrow-schema-test.cc
+++ b/cpp/src/parquet/arrow/arrow-schema-test.cc
@@ -21,6 +21,7 @@
 #include "gtest/gtest.h"
 
 #include "parquet/arrow/reader.h"
+#include "parquet/arrow/reader_internal.h"
 #include "parquet/arrow/schema.h"
 #include "parquet/file_reader.h"
 #include "parquet/schema.h"
@@ -65,35 +66,21 @@ class TestConvertParquetSchema : public ::testing::Test {
   void CheckFlatSchema(const std::shared_ptr<::arrow::Schema>& expected_schema) {
     ASSERT_EQ(expected_schema->num_fields(), result_schema_->num_fields());
     for (int i = 0; i < expected_schema->num_fields(); ++i) {
-      auto lhs = result_schema_->field(i);
-      auto rhs = expected_schema->field(i);
-      EXPECT_TRUE(lhs->Equals(rhs))
-          << i << " " << lhs->ToString() << " != " << rhs->ToString();
+      auto result_field = result_schema_->field(i);
+      auto expected_field = expected_schema->field(i);
+      EXPECT_TRUE(result_field->Equals(expected_field))
+          << "Field " << i << "\n  result: " << result_field->ToString()
+          << "\n  expected: " << expected_field->ToString();
     }
   }
 
-  ::arrow::Status ConvertSchema(const std::vector<NodePtr>& nodes) {
-    NodePtr schema = GroupNode::Make("schema", Repetition::REPEATED, nodes);
-    descr_.Init(schema);
-    ArrowReaderProperties props;
-    return FromParquetSchema(&descr_, props, &result_schema_);
-  }
-
-  ::arrow::Status ConvertSchema(const std::vector<NodePtr>& nodes,
-                                const std::vector<int>& column_indices) {
-    NodePtr schema = GroupNode::Make("schema", Repetition::REPEATED, nodes);
-    descr_.Init(schema);
-    ArrowReaderProperties props;
-    return FromParquetSchema(&descr_, column_indices, props, &result_schema_);
-  }
-
   ::arrow::Status ConvertSchema(
       const std::vector<NodePtr>& nodes,
-      const std::shared_ptr<const KeyValueMetadata>& key_value_metadata) {
+      const std::shared_ptr<const KeyValueMetadata>& key_value_metadata = nullptr) {
     NodePtr schema = GroupNode::Make("schema", Repetition::REPEATED, nodes);
     descr_.Init(schema);
     ArrowReaderProperties props;
-    return FromParquetSchema(&descr_, {}, props, key_value_metadata, &result_schema_);
+    return FromParquetSchema(&descr_, props, key_value_metadata, &result_schema_);
   }
 
  protected:
@@ -107,69 +94,68 @@ TEST_F(TestConvertParquetSchema, ParquetFlatPrimitives) {
 
   parquet_fields.push_back(
       PrimitiveNode::Make("boolean", Repetition::REQUIRED, ParquetType::BOOLEAN));
-  arrow_fields.push_back(std::make_shared<Field>("boolean", BOOL, false));
+  arrow_fields.push_back(::arrow::field("boolean", BOOL, false));
 
   parquet_fields.push_back(
       PrimitiveNode::Make("int32", Repetition::REQUIRED, ParquetType::INT32));
-  arrow_fields.push_back(std::make_shared<Field>("int32", INT32, false));
+  arrow_fields.push_back(::arrow::field("int32", INT32, false));
 
   parquet_fields.push_back(
       PrimitiveNode::Make("int64", Repetition::REQUIRED, ParquetType::INT64));
-  arrow_fields.push_back(std::make_shared<Field>("int64", INT64, false));
+  arrow_fields.push_back(::arrow::field("int64", INT64, false));
 
   parquet_fields.push_back(PrimitiveNode::Make("timestamp", Repetition::REQUIRED,
                                                ParquetType::INT64,
                                                ConvertedType::TIMESTAMP_MILLIS));
   arrow_fields.push_back(
-      std::make_shared<Field>("timestamp", ::arrow::timestamp(TimeUnit::MILLI), false));
+      ::arrow::field("timestamp", ::arrow::timestamp(TimeUnit::MILLI), false));
 
   parquet_fields.push_back(PrimitiveNode::Make("timestamp[us]", Repetition::REQUIRED,
                                                ParquetType::INT64,
                                                ConvertedType::TIMESTAMP_MICROS));
-  arrow_fields.push_back(std::make_shared<Field>(
-      "timestamp[us]", ::arrow::timestamp(TimeUnit::MICRO), false));
+  arrow_fields.push_back(
+      ::arrow::field("timestamp[us]", ::arrow::timestamp(TimeUnit::MICRO), false));
 
   parquet_fields.push_back(PrimitiveNode::Make("date", Repetition::REQUIRED,
                                                ParquetType::INT32, ConvertedType::DATE));
-  arrow_fields.push_back(std::make_shared<Field>("date", ::arrow::date32(), false));
+  arrow_fields.push_back(::arrow::field("date", ::arrow::date32(), false));
 
   parquet_fields.push_back(PrimitiveNode::Make(
       "time32", Repetition::REQUIRED, ParquetType::INT32, ConvertedType::TIME_MILLIS));
   arrow_fields.push_back(
-      std::make_shared<Field>("time32", ::arrow::time32(TimeUnit::MILLI), false));
+      ::arrow::field("time32", ::arrow::time32(TimeUnit::MILLI), false));
 
   parquet_fields.push_back(PrimitiveNode::Make(
       "time64", Repetition::REQUIRED, ParquetType::INT64, ConvertedType::TIME_MICROS));
   arrow_fields.push_back(
-      std::make_shared<Field>("time64", ::arrow::time64(TimeUnit::MICRO), false));
+      ::arrow::field("time64", ::arrow::time64(TimeUnit::MICRO), false));
 
   parquet_fields.push_back(
       PrimitiveNode::Make("timestamp96", Repetition::REQUIRED, ParquetType::INT96));
-  arrow_fields.push_back(std::make_shared<Field>("timestamp96", TIMESTAMP_NS, false));
+  arrow_fields.push_back(::arrow::field("timestamp96", TIMESTAMP_NS, false));
 
   parquet_fields.push_back(
       PrimitiveNode::Make("float", Repetition::OPTIONAL, ParquetType::FLOAT));
-  arrow_fields.push_back(std::make_shared<Field>("float", FLOAT));
+  arrow_fields.push_back(::arrow::field("float", FLOAT));
 
   parquet_fields.push_back(
       PrimitiveNode::Make("double", Repetition::OPTIONAL, ParquetType::DOUBLE));
-  arrow_fields.push_back(std::make_shared<Field>("double", DOUBLE));
+  arrow_fields.push_back(::arrow::field("double", DOUBLE));
 
   parquet_fields.push_back(
       PrimitiveNode::Make("binary", Repetition::OPTIONAL, ParquetType::BYTE_ARRAY));
-  arrow_fields.push_back(std::make_shared<Field>("binary", BINARY));
+  arrow_fields.push_back(::arrow::field("binary", BINARY));
 
   parquet_fields.push_back(PrimitiveNode::Make(
       "string", Repetition::OPTIONAL, ParquetType::BYTE_ARRAY, ConvertedType::UTF8));
-  arrow_fields.push_back(std::make_shared<Field>("string", UTF8));
+  arrow_fields.push_back(::arrow::field("string", UTF8));
 
   parquet_fields.push_back(PrimitiveNode::Make("flba-binary", Repetition::OPTIONAL,
                                                ParquetType::FIXED_LEN_BYTE_ARRAY,
                                                ConvertedType::NONE, 12));
-  arrow_fields.push_back(
-      std::make_shared<Field>("flba-binary", ::arrow::fixed_size_binary(12)));
+  arrow_fields.push_back(::arrow::field("flba-binary", ::arrow::fixed_size_binary(12)));
 
-  auto arrow_schema = std::make_shared<::arrow::Schema>(arrow_fields);
+  auto arrow_schema = ::arrow::schema(arrow_fields);
   ASSERT_OK(ConvertSchema(parquet_fields));
 
   ASSERT_NO_FATAL_FAILURE(CheckFlatSchema(arrow_schema));
@@ -261,11 +247,11 @@ TEST_F(TestConvertParquetSchema, ParquetAnnotatedFields) {
     parquet_fields.push_back(PrimitiveNode::Make(c.name, Repetition::OPTIONAL,
                                                  c.logical_type, c.physical_type,
                                                  c.physical_length));
-    arrow_fields.push_back(std::make_shared<Field>(c.name, c.datatype));
+    arrow_fields.push_back(::arrow::field(c.name, c.datatype));
   }
 
   ASSERT_OK(ConvertSchema(parquet_fields));
-  auto arrow_schema = std::make_shared<::arrow::Schema>(arrow_fields);
+  auto arrow_schema = ::arrow::schema(arrow_fields);
   ASSERT_NO_FATAL_FAILURE(CheckFlatSchema(arrow_schema));
 }
 
@@ -275,26 +261,15 @@ TEST_F(TestConvertParquetSchema, DuplicateFieldNames) {
 
   parquet_fields.push_back(
       PrimitiveNode::Make("xxx", Repetition::REQUIRED, ParquetType::BOOLEAN));
-  auto arrow_field1 = std::make_shared<Field>("xxx", BOOL, false);
+  auto arrow_field1 = ::arrow::field("xxx", BOOL, false);
 
   parquet_fields.push_back(
       PrimitiveNode::Make("xxx", Repetition::REQUIRED, ParquetType::INT32));
-  auto arrow_field2 = std::make_shared<Field>("xxx", INT32, false);
+  auto arrow_field2 = ::arrow::field("xxx", INT32, false);
 
   ASSERT_OK(ConvertSchema(parquet_fields));
   arrow_fields = {arrow_field1, arrow_field2};
-  ASSERT_NO_FATAL_FAILURE(
-      CheckFlatSchema(std::make_shared<::arrow::Schema>(arrow_fields)));
-
-  ASSERT_OK(ConvertSchema(parquet_fields, std::vector<int>({0, 1})));
-  arrow_fields = {arrow_field1, arrow_field2};
-  ASSERT_NO_FATAL_FAILURE(
-      CheckFlatSchema(std::make_shared<::arrow::Schema>(arrow_fields)));
-
-  ASSERT_OK(ConvertSchema(parquet_fields, std::vector<int>({1, 0})));
-  arrow_fields = {arrow_field2, arrow_field1};
-  ASSERT_NO_FATAL_FAILURE(
-      CheckFlatSchema(std::make_shared<::arrow::Schema>(arrow_fields)));
+  ASSERT_NO_FATAL_FAILURE(CheckFlatSchema(::arrow::schema(arrow_fields)));
 }
 
 TEST_F(TestConvertParquetSchema, ParquetKeyValueMetadata) {
@@ -303,11 +278,11 @@ TEST_F(TestConvertParquetSchema, ParquetKeyValueMetadata) {
 
   parquet_fields.push_back(
       PrimitiveNode::Make("boolean", Repetition::REQUIRED, ParquetType::BOOLEAN));
-  arrow_fields.push_back(std::make_shared<Field>("boolean", BOOL, false));
+  arrow_fields.push_back(::arrow::field("boolean", BOOL, false));
 
   parquet_fields.push_back(
       PrimitiveNode::Make("int32", Repetition::REQUIRED, ParquetType::INT32));
-  arrow_fields.push_back(std::make_shared<Field>("int32", INT32, false));
+  arrow_fields.push_back(::arrow::field("int32", INT32, false));
 
   auto key_value_metadata = std::make_shared<KeyValueMetadata>();
   key_value_metadata->Append("foo", "bar");
@@ -327,7 +302,7 @@ TEST_F(TestConvertParquetSchema, ParquetEmptyKeyValueMetadata) {
 
   parquet_fields.push_back(
       PrimitiveNode::Make("int32", Repetition::REQUIRED, ParquetType::INT32));
-  arrow_fields.push_back(std::make_shared<Field>("int32", INT32, false));
+  arrow_fields.push_back(::arrow::field("int32", INT32, false));
 
   std::shared_ptr<KeyValueMetadata> key_value_metadata = nullptr;
   ASSERT_OK(ConvertSchema(parquet_fields, key_value_metadata));
@@ -343,24 +318,24 @@ TEST_F(TestConvertParquetSchema, ParquetFlatDecimals) {
   parquet_fields.push_back(PrimitiveNode::Make("flba-decimal", Repetition::OPTIONAL,
                                                ParquetType::FIXED_LEN_BYTE_ARRAY,
                                                ConvertedType::DECIMAL, 4, 8, 4));
-  arrow_fields.push_back(std::make_shared<Field>("flba-decimal", DECIMAL_8_4));
+  arrow_fields.push_back(::arrow::field("flba-decimal", DECIMAL_8_4));
 
   parquet_fields.push_back(PrimitiveNode::Make("binary-decimal", Repetition::OPTIONAL,
                                                ParquetType::BYTE_ARRAY,
                                                ConvertedType::DECIMAL, -1, 8, 4));
-  arrow_fields.push_back(std::make_shared<Field>("binary-decimal", DECIMAL_8_4));
+  arrow_fields.push_back(::arrow::field("binary-decimal", DECIMAL_8_4));
 
   parquet_fields.push_back(PrimitiveNode::Make("int32-decimal", Repetition::OPTIONAL,
                                                ParquetType::INT32, ConvertedType::DECIMAL,
                                                -1, 8, 4));
-  arrow_fields.push_back(std::make_shared<Field>("int32-decimal", DECIMAL_8_4));
+  arrow_fields.push_back(::arrow::field("int32-decimal", DECIMAL_8_4));
 
   parquet_fields.push_back(PrimitiveNode::Make("int64-decimal", Repetition::OPTIONAL,
                                                ParquetType::INT64, ConvertedType::DECIMAL,
                                                -1, 8, 4));
-  arrow_fields.push_back(std::make_shared<Field>("int64-decimal", DECIMAL_8_4));
+  arrow_fields.push_back(::arrow::field("int64-decimal", DECIMAL_8_4));
 
-  auto arrow_schema = std::make_shared<::arrow::Schema>(arrow_fields);
+  auto arrow_schema = ::arrow::schema(arrow_fields);
   ASSERT_OK(ConvertSchema(parquet_fields));
 
   ASSERT_NO_FATAL_FAILURE(CheckFlatSchema(arrow_schema));
@@ -384,9 +359,9 @@ TEST_F(TestConvertParquetSchema, ParquetLists) {
     auto list = GroupNode::Make("list", Repetition::REPEATED, {element});
     parquet_fields.push_back(
         GroupNode::Make("my_list", Repetition::REQUIRED, {list}, ConvertedType::LIST));
-    auto arrow_element = std::make_shared<Field>("string", UTF8, true);
-    auto arrow_list = std::make_shared<::arrow::ListType>(arrow_element);
-    arrow_fields.push_back(std::make_shared<Field>("my_list", arrow_list, false));
+    auto arrow_element = ::arrow::field("string", UTF8, true);
+    auto arrow_list = ::arrow::list(arrow_element);
+    arrow_fields.push_back(::arrow::field("my_list", arrow_list, false));
   }
 
   // // List<String> (list nullable, elements non-null)
@@ -401,9 +376,9 @@ TEST_F(TestConvertParquetSchema, ParquetLists) {
     auto list = GroupNode::Make("list", Repetition::REPEATED, {element});
     parquet_fields.push_back(
         GroupNode::Make("my_list", Repetition::OPTIONAL, {list}, ConvertedType::LIST));
-    auto arrow_element = std::make_shared<Field>("string", UTF8, false);
-    auto arrow_list = std::make_shared<::arrow::ListType>(arrow_element);
-    arrow_fields.push_back(std::make_shared<Field>("my_list", arrow_list, true));
+    auto arrow_element = ::arrow::field("string", UTF8, false);
+    auto arrow_list = ::arrow::list(arrow_element);
+    arrow_fields.push_back(::arrow::field("my_list", arrow_list, true));
   }
 
   // Element types can be nested structures. For example, a list of lists:
@@ -427,11 +402,11 @@ TEST_F(TestConvertParquetSchema, ParquetLists) {
     auto list = GroupNode::Make("list", Repetition::REPEATED, {element});
     parquet_fields.push_back(GroupNode::Make("array_of_arrays", Repetition::OPTIONAL,
                                              {list}, ConvertedType::LIST));
-    auto arrow_inner_element = std::make_shared<Field>("int32", INT32, false);
-    auto arrow_inner_list = std::make_shared<::arrow::ListType>(arrow_inner_element);
-    auto arrow_element = std::make_shared<Field>("element", arrow_inner_list, false);
-    auto arrow_list = std::make_shared<::arrow::ListType>(arrow_element);
-    arrow_fields.push_back(std::make_shared<Field>("array_of_arrays", arrow_list, true));
+    auto arrow_inner_element = ::arrow::field("int32", INT32, false);
+    auto arrow_inner_list = ::arrow::list(arrow_inner_element);
+    auto arrow_element = ::arrow::field("element", arrow_inner_list, false);
+    auto arrow_list = ::arrow::list(arrow_element);
+    arrow_fields.push_back(::arrow::field("array_of_arrays", arrow_list, true));
   }
 
   // // List<String> (list nullable, elements non-null)
@@ -446,9 +421,9 @@ TEST_F(TestConvertParquetSchema, ParquetLists) {
     auto list = GroupNode::Make("element", Repetition::REPEATED, {element});
     parquet_fields.push_back(
         GroupNode::Make("my_list", Repetition::OPTIONAL, {list}, ConvertedType::LIST));
-    auto arrow_element = std::make_shared<Field>("str", UTF8, false);
-    auto arrow_list = std::make_shared<::arrow::ListType>(arrow_element);
-    arrow_fields.push_back(std::make_shared<Field>("my_list", arrow_list, true));
+    auto arrow_element = ::arrow::field("str", UTF8, false);
+    auto arrow_list = ::arrow::list(arrow_element);
+    arrow_fields.push_back(::arrow::field("my_list", arrow_list, true));
   }
 
   // // List<Integer> (nullable list, non-null elements)
@@ -460,9 +435,9 @@ TEST_F(TestConvertParquetSchema, ParquetLists) {
         PrimitiveNode::Make("element", Repetition::REPEATED, ParquetType::INT32);
     parquet_fields.push_back(
         GroupNode::Make("my_list", Repetition::OPTIONAL, {element}, ConvertedType::LIST));
-    auto arrow_element = std::make_shared<Field>("element", INT32, false);
-    auto arrow_list = std::make_shared<::arrow::ListType>(arrow_element);
-    arrow_fields.push_back(std::make_shared<Field>("my_list", arrow_list, true));
+    auto arrow_element = ::arrow::field("element", INT32, false);
+    auto arrow_list = ::arrow::list(arrow_element);
+    arrow_fields.push_back(::arrow::field("my_list", arrow_list, true));
   }
 
   // // List<Tuple<String, Integer>> (nullable list, non-null elements)
@@ -481,13 +456,13 @@ TEST_F(TestConvertParquetSchema, ParquetLists) {
         GroupNode::Make("element", Repetition::REPEATED, {str_element, num_element});
     parquet_fields.push_back(
         GroupNode::Make("my_list", Repetition::OPTIONAL, {element}, ConvertedType::LIST));
-    auto arrow_str = std::make_shared<Field>("str", UTF8, false);
-    auto arrow_num = std::make_shared<Field>("num", INT32, false);
+    auto arrow_str = ::arrow::field("str", UTF8, false);
+    auto arrow_num = ::arrow::field("num", INT32, false);
     std::vector<std::shared_ptr<Field>> fields({arrow_str, arrow_num});
-    auto arrow_struct = std::make_shared<::arrow::StructType>(fields);
-    auto arrow_element = std::make_shared<Field>("element", arrow_struct, false);
-    auto arrow_list = std::make_shared<::arrow::ListType>(arrow_element);
-    arrow_fields.push_back(std::make_shared<Field>("my_list", arrow_list, true));
+    auto arrow_struct = ::arrow::struct_(fields);
+    auto arrow_element = ::arrow::field("element", arrow_struct, false);
+    auto arrow_list = ::arrow::list(arrow_element);
+    arrow_fields.push_back(::arrow::field("my_list", arrow_list, true));
   }
 
   // // List<OneTuple<String>> (nullable list, non-null elements)
@@ -503,12 +478,12 @@ TEST_F(TestConvertParquetSchema, ParquetLists) {
     auto array = GroupNode::Make("array", Repetition::REPEATED, {element});
     parquet_fields.push_back(
         GroupNode::Make("my_list", Repetition::OPTIONAL, {array}, ConvertedType::LIST));
-    auto arrow_str = std::make_shared<Field>("str", UTF8, false);
+    auto arrow_str = ::arrow::field("str", UTF8, false);
     std::vector<std::shared_ptr<Field>> fields({arrow_str});
-    auto arrow_struct = std::make_shared<::arrow::StructType>(fields);
-    auto arrow_element = std::make_shared<Field>("array", arrow_struct, false);
-    auto arrow_list = std::make_shared<::arrow::ListType>(arrow_element);
-    arrow_fields.push_back(std::make_shared<Field>("my_list", arrow_list, true));
+    auto arrow_struct = ::arrow::struct_(fields);
+    auto arrow_element = ::arrow::field("array", arrow_struct, false);
+    auto arrow_list = ::arrow::list(arrow_element);
+    arrow_fields.push_back(::arrow::field("my_list", arrow_list, true));
   }
 
   // // List<OneTuple<String>> (nullable list, non-null elements)
@@ -524,12 +499,12 @@ TEST_F(TestConvertParquetSchema, ParquetLists) {
     auto array = GroupNode::Make("my_list_tuple", Repetition::REPEATED, {element});
     parquet_fields.push_back(
         GroupNode::Make("my_list", Repetition::OPTIONAL, {array}, ConvertedType::LIST));
-    auto arrow_str = std::make_shared<Field>("str", UTF8, false);
+    auto arrow_str = ::arrow::field("str", UTF8, false);
     std::vector<std::shared_ptr<Field>> fields({arrow_str});
-    auto arrow_struct = std::make_shared<::arrow::StructType>(fields);
-    auto arrow_element = std::make_shared<Field>("my_list_tuple", arrow_struct, false);
-    auto arrow_list = std::make_shared<::arrow::ListType>(arrow_element);
-    arrow_fields.push_back(std::make_shared<Field>("my_list", arrow_list, true));
+    auto arrow_struct = ::arrow::struct_(fields);
+    auto arrow_element = ::arrow::field("my_list_tuple", arrow_struct, false);
+    auto arrow_list = ::arrow::list(arrow_element);
+    arrow_fields.push_back(::arrow::field("my_list", arrow_list, true));
   }
 
   // One-level encoding: Only allows required lists with required cells
@@ -537,12 +512,12 @@ TEST_F(TestConvertParquetSchema, ParquetLists) {
   {
     parquet_fields.push_back(
         PrimitiveNode::Make("name", Repetition::REPEATED, ParquetType::INT32));
-    auto arrow_element = std::make_shared<Field>("name", INT32, false);
-    auto arrow_list = std::make_shared<::arrow::ListType>(arrow_element);
-    arrow_fields.push_back(std::make_shared<Field>("name", arrow_list, false));
+    auto arrow_element = ::arrow::field("name", INT32, false);
+    auto arrow_list = ::arrow::list(arrow_element);
+    arrow_fields.push_back(::arrow::field("name", arrow_list, false));
   }
 
-  auto arrow_schema = std::make_shared<::arrow::Schema>(arrow_fields);
+  auto arrow_schema = ::arrow::schema(arrow_fields);
   ASSERT_OK(ConvertSchema(parquet_fields));
 
   ASSERT_NO_FATAL_FAILURE(CheckFlatSchema(arrow_schema));
@@ -573,20 +548,20 @@ TEST_F(TestConvertParquetSchema, ParquetNestedSchema) {
     parquet_fields.push_back(
         PrimitiveNode::Make("leaf3", Repetition::REQUIRED, ParquetType::INT64));
 
-    auto group1_fields = {std::make_shared<Field>("leaf1", BOOL, false),
-                          std::make_shared<Field>("leaf2", INT32, false)};
-    auto arrow_group1_type = std::make_shared<::arrow::StructType>(group1_fields);
-    arrow_fields.push_back(std::make_shared<Field>("group1", arrow_group1_type, false));
-    arrow_fields.push_back(std::make_shared<Field>("leaf3", INT64, false));
+    auto group1_fields = {::arrow::field("leaf1", BOOL, false),
+                          ::arrow::field("leaf2", INT32, false)};
+    auto arrow_group1_type = ::arrow::struct_(group1_fields);
+    arrow_fields.push_back(::arrow::field("group1", arrow_group1_type, false));
+    arrow_fields.push_back(::arrow::field("leaf3", INT64, false));
   }
 
-  auto arrow_schema = std::make_shared<::arrow::Schema>(arrow_fields);
+  auto arrow_schema = ::arrow::schema(arrow_fields);
   ASSERT_OK(ConvertSchema(parquet_fields));
 
   ASSERT_NO_FATAL_FAILURE(CheckFlatSchema(arrow_schema));
 }
 
-TEST_F(TestConvertParquetSchema, ParquetNestedSchemaPartial) {
+TEST_F(TestConvertParquetSchema, ParquetNestedSchema2) {
   std::vector<NodePtr> parquet_fields;
   std::vector<std::shared_ptr<Field>> arrow_fields;
 
@@ -600,15 +575,6 @@ TEST_F(TestConvertParquetSchema, ParquetNestedSchemaPartial) {
   //   required int64 leaf4;
   // }
   // required int64 leaf5;
-  //
-  // Expected partial arrow schema (columns 0, 3, 4):
-  // required group group1 {
-  //   required int64 leaf1;
-  // }
-  // required group group2 {
-  //   required int64 leaf4;
-  // }
-  // required int64 leaf5;
   {
     parquet_fields.push_back(GroupNode::Make(
         "group1", Repetition::REQUIRED,
@@ -621,70 +587,19 @@ TEST_F(TestConvertParquetSchema, ParquetNestedSchemaPartial) {
     parquet_fields.push_back(
         PrimitiveNode::Make("leaf5", Repetition::REQUIRED, ParquetType::INT64));
 
-    auto group1_fields = {std::make_shared<Field>("leaf1", INT64, false)};
-    auto arrow_group1_type = std::make_shared<::arrow::StructType>(group1_fields);
-    auto group2_fields = {std::make_shared<Field>("leaf4", INT64, false)};
-    auto arrow_group2_type = std::make_shared<::arrow::StructType>(group2_fields);
-
-    arrow_fields.push_back(std::make_shared<Field>("group1", arrow_group1_type, false));
-    arrow_fields.push_back(std::make_shared<Field>("group2", arrow_group2_type, false));
-    arrow_fields.push_back(std::make_shared<Field>("leaf5", INT64, false));
+    auto group1_fields = {::arrow::field("leaf1", INT64, false),
+                          ::arrow::field("leaf2", INT64, false)};
+    auto arrow_group1_type = ::arrow::struct_(group1_fields);
+    auto group2_fields = {::arrow::field("leaf3", INT64, false),
+                          ::arrow::field("leaf4", INT64, false)};
+    auto arrow_group2_type = ::arrow::struct_(group2_fields);
+    arrow_fields.push_back(::arrow::field("group1", arrow_group1_type, false));
+    arrow_fields.push_back(::arrow::field("group2", arrow_group2_type, false));
+    arrow_fields.push_back(::arrow::field("leaf5", INT64, false));
   }
 
-  auto arrow_schema = std::make_shared<::arrow::Schema>(arrow_fields);
-  ASSERT_OK(ConvertSchema(parquet_fields, std::vector<int>{0, 3, 4}));
-
-  ASSERT_NO_FATAL_FAILURE(CheckFlatSchema(arrow_schema));
-}
-
-TEST_F(TestConvertParquetSchema, ParquetNestedSchemaPartialOrdering) {
-  std::vector<NodePtr> parquet_fields;
-  std::vector<std::shared_ptr<Field>> arrow_fields;
-
-  // Full Parquet Schema:
-  // required group group1 {
-  //   required int64 leaf1;
-  //   required int64 leaf2;
-  // }
-  // required group group2 {
-  //   required int64 leaf3;
-  //   required int64 leaf4;
-  // }
-  // required int64 leaf5;
-  //
-  // Expected partial arrow schema (columns 3, 4, 0):
-  // required group group2 {
-  //   required int64 leaf4;
-  // }
-  // required int64 leaf5;
-  // required group group1 {
-  //   required int64 leaf1;
-  // }
-  {
-    parquet_fields.push_back(GroupNode::Make(
-        "group1", Repetition::REQUIRED,
-        {PrimitiveNode::Make("leaf1", Repetition::REQUIRED, ParquetType::INT64),
-         PrimitiveNode::Make("leaf2", Repetition::REQUIRED, ParquetType::INT64)}));
-    parquet_fields.push_back(GroupNode::Make(
-        "group2", Repetition::REQUIRED,
-        {PrimitiveNode::Make("leaf3", Repetition::REQUIRED, ParquetType::INT64),
-         PrimitiveNode::Make("leaf4", Repetition::REQUIRED, ParquetType::INT64)}));
-    parquet_fields.push_back(
-        PrimitiveNode::Make("leaf5", Repetition::REQUIRED, ParquetType::INT64));
-
-    auto group1_fields = {std::make_shared<Field>("leaf1", INT64, false)};
-    auto arrow_group1_type = std::make_shared<::arrow::StructType>(group1_fields);
-    auto group2_fields = {std::make_shared<Field>("leaf4", INT64, false)};
-    auto arrow_group2_type = std::make_shared<::arrow::StructType>(group2_fields);
-
-    arrow_fields.push_back(std::make_shared<Field>("group2", arrow_group2_type, false));
-    arrow_fields.push_back(std::make_shared<Field>("leaf5", INT64, false));
-    arrow_fields.push_back(std::make_shared<Field>("group1", arrow_group1_type, false));
-  }
-
-  auto arrow_schema = std::make_shared<::arrow::Schema>(arrow_fields);
-  ASSERT_OK(ConvertSchema(parquet_fields, std::vector<int>{3, 4, 0}));
-
+  auto arrow_schema = ::arrow::schema(arrow_fields);
+  ASSERT_OK(ConvertSchema(parquet_fields));
   ASSERT_NO_FATAL_FAILURE(CheckFlatSchema(arrow_schema));
 }
 
@@ -708,23 +623,21 @@ TEST_F(TestConvertParquetSchema, ParquetRepeatedNestedSchema) {
              "innerGroup", Repetition::REPEATED,
              {PrimitiveNode::Make("leaf3", Repetition::OPTIONAL, ParquetType::INT32)})}));
 
-    auto inner_group_fields = {std::make_shared<Field>("leaf3", INT32, true)};
-    auto inner_group_type = std::make_shared<::arrow::StructType>(inner_group_fields);
+    auto inner_group_fields = {::arrow::field("leaf3", INT32, true)};
+    auto inner_group_type = ::arrow::struct_(inner_group_fields);
     auto outer_group_fields = {
-        std::make_shared<Field>("leaf2", INT32, true),
-        std::make_shared<Field>(
+        ::arrow::field("leaf2", INT32, true),
+        ::arrow::field(
             "innerGroup",
-            ::arrow::list(std::make_shared<Field>("innerGroup", inner_group_type, false)),
-            false)};
-    auto outer_group_type = std::make_shared<::arrow::StructType>(outer_group_fields);
+            ::arrow::list(::arrow::field("innerGroup", inner_group_type, false)), false)};
+    auto outer_group_type = ::arrow::struct_(outer_group_fields);
 
-    arrow_fields.push_back(std::make_shared<Field>("leaf1", INT32, true));
-    arrow_fields.push_back(std::make_shared<Field>(
+    arrow_fields.push_back(::arrow::field("leaf1", INT32, true));
+    arrow_fields.push_back(::arrow::field(
         "outerGroup",
-        ::arrow::list(std::make_shared<Field>("outerGroup", outer_group_type, false)),
-        false));
+        ::arrow::list(::arrow::field("outerGroup", outer_group_type, false)), false));
   }
-  auto arrow_schema = std::make_shared<::arrow::Schema>(arrow_fields);
+  auto arrow_schema = ::arrow::schema(arrow_fields);
   ASSERT_OK(ConvertSchema(parquet_fields));
 
   ASSERT_NO_FATAL_FAILURE(CheckFlatSchema(arrow_schema));
@@ -750,7 +663,7 @@ class TestConvertArrowSchema : public ::testing::Test {
   }
 
   ::arrow::Status ConvertSchema(const std::vector<std::shared_ptr<Field>>& fields) {
-    arrow_schema_ = std::make_shared<::arrow::Schema>(fields);
+    arrow_schema_ = ::arrow::schema(fields);
     std::shared_ptr<::parquet::WriterProperties> properties =
         ::parquet::default_writer_properties();
     return ToParquetSchema(arrow_schema_.get(), *properties.get(), &result_schema_);
@@ -767,51 +680,51 @@ TEST_F(TestConvertArrowSchema, ParquetFlatPrimitives) {
 
   parquet_fields.push_back(
       PrimitiveNode::Make("boolean", Repetition::REQUIRED, ParquetType::BOOLEAN));
-  arrow_fields.push_back(std::make_shared<Field>("boolean", BOOL, false));
+  arrow_fields.push_back(::arrow::field("boolean", BOOL, false));
 
   parquet_fields.push_back(
       PrimitiveNode::Make("int32", Repetition::REQUIRED, ParquetType::INT32));
-  arrow_fields.push_back(std::make_shared<Field>("int32", INT32, false));
+  arrow_fields.push_back(::arrow::field("int32", INT32, false));
 
   parquet_fields.push_back(
       PrimitiveNode::Make("int64", Repetition::REQUIRED, ParquetType::INT64));
-  arrow_fields.push_back(std::make_shared<Field>("int64", INT64, false));
+  arrow_fields.push_back(::arrow::field("int64", INT64, false));
 
   parquet_fields.push_back(PrimitiveNode::Make("date", Repetition::REQUIRED,
                                                ParquetType::INT32, ConvertedType::DATE));
-  arrow_fields.push_back(std::make_shared<Field>("date", ::arrow::date32(), false));
+  arrow_fields.push_back(::arrow::field("date", ::arrow::date32(), false));
 
   parquet_fields.push_back(PrimitiveNode::Make("date64", Repetition::REQUIRED,
                                                ParquetType::INT32, ConvertedType::DATE));
-  arrow_fields.push_back(std::make_shared<Field>("date64", ::arrow::date64(), false));
+  arrow_fields.push_back(::arrow::field("date64", ::arrow::date64(), false));
 
   parquet_fields.push_back(PrimitiveNode::Make("timestamp", Repetition::REQUIRED,
                                                ParquetType::INT64,
                                                ConvertedType::TIMESTAMP_MILLIS));
-  arrow_fields.push_back(std::make_shared<Field>(
-      "timestamp", ::arrow::timestamp(TimeUnit::MILLI, "UTC"), false));
+  arrow_fields.push_back(
+      ::arrow::field("timestamp", ::arrow::timestamp(TimeUnit::MILLI, "UTC"), false));
 
   parquet_fields.push_back(PrimitiveNode::Make("timestamp[us]", Repetition::REQUIRED,
                                                ParquetType::INT64,
                                                ConvertedType::TIMESTAMP_MICROS));
-  arrow_fields.push_back(std::make_shared<Field>(
-      "timestamp[us]", ::arrow::timestamp(TimeUnit::MICRO, "UTC"), false));
+  arrow_fields.push_back(
+      ::arrow::field("timestamp[us]", ::arrow::timestamp(TimeUnit::MICRO, "UTC"), false));
 
   parquet_fields.push_back(
       PrimitiveNode::Make("float", Repetition::OPTIONAL, ParquetType::FLOAT));
-  arrow_fields.push_back(std::make_shared<Field>("float", FLOAT));
+  arrow_fields.push_back(::arrow::field("float", FLOAT));
 
   parquet_fields.push_back(
       PrimitiveNode::Make("double", Repetition::OPTIONAL, ParquetType::DOUBLE));
-  arrow_fields.push_back(std::make_shared<Field>("double", DOUBLE));
+  arrow_fields.push_back(::arrow::field("double", DOUBLE));
 
   parquet_fields.push_back(PrimitiveNode::Make(
       "string", Repetition::OPTIONAL, ParquetType::BYTE_ARRAY, ConvertedType::UTF8));
-  arrow_fields.push_back(std::make_shared<Field>("string", UTF8));
+  arrow_fields.push_back(::arrow::field("string", UTF8));
 
   parquet_fields.push_back(PrimitiveNode::Make(
       "binary", Repetition::OPTIONAL, ParquetType::BYTE_ARRAY, ConvertedType::NONE));
-  arrow_fields.push_back(std::make_shared<Field>("binary", BINARY));
+  arrow_fields.push_back(::arrow::field("binary", BINARY));
 
   ASSERT_OK(ConvertSchema(arrow_fields));
 
@@ -920,7 +833,7 @@ TEST_F(TestConvertArrowSchema, ArrowNonconvertibleFields) {
   };
 
   for (const FieldConstructionArguments& c : cases) {
-    auto field = std::make_shared<Field>(c.name, c.datatype);
+    auto field = ::arrow::field(c.name, c.datatype);
     ASSERT_RAISES(NotImplemented, ConvertSchema({field}));
   }
 }
@@ -937,38 +850,38 @@ TEST_F(TestConvertArrowSchema, ParquetFlatPrimitivesAsDictionaries) {
 
   parquet_fields.push_back(
       PrimitiveNode::Make("int64", Repetition::REQUIRED, ParquetType::INT64));
-  arrow_fields.push_back(std::make_shared<Field>(
+  arrow_fields.push_back(::arrow::field(
       "int64", ::arrow::dictionary(::arrow::int8(), ::arrow::int64()), false));
 
   parquet_fields.push_back(PrimitiveNode::Make("date", Repetition::REQUIRED,
                                                ParquetType::INT32, ConvertedType::DATE));
-  arrow_fields.push_back(std::make_shared<Field>(
+  arrow_fields.push_back(::arrow::field(
       "date", ::arrow::dictionary(::arrow::int8(), ::arrow::date32()), false));
 
   parquet_fields.push_back(PrimitiveNode::Make("date64", Repetition::REQUIRED,
                                                ParquetType::INT32, ConvertedType::DATE));
-  arrow_fields.push_back(std::make_shared<Field>(
+  arrow_fields.push_back(::arrow::field(
       "date64", ::arrow::dictionary(::arrow::int8(), ::arrow::date64()), false));
 
   parquet_fields.push_back(
       PrimitiveNode::Make("float", Repetition::OPTIONAL, ParquetType::FLOAT));
-  arrow_fields.push_back(std::make_shared<Field>(
-      "float", ::arrow::dictionary(::arrow::int8(), ::arrow::float32())));
+  arrow_fields.push_back(
+      ::arrow::field("float", ::arrow::dictionary(::arrow::int8(), ::arrow::float32())));
 
   parquet_fields.push_back(
       PrimitiveNode::Make("double", Repetition::OPTIONAL, ParquetType::DOUBLE));
-  arrow_fields.push_back(std::make_shared<Field>(
-      "double", ::arrow::dictionary(::arrow::int8(), ::arrow::float64())));
+  arrow_fields.push_back(
+      ::arrow::field("double", ::arrow::dictionary(::arrow::int8(), ::arrow::float64())));
 
   parquet_fields.push_back(PrimitiveNode::Make(
       "string", Repetition::OPTIONAL, ParquetType::BYTE_ARRAY, ConvertedType::UTF8));
-  arrow_fields.push_back(std::make_shared<Field>(
-      "string", ::arrow::dictionary(::arrow::int8(), ::arrow::utf8())));
+  arrow_fields.push_back(
+      ::arrow::field("string", ::arrow::dictionary(::arrow::int8(), ::arrow::utf8())));
 
   parquet_fields.push_back(PrimitiveNode::Make(
       "binary", Repetition::OPTIONAL, ParquetType::BYTE_ARRAY, ConvertedType::NONE));
-  arrow_fields.push_back(std::make_shared<Field>(
-      "binary", ::arrow::dictionary(::arrow::int8(), ::arrow::binary())));
+  arrow_fields.push_back(
+      ::arrow::field("binary", ::arrow::dictionary(::arrow::int8(), ::arrow::binary())));
 
   ASSERT_OK(ConvertSchema(arrow_fields));
 
@@ -993,9 +906,9 @@ TEST_F(TestConvertArrowSchema, ParquetLists) {
     auto list = GroupNode::Make("list", Repetition::REPEATED, {element});
     parquet_fields.push_back(
         GroupNode::Make("my_list", Repetition::REQUIRED, {list}, ConvertedType::LIST));
-    auto arrow_element = std::make_shared<Field>("string", UTF8, true);
-    auto arrow_list = std::make_shared<::arrow::ListType>(arrow_element);
-    arrow_fields.push_back(std::make_shared<Field>("my_list", arrow_list, false));
+    auto arrow_element = ::arrow::field("string", UTF8, true);
+    auto arrow_list = ::arrow::list(arrow_element);
+    arrow_fields.push_back(::arrow::field("my_list", arrow_list, false));
   }
 
   // // List<String> (list nullable, elements non-null)
@@ -1010,9 +923,9 @@ TEST_F(TestConvertArrowSchema, ParquetLists) {
     auto list = GroupNode::Make("list", Repetition::REPEATED, {element});
     parquet_fields.push_back(
         GroupNode::Make("my_list", Repetition::OPTIONAL, {list}, ConvertedType::LIST));
-    auto arrow_element = std::make_shared<Field>("string", UTF8, false);
-    auto arrow_list = std::make_shared<::arrow::ListType>(arrow_element);
-    arrow_fields.push_back(std::make_shared<Field>("my_list", arrow_list, true));
+    auto arrow_element = ::arrow::field("string", UTF8, false);
+    auto arrow_list = ::arrow::list(arrow_element);
+    arrow_fields.push_back(::arrow::field("my_list", arrow_list, true));
   }
 
   ASSERT_OK(ConvertSchema(arrow_fields));
diff --git a/cpp/src/parquet/arrow/reader.cc b/cpp/src/parquet/arrow/reader.cc
index e710e3b..11be571 100644
--- a/cpp/src/parquet/arrow/reader.cc
+++ b/cpp/src/parquet/arrow/reader.cc
@@ -19,7 +19,6 @@
 
 #include <algorithm>
 #include <cstring>
-#include <deque>
 #include <functional>
 #include <future>
 #include <numeric>
@@ -27,7 +26,6 @@
 #include <vector>
 
 #include "arrow/array.h"
-#include "arrow/builder.h"
 #include "arrow/record_batch.h"
 #include "arrow/table.h"
 #include "arrow/type.h"
@@ -58,7 +56,9 @@ using arrow::StructArray;
 using arrow::Table;
 using arrow::TimestampArray;
 
+using parquet::schema::GroupNode;
 using parquet::schema::Node;
+using parquet::schema::PrimitiveNode;
 
 // Help reduce verbosity
 using ParquetReader = parquet::ParquetFileReader;
@@ -77,114 +77,37 @@ namespace parquet {
 namespace arrow {
 
 class ColumnChunkReaderImpl;
-class ColumnReaderImpl;
 
-ArrowReaderProperties default_arrow_reader_properties() {
-  static ArrowReaderProperties default_reader_props;
-  return default_reader_props;
-}
-
-// ----------------------------------------------------------------------
-// Iteration utilities
-
-// Abstraction to decouple row group iteration details from the ColumnReader,
-// so we can read only a single row group if we want
-class FileColumnIterator {
+class ColumnReaderImpl : public ColumnReader {
  public:
-  explicit FileColumnIterator(int column_index, ParquetFileReader* reader,
-                              std::vector<int> row_groups)
-      : column_index_(column_index),
-        reader_(reader),
-        schema_(reader->metadata()->schema()),
-        row_groups_(row_groups.begin(), row_groups.end()) {}
-
-  virtual ~FileColumnIterator() {}
-
-  std::unique_ptr<::parquet::PageReader> NextChunk() {
-    if (row_groups_.empty()) {
-      return nullptr;
-    }
-
-    auto row_group_reader = reader_->RowGroup(row_groups_.front());
-    row_groups_.pop_front();
-    return row_group_reader->GetColumnPageReader(column_index_);
-  }
-
-  const SchemaDescriptor* schema() const { return schema_; }
+  enum ReaderType { PRIMITIVE, LIST, STRUCT };
 
-  const ColumnDescriptor* descr() const { return schema_->Column(column_index_); }
-
-  std::shared_ptr<FileMetaData> metadata() const { return reader_->metadata(); }
+  virtual Status GetDefLevels(const int16_t** data, int64_t* length) = 0;
+  virtual Status GetRepLevels(const int16_t** data, int64_t* length) = 0;
+  virtual const std::shared_ptr<Field> field() = 0;
 
-  int column_index() const { return column_index_; }
+  virtual const ColumnDescriptor* descr() const = 0;
 
- protected:
-  int column_index_;
-  ParquetFileReader* reader_;
-  const SchemaDescriptor* schema_;
-  std::deque<int> row_groups_;
+  virtual ReaderType type() const = 0;
 };
 
-using FileColumnIteratorFactory =
-    std::function<FileColumnIterator*(int, ParquetFileReader*)>;
-
-class RowGroupRecordBatchReader : public ::arrow::RecordBatchReader {
- public:
-  explicit RowGroupRecordBatchReader(const std::vector<int>& row_group_indices,
-                                     const std::vector<int>& column_indices,
-                                     std::shared_ptr<::arrow::Schema> schema,
-                                     FileReader* reader, int64_t batch_size)
-      : column_readers_(),
-        row_group_indices_(row_group_indices),
-        column_indices_(column_indices),
-        schema_(schema),
-        file_reader_(reader),
-        batch_size_(batch_size) {}
-
-  ~RowGroupRecordBatchReader() override {}
-
-  std::shared_ptr<::arrow::Schema> schema() const override { return schema_; }
-
-  Status ReadNext(std::shared_ptr<::arrow::RecordBatch>* out) override {
-    if (column_readers_.empty()) {
-      // Initialize the column readers
-      column_readers_.reserve(column_indices_.size());
-
-      for (size_t i = 0; i < column_indices_.size(); ++i) {
-        ColumnReaderPtr tmp;
-        RETURN_NOT_OK(file_reader_->GetColumn(column_indices_[i], &tmp));
-        column_readers_.emplace_back(std::move(tmp));
-      }
-    }
-
-    // TODO (hatemhelal): Consider refactoring this to share logic with ReadTable as this
-    // does not currently honor the use_threads option.
-    std::vector<std::shared_ptr<ChunkedArray>> columns(column_indices_.size());
+ArrowReaderProperties default_arrow_reader_properties() {
+  static ArrowReaderProperties default_reader_props;
+  return default_reader_props;
+}
 
-    for (size_t i = 0; i < column_indices_.size(); ++i) {
-      RETURN_NOT_OK(column_readers_[i]->NextBatch(batch_size_, &columns[i]));
-    }
+// ----------------------------------------------------------------------
+// FileReaderImpl forward declaration
 
-    // Create an intermediate table and use TableBatchReader as an adaptor to a
-    // RecordBatch
-    std::shared_ptr<Table> table = Table::Make(schema_, columns);
-    RETURN_NOT_OK(table->Validate());
-    ::arrow::TableBatchReader table_batch_reader(*table);
-    return table_batch_reader.ReadNext(out);
-  }
+namespace {
 
- private:
-  using ColumnReaderPtr = std::unique_ptr<ColumnReader>;
-  std::vector<ColumnReaderPtr> column_readers_;
-  std::vector<int> row_group_indices_;
-  std::vector<int> column_indices_;
-  std::shared_ptr<::arrow::Schema> schema_;
-  FileReader* file_reader_;
-  int64_t batch_size_;
-};
+std::vector<int> Arange(int length) {
+  std::vector<int> result(length);
+  std::iota(result.begin(), result.end(), 0);
+  return result;
+}
 
-// ----------------------------------------------------------------------
-// FileReaderImpl forward declaration
+}  // namespace
 
 class FileReaderImpl : public FileReader {
  public:
@@ -193,8 +116,16 @@ class FileReaderImpl : public FileReader {
       : pool_(pool), reader_(std::move(reader)), reader_properties_(properties) {}
 
   Status Init() {
-    // TODO(wesm): Smarter schema/column-reader initialization for nested data
-    return Status::OK();
+    return BuildSchemaManifest(reader_->metadata()->schema(), reader_properties_,
+                               &manifest_);
+  }
+
+  std::vector<int> AllRowGroups() {
+    return Arange(reader_->metadata()->num_row_groups());
+  }
+
+  std::vector<int> AllColumnIndices() {
+    return Arange(reader_->metadata()->num_columns());
   }
 
   FileColumnIteratorFactory SomeRowGroupsFactory(std::vector<int> row_groups) {
@@ -203,28 +134,36 @@ class FileReaderImpl : public FileReader {
     };
   }
 
-  std::vector<int> AllRowGroups() {
-    std::vector<int> row_groups(reader_->metadata()->num_row_groups());
-    std::iota(row_groups.begin(), row_groups.end(), 0);
-    return row_groups;
+  FileColumnIteratorFactory AllRowGroupsFactory() {
+    return SomeRowGroupsFactory(AllRowGroups());
   }
 
-  std::vector<int> AllColumnIndices() {
-    std::vector<int> indices(reader_->metadata()->num_columns());
-    std::iota(indices.begin(), indices.end(), 0);
-    return indices;
+  Status BoundsCheckColumn(int column) {
+    if (column < 0 || column >= this->num_columns()) {
+      return Status::Invalid("Column index out of bounds (got ", column,
+                             ", should be "
+                             "between 0 and ",
+                             this->num_columns() - 1, ")");
+    }
+    return Status::OK();
   }
 
-  FileColumnIteratorFactory AllRowGroupsFactory() {
-    return SomeRowGroupsFactory(AllRowGroups());
+  Status BoundsCheckRowGroup(int row_group) {
+    // row group indices check
+    if (row_group < 0 || row_group >= num_row_groups()) {
+      return Status::Invalid("Some index in row_group_indices is ", row_group,
+                             ", which is either < 0 or >= num_row_groups(",
+                             num_row_groups(), ")");
+    }
+    return Status::OK();
   }
 
   int64_t GetTotalRecords(const std::vector<int>& row_groups, int column_chunk = 0) {
     // Can throw exception
     int64_t records = 0;
-    for (int j = 0; j < static_cast<int>(row_groups.size()); j++) {
+    for (auto row_group : row_groups) {
       records += reader_->metadata()
-                     ->RowGroup(row_groups[j])
+                     ->RowGroup(row_group)
                      ->ColumnChunk(column_chunk)
                      ->num_values();
     }
@@ -233,18 +172,43 @@ class FileReaderImpl : public FileReader {
 
   std::shared_ptr<RowGroupReader> RowGroup(int row_group_index) override;
 
-  Status GetReaderForNode(int index, const Node* node, const std::vector<int>& indices,
-                          int16_t def_level, FileColumnIteratorFactory iterator_factory,
-                          std::unique_ptr<ColumnReaderImpl>* out);
-
   Status ReadTable(const std::vector<int>& indices,
                    std::shared_ptr<Table>* out) override {
     return ReadRowGroups(AllRowGroups(), indices, out);
   }
 
+  Status GetFieldReader(int i, const std::vector<int>& indices,
+                        const std::vector<int>& row_groups,
+                        std::unique_ptr<ColumnReaderImpl>* out) {
+    ReaderContext ctx;
+    ctx.reader = reader_.get();
+    ctx.pool = pool_;
+    ctx.iterator_factory = SomeRowGroupsFactory(row_groups);
+    ctx.filter_leaves = true;
+    ctx.included_leaves.insert(indices.begin(), indices.end());
+    return manifest_.schema_fields[i].GetReader(ctx, out);
+  }
+
   Status GetColumn(int i, FileColumnIteratorFactory iterator_factory,
                    std::unique_ptr<ColumnReader>* out);
 
+  Status ReadSchemaField(int i, const std::vector<int>& indices,
+                         const std::vector<int>& row_groups,
+                         std::shared_ptr<Field>* out_field,
+                         std::shared_ptr<ChunkedArray>* out) {
+    BEGIN_PARQUET_CATCH_EXCEPTIONS
+    std::unique_ptr<ColumnReaderImpl> reader;
+    RETURN_NOT_OK(GetFieldReader(i, indices, row_groups, &reader));
+
+    *out_field = reader->field();
+
+    // TODO(wesm): This calculation doesn't make much sense when we have repeated
+    // schema nodes
+    int64_t records_to_read = GetTotalRecords(row_groups, i);
+    return reader->NextBatch(records_to_read, out);
+    END_PARQUET_CATCH_EXCEPTIONS
+  }
+
   Status GetColumn(int i, std::unique_ptr<ColumnReader>* out) override {
     return GetColumn(i, AllRowGroupsFactory(), out);
   }
@@ -254,15 +218,12 @@ class FileReaderImpl : public FileReader {
                              reader_->metadata()->key_value_metadata(), out);
   }
 
-  Status GetSchema(const std::vector<int>& indices,
-                   std::shared_ptr<::arrow::Schema>* out) override {
-    return FromParquetSchema(reader_->metadata()->schema(), indices, reader_properties_,
-                             reader_->metadata()->key_value_metadata(), out);
-  }
-
   Status ReadSchemaField(int i, const std::vector<int>& indices,
                          const std::vector<int>& row_groups,
-                         std::shared_ptr<ChunkedArray>* out);
+                         std::shared_ptr<ChunkedArray>* out) {
+    std::shared_ptr<Field> unused;
+    return ReadSchemaField(i, indices, row_groups, &unused, out);
+  }
 
   Status ReadSchemaField(int i, const std::vector<int>& indices,
                          std::shared_ptr<ChunkedArray>* out) {
@@ -288,11 +249,7 @@ class FileReaderImpl : public FileReader {
   }
 
   Status ReadTable(std::shared_ptr<Table>* table) override {
-    std::vector<int> indices(reader_->metadata()->num_columns());
-    for (size_t i = 0; i < indices.size(); ++i) {
-      indices[i] = static_cast<int>(i);
-    }
-    return ReadTable(indices, table);
+    return ReadTable(AllColumnIndices(), table);
   }
 
   Status ReadRowGroups(const std::vector<int>& row_groups,
@@ -313,54 +270,13 @@ class FileReaderImpl : public FileReader {
     return ReadRowGroup(i, AllColumnIndices(), table);
   }
 
-  std::vector<int> GetDictionaryIndices(const std::vector<int>& indices) {
-    // Select the column indices that were read as DictionaryArray
-    std::vector<int> dict_indices(indices);
-    auto remove_func = [this](int i) { return !reader_properties_.read_dictionary(i); };
-    auto it = std::remove_if(dict_indices.begin(), dict_indices.end(), remove_func);
-    dict_indices.erase(it, dict_indices.end());
-    return dict_indices;
-  }
-
-  std::shared_ptr<::arrow::Schema> FixSchema(
-      const ::arrow::Schema& old_schema, const std::vector<int>& dict_indices,
-      const std::vector<std::shared_ptr<::arrow::ChunkedArray>>& columns) {
-    // Fix the schema with the actual DictionaryType that was read
-    auto fields = old_schema.fields();
-
-    for (int idx : dict_indices) {
-      fields[idx] = old_schema.field(idx)->WithType(columns[idx]->type());
-    }
-    return std::make_shared<::arrow::Schema>(fields, old_schema.metadata());
-  }
-
   Status GetRecordBatchReader(const std::vector<int>& row_group_indices,
-                              std::shared_ptr<RecordBatchReader>* out) override {
-    return GetRecordBatchReader(row_group_indices, AllColumnIndices(), out);
-  }
-
-  Status BoundsCheckRowGroup(int row_group) {
-    // row group indices check
-    if (row_group < 0 || row_group >= num_row_groups()) {
-      return Status::Invalid("Some index in row_group_indices is ", row_group,
-                             ", which is either < 0 or >= num_row_groups(",
-                             num_row_groups(), ")");
-    }
-    return Status::OK();
-  }
+                              const std::vector<int>& column_indices,
+                              std::shared_ptr<RecordBatchReader>* out) override;
 
   Status GetRecordBatchReader(const std::vector<int>& row_group_indices,
-                              const std::vector<int>& column_indices,
                               std::shared_ptr<RecordBatchReader>* out) override {
-    // column indices check
-    std::shared_ptr<::arrow::Schema> schema;
-    RETURN_NOT_OK(GetSchema(column_indices, &schema));
-    for (auto row_group_index : row_group_indices) {
-      RETURN_NOT_OK(BoundsCheckRowGroup(row_group_index));
-    }
-    *out = std::make_shared<RowGroupRecordBatchReader>(row_group_indices, column_indices,
-                                                       schema, this, batch_size());
-    return Status::OK();
+    return GetRecordBatchReader(row_group_indices, AllColumnIndices(), out);
   }
 
   int num_columns() const { return reader_->metadata()->num_columns(); }
@@ -373,8 +289,6 @@ class FileReaderImpl : public FileReader {
     reader_properties_.set_use_threads(use_threads);
   }
 
-  int64_t batch_size() const { return reader_properties_.batch_size(); }
-
   Status ScanContents(std::vector<int> columns, const int32_t column_batch_size,
                       int64_t* num_rows) override {
     BEGIN_PARQUET_CATCH_EXCEPTIONS
@@ -383,10 +297,68 @@ class FileReaderImpl : public FileReader {
     END_PARQUET_CATCH_EXCEPTIONS
   }
 
- private:
   MemoryPool* pool_;
   std::unique_ptr<ParquetFileReader> reader_;
   ArrowReaderProperties reader_properties_;
+
+  SchemaManifest manifest_;
+};
+
+class RowGroupRecordBatchReader : public ::arrow::RecordBatchReader {
+ public:
+  RowGroupRecordBatchReader(std::vector<std::unique_ptr<ColumnReaderImpl>> field_readers,
+                            std::shared_ptr<::arrow::Schema> schema, int64_t batch_size)
+      : field_readers_(std::move(field_readers)),
+        schema_(schema),
+        batch_size_(batch_size) {}
+
+  ~RowGroupRecordBatchReader() override {}
+
+  std::shared_ptr<::arrow::Schema> schema() const override { return schema_; }
+
+  static Status Make(const std::vector<int>& row_groups,
+                     const std::vector<int>& column_indices, FileReaderImpl* reader,
+                     int64_t batch_size,
+                     std::shared_ptr<::arrow::RecordBatchReader>* out) {
+    std::vector<int> field_indices;
+    if (!reader->manifest_.GetFieldIndices(column_indices, &field_indices)) {
+      return Status::Invalid("Invalid column index");
+    }
+    std::vector<std::unique_ptr<ColumnReaderImpl>> field_readers(field_indices.size());
+    std::vector<std::shared_ptr<Field>> fields;
+    for (size_t i = 0; i < field_indices.size(); ++i) {
+      RETURN_NOT_OK(reader->GetFieldReader(field_indices[i], column_indices, row_groups,
+                                           &field_readers[i]));
+      fields.push_back(field_readers[i]->field());
+    }
+    *out = std::make_shared<RowGroupRecordBatchReader>(
+        std::move(field_readers), ::arrow::schema(fields), batch_size);
+    return Status::OK();
+  }
+
+  Status ReadNext(std::shared_ptr<::arrow::RecordBatch>* out) override {
+    // TODO (hatemhelal): Consider refactoring this to share logic with ReadTable as this
+    // does not currently honor the use_threads option.
+    std::vector<std::shared_ptr<ChunkedArray>> columns(field_readers_.size());
+    for (size_t i = 0; i < field_readers_.size(); ++i) {
+      RETURN_NOT_OK(field_readers_[i]->NextBatch(batch_size_, &columns[i]));
+      if (columns[i]->num_chunks() > 1) {
+        return Status::NotImplemented("This class cannot yet iterate chunked arrays");
+      }
+    }
+
+    // Create an intermediate table and use TableBatchReader as an adaptor to a
+    // RecordBatch
+    std::shared_ptr<Table> table = Table::Make(schema_, columns);
+    RETURN_NOT_OK(table->Validate());
+    ::arrow::TableBatchReader table_batch_reader(*table);
+    return table_batch_reader.ReadNext(out);
+  }
+
+ private:
+  std::vector<std::unique_ptr<ColumnReaderImpl>> field_readers_;
+  std::shared_ptr<::arrow::Schema> schema_;
+  int64_t batch_size_;
 };
 
 class ColumnChunkReaderImpl : public ColumnChunkReader {
@@ -428,34 +400,26 @@ class RowGroupReaderImpl : public RowGroupReader {
   int row_group_index_;
 };
 
-class ColumnReaderImpl : public ColumnReader {
+// Leaf reader is for primitive arrays and primitive children of nested arrays
+class LeafReader : public ColumnReaderImpl {
  public:
-  virtual Status GetDefLevels(const int16_t** data, size_t* length) = 0;
-  virtual Status GetRepLevels(const int16_t** data, size_t* length) = 0;
-  virtual const std::shared_ptr<Field> field() = 0;
-};
-
-// Reader implementation for primitive arrays
-class PARQUET_NO_EXPORT PrimitiveImpl : public ColumnReaderImpl {
- public:
-  PrimitiveImpl(MemoryPool* pool, std::unique_ptr<FileColumnIterator> input,
-                bool read_dictionary)
-      : pool_(pool), input_(std::move(input)), descr_(input_->descr()) {
-    record_reader_ = RecordReader::Make(descr_, pool_, read_dictionary);
-    Status s = NodeToField(*input_->descr()->schema_node(), &field_);
-    DCHECK_OK(s);
+  LeafReader(const ReaderContext& ctx, const std::shared_ptr<Field>& field,
+             std::unique_ptr<FileColumnIterator> input)
+      : ctx_(ctx), field_(field), input_(std::move(input)), descr_(input_->descr()) {
+    record_reader_ = RecordReader::Make(descr_, ctx_.pool,
+                                        field->type()->id() == ::arrow::Type::DICTIONARY);
     NextRowGroup();
   }
 
-  Status GetDefLevels(const int16_t** data, size_t* length) override {
+  Status GetDefLevels(const int16_t** data, int64_t* length) override {
     *data = record_reader_->def_levels();
-    *length = record_reader_->levels_written();
+    *length = record_reader_->levels_position();
     return Status::OK();
   }
 
-  Status GetRepLevels(const int16_t** data, size_t* length) override {
+  Status GetRepLevels(const int16_t** data, int64_t* length) override {
     *data = record_reader_->rep_levels();
-    *length = record_reader_->levels_written();
+    *length = record_reader_->levels_position();
     return Status::OK();
   }
 
@@ -477,17 +441,15 @@ class PARQUET_NO_EXPORT PrimitiveImpl : public ColumnReaderImpl {
       }
     }
     RETURN_NOT_OK(
-        TransferColumnData(record_reader_.get(), field_->type(), descr_, pool_, out));
-
-    // Nest nested types, if not nested returns unmodifed
-    RETURN_NOT_OK(WrapIntoListArray(out));
+        TransferColumnData(record_reader_.get(), field_->type(), descr_, ctx_.pool, out));
     return Status::OK();
     END_PARQUET_CATCH_EXCEPTIONS
   }
 
-  Status WrapIntoListArray(std::shared_ptr<ChunkedArray>* inout_array);
-
   const std::shared_ptr<Field> field() override { return field_; }
+  const ColumnDescriptor* descr() const override { return descr_; }
+
+  ReaderType type() const override { return PRIMITIVE; }
 
  private:
   void NextRowGroup() {
@@ -495,188 +457,111 @@ class PARQUET_NO_EXPORT PrimitiveImpl : public ColumnReaderImpl {
     record_reader_->SetPageReader(std::move(page_reader));
   }
 
-  MemoryPool* pool_;
+  ReaderContext ctx_;
+  std::shared_ptr<Field> field_;
   std::unique_ptr<FileColumnIterator> input_;
   const ColumnDescriptor* descr_;
-
   std::shared_ptr<RecordReader> record_reader_;
-  std::shared_ptr<Field> field_;
 };
 
-Status PrimitiveImpl::WrapIntoListArray(std::shared_ptr<ChunkedArray>* inout_array) {
-  if (descr_->max_repetition_level() == 0) {
-    // Flat, no action
-    return Status::OK();
-  }
-
-  std::shared_ptr<Array> flat_array;
+class NestedListReader : public ColumnReaderImpl {
+ public:
+  NestedListReader(const ReaderContext& ctx, std::shared_ptr<Field> field,
+                   int16_t max_definition_level, int16_t max_repetition_level,
+                   std::unique_ptr<ColumnReaderImpl> item_reader)
+      : ctx_(ctx),
+        field_(field),
+        max_definition_level_(max_definition_level),
+        max_repetition_level_(max_repetition_level),
+        item_reader_(std::move(item_reader)) {}
 
-  // ARROW-3762(wesm): If inout_array is a chunked array, we reject as this is
-  // not yet implemented
-  if ((*inout_array)->num_chunks() > 1) {
-    return Status::NotImplemented(
-        "Nested data conversions not implemented for "
-        "chunked array outputs");
+  Status GetDefLevels(const int16_t** data, int64_t* length) override {
+    return item_reader_->GetDefLevels(data, length);
   }
-  flat_array = (*inout_array)->chunk(0);
-
-  const int16_t* def_levels = record_reader_->def_levels();
-  const int16_t* rep_levels = record_reader_->rep_levels();
-  const int64_t total_levels_read = record_reader_->levels_position();
 
-  std::shared_ptr<::arrow::Schema> arrow_schema;
-  RETURN_NOT_OK(FromParquetSchema(
-      input_->schema(), {input_->column_index()}, default_arrow_reader_properties(),
-      input_->metadata()->key_value_metadata(), &arrow_schema));
-  std::shared_ptr<Field> current_field = arrow_schema->field(0);
-
-  if (current_field->type()->num_children() > 0 &&
-      flat_array->type_id() == ::arrow::Type::DICTIONARY) {
-    // XXX(wesm): Handling of nested types and dictionary encoding needs to be
-    // significantly refactored
-    return Status::Invalid("Cannot have nested types containing dictionary arrays yet");
+  Status GetRepLevels(const int16_t** data, int64_t* length) override {
+    return item_reader_->GetRepLevels(data, length);
   }
 
-  // Walk downwards to extract nullability
-  std::vector<bool> nullable;
-  std::vector<std::shared_ptr<::arrow::Int32Builder>> offset_builders;
-  std::vector<std::shared_ptr<::arrow::BooleanBuilder>> valid_bits_builders;
-  nullable.push_back(current_field->nullable());
-  while (current_field->type()->num_children() > 0) {
-    if (current_field->type()->num_children() > 1) {
-      return Status::NotImplemented("Fields with more than one child are not supported.");
-    } else {
-      if (current_field->type()->id() != ::arrow::Type::LIST) {
-        return Status::NotImplemented("Currently only nesting with Lists is supported.");
-      }
-      current_field = current_field->type()->child(0);
-    }
-    offset_builders.emplace_back(
-        std::make_shared<::arrow::Int32Builder>(::arrow::int32(), pool_));
-    valid_bits_builders.emplace_back(
-        std::make_shared<::arrow::BooleanBuilder>(::arrow::boolean(), pool_));
-    nullable.push_back(current_field->nullable());
-  }
-
-  int64_t list_depth = offset_builders.size();
-  // This describes the minimal definition that describes a level that
-  // reflects a value in the primitive values array.
-  int16_t values_def_level = descr_->max_definition_level();
-  if (nullable[nullable.size() - 1]) {
-    values_def_level--;
-  }
-
-  // The definition levels that are needed so that a list is declared
-  // as empty and not null.
-  std::vector<int16_t> empty_def_level(list_depth);
-  int def_level = 0;
-  for (int i = 0; i < list_depth; i++) {
-    if (nullable[i]) {
-      def_level++;
-    }
-    empty_def_level[i] = static_cast<int16_t>(def_level);
-    def_level++;
-  }
-
-  int32_t values_offset = 0;
-  std::vector<int64_t> null_counts(list_depth, 0);
-  for (int64_t i = 0; i < total_levels_read; i++) {
-    int16_t rep_level = rep_levels[i];
-    if (rep_level < descr_->max_repetition_level()) {
-      for (int64_t j = rep_level; j < list_depth; j++) {
-        if (j == (list_depth - 1)) {
-          RETURN_NOT_OK(offset_builders[j]->Append(values_offset));
-        } else {
-          RETURN_NOT_OK(offset_builders[j]->Append(
-              static_cast<int32_t>(offset_builders[j + 1]->length())));
-        }
-
-        if (((empty_def_level[j] - 1) == def_levels[i]) && (nullable[j])) {
-          RETURN_NOT_OK(valid_bits_builders[j]->Append(false));
-          null_counts[j]++;
-          break;
-        } else {
-          RETURN_NOT_OK(valid_bits_builders[j]->Append(true));
-          if (empty_def_level[j] == def_levels[i]) {
-            break;
-          }
-        }
-      }
-    }
-    if (def_levels[i] >= values_def_level) {
-      values_offset++;
+  Status NextBatch(int64_t records_to_read, std::shared_ptr<ChunkedArray>* out) override {
+    if (item_reader_->type() == ColumnReaderImpl::STRUCT) {
+      return Status::Invalid("Mix of struct and list types not yet supported");
     }
-  }
-  // Add the final offset to all lists
-  for (int64_t j = 0; j < list_depth; j++) {
-    if (j == (list_depth - 1)) {
-      RETURN_NOT_OK(offset_builders[j]->Append(values_offset));
-    } else {
-      RETURN_NOT_OK(offset_builders[j]->Append(
-          static_cast<int32_t>(offset_builders[j + 1]->length())));
+
+    RETURN_NOT_OK(item_reader_->NextBatch(records_to_read, out));
+
+    // ARROW-3762(wesm): If item reader yields a chunked array, we reject as
+    // this is not yet implemented
+    if ((*out)->num_chunks() > 1) {
+      return Status::NotImplemented(
+          "Nested data conversions not implemented for chunked array outputs");
     }
-  }
 
-  std::vector<std::shared_ptr<Buffer>> offsets;
-  std::vector<std::shared_ptr<Buffer>> valid_bits;
-  std::vector<int64_t> list_lengths;
-  for (int64_t j = 0; j < list_depth; j++) {
-    list_lengths.push_back(offset_builders[j]->length() - 1);
-    std::shared_ptr<Array> array;
-    RETURN_NOT_OK(offset_builders[j]->Finish(&array));
-    offsets.emplace_back(std::static_pointer_cast<Int32Array>(array)->values());
-    RETURN_NOT_OK(valid_bits_builders[j]->Finish(&array));
-    valid_bits.emplace_back(std::static_pointer_cast<BooleanArray>(array)->values());
+    const int16_t* def_levels;
+    const int16_t* rep_levels;
+    int64_t num_levels;
+    RETURN_NOT_OK(item_reader_->GetDefLevels(&def_levels, &num_levels));
+    RETURN_NOT_OK(item_reader_->GetRepLevels(&rep_levels, &num_levels));
+    std::shared_ptr<Array> result;
+    RETURN_NOT_OK(ReconstructNestedList((*out)->chunk(0), field_, max_definition_level_,
+                                        max_repetition_level_, def_levels, rep_levels,
+                                        num_levels, ctx_.pool, &result));
+    *out = std::make_shared<ChunkedArray>(result);
+    return Status::OK();
   }
 
-  std::shared_ptr<Array> output = flat_array;
-  for (int64_t j = list_depth - 1; j >= 0; j--) {
-    auto list_type =
-        ::arrow::list(::arrow::field("item", output->type(), nullable[j + 1]));
-    output = std::make_shared<::arrow::ListArray>(list_type, list_lengths[j], offsets[j],
-                                                  output, valid_bits[j], null_counts[j]);
-  }
-  *inout_array = std::make_shared<ChunkedArray>(output);
-  return Status::OK();
-}
+  const std::shared_ptr<Field> field() override { return field_; }
+
+  const ColumnDescriptor* descr() const override { return nullptr; }
 
-// Reader implementation for struct array
+  ReaderType type() const override { return LIST; }
+
+ private:
+  ReaderContext ctx_;
+  std::shared_ptr<Field> field_;
+  int16_t max_definition_level_;
+  int16_t max_repetition_level_;
+  std::unique_ptr<ColumnReaderImpl> item_reader_;
+};
 
-class PARQUET_NO_EXPORT StructImpl : public ColumnReaderImpl {
+class PARQUET_NO_EXPORT StructReader : public ColumnReaderImpl {
  public:
-  explicit StructImpl(const std::vector<std::shared_ptr<ColumnReaderImpl>>& children,
-                      int16_t struct_def_level, MemoryPool* pool, const Node* node)
-      : children_(children), struct_def_level_(struct_def_level), pool_(pool) {
-    InitField(node, children);
-  }
+  explicit StructReader(const ReaderContext& ctx, const SchemaField& schema_field,
+                        std::shared_ptr<Field> filtered_field,
+                        std::vector<std::unique_ptr<ColumnReaderImpl>>&& children)
+      : ctx_(ctx),
+        schema_field_(schema_field),
+        filtered_field_(filtered_field),
+        struct_def_level_(schema_field.max_definition_level),
+        children_(std::move(children)) {}
 
   Status NextBatch(int64_t records_to_read, std::shared_ptr<ChunkedArray>* out) override;
-  Status GetDefLevels(const int16_t** data, size_t* length) override;
-  Status GetRepLevels(const int16_t** data, size_t* length) override;
-  const std::shared_ptr<Field> field() override { return field_; }
+  Status GetDefLevels(const int16_t** data, int64_t* length) override;
+  Status GetRepLevels(const int16_t** data, int64_t* length) override;
+  const std::shared_ptr<Field> field() override { return filtered_field_; }
+  const ColumnDescriptor* descr() const override { return nullptr; }
+  ReaderType type() const override { return STRUCT; }
 
  private:
-  std::vector<std::shared_ptr<ColumnReaderImpl>> children_;
+  ReaderContext ctx_;
+  SchemaField schema_field_;
+  std::shared_ptr<Field> filtered_field_;
   int16_t struct_def_level_;
-  MemoryPool* pool_;
-  std::shared_ptr<Field> field_;
+  std::vector<std::unique_ptr<ColumnReaderImpl>> children_;
   std::shared_ptr<ResizableBuffer> def_levels_buffer_;
-
   Status DefLevelsToNullArray(std::shared_ptr<Buffer>* null_bitmap, int64_t* null_count);
-  void InitField(const Node* node,
-                 const std::vector<std::shared_ptr<ColumnReaderImpl>>& children);
 };
 
-Status StructImpl::DefLevelsToNullArray(std::shared_ptr<Buffer>* null_bitmap_out,
-                                        int64_t* null_count_out) {
+Status StructReader::DefLevelsToNullArray(std::shared_ptr<Buffer>* null_bitmap_out,
+                                          int64_t* null_count_out) {
   std::shared_ptr<Buffer> null_bitmap;
   auto null_count = 0;
   const int16_t* def_levels_data;
-  size_t def_levels_length;
+  int64_t def_levels_length;
   RETURN_NOT_OK(GetDefLevels(&def_levels_data, &def_levels_length));
-  RETURN_NOT_OK(AllocateEmptyBitmap(pool_, def_levels_length, &null_bitmap));
+  RETURN_NOT_OK(AllocateEmptyBitmap(ctx_.pool, def_levels_length, &null_bitmap));
   uint8_t* null_bitmap_ptr = null_bitmap->mutable_data();
-  for (size_t i = 0; i < def_levels_length; i++) {
+  for (int64_t i = 0; i < def_levels_length; i++) {
     if (def_levels_data[i] < struct_def_level_) {
       // Mark null
       null_count += 1;
@@ -693,7 +578,7 @@ Status StructImpl::DefLevelsToNullArray(std::shared_ptr<Buffer>* null_bitmap_out
 
 // TODO(itaiin): Consider caching the results of this calculation -
 //   note that this is only used once for each read for now
-Status StructImpl::GetDefLevels(const int16_t** data, size_t* length) {
+Status StructReader::GetDefLevels(const int16_t** data, int64_t* length) {
   *data = nullptr;
   if (children_.size() == 0) {
     // Empty struct
@@ -703,10 +588,10 @@ Status StructImpl::GetDefLevels(const int16_t** data, size_t* length) {
 
   // We have at least one child
   const int16_t* child_def_levels;
-  size_t child_length;
+  int64_t child_length;
   RETURN_NOT_OK(children_[0]->GetDefLevels(&child_def_levels, &child_length));
   auto size = child_length * sizeof(int16_t);
-  RETURN_NOT_OK(AllocateResizableBuffer(pool_, size, &def_levels_buffer_));
+  RETURN_NOT_OK(AllocateResizableBuffer(ctx_.pool, size, &def_levels_buffer_));
   // Initialize with the minimal def level
   std::memset(def_levels_buffer_->mutable_data(), -1, size);
   auto result_levels = reinterpret_cast<int16_t*>(def_levels_buffer_->mutable_data());
@@ -717,10 +602,10 @@ Status StructImpl::GetDefLevels(const int16_t** data, size_t* length) {
   // the nesting level, and the def level equals max(children def levels)
   // All other possibilities are malformed definition data.
   for (auto& child : children_) {
-    size_t current_child_length;
+    int64_t current_child_length;
     RETURN_NOT_OK(child->GetDefLevels(&child_def_levels, &current_child_length));
     DCHECK_EQ(child_length, current_child_length);
-    for (size_t i = 0; i < child_length; i++) {
+    for (int64_t i = 0; i < child_length; i++) {
       // Check that value is either uninitialized, or current
       // and previous children def levels agree on the struct level
       DCHECK((result_levels[i] == -1) || ((result_levels[i] >= struct_def_level_) ==
@@ -730,34 +615,26 @@ Status StructImpl::GetDefLevels(const int16_t** data, size_t* length) {
     }
   }
   *data = reinterpret_cast<const int16_t*>(def_levels_buffer_->data());
-  *length = child_length;
+  *length = static_cast<int64_t>(child_length);
   return Status::OK();
 }
 
-void StructImpl::InitField(
-    const Node* node, const std::vector<std::shared_ptr<ColumnReaderImpl>>& children) {
-  // Make a shallow node to field conversion from the children fields
-  std::vector<std::shared_ptr<::arrow::Field>> fields(children.size());
-  for (size_t i = 0; i < children.size(); i++) {
-    fields[i] = children[i]->field();
-  }
-
-  auto type = ::arrow::struct_(fields);
-  field_ = ::arrow::field(node->name(), type, node->is_optional());
-}
-
-Status StructImpl::GetRepLevels(const int16_t** data, size_t* length) {
+Status StructReader::GetRepLevels(const int16_t** data, int64_t* length) {
   return Status::NotImplemented("GetRepLevels is not implemented for struct");
 }
 
-Status StructImpl::NextBatch(int64_t records_to_read,
-                             std::shared_ptr<ChunkedArray>* out) {
+Status StructReader::NextBatch(int64_t records_to_read,
+                               std::shared_ptr<ChunkedArray>* out) {
   std::vector<std::shared_ptr<Array>> children_arrays;
   std::shared_ptr<Buffer> null_bitmap;
   int64_t null_count;
 
   // Gather children arrays and def levels
   for (auto& child : children_) {
+    if (child->type() == ColumnReaderImpl::LIST) {
+      return Status::Invalid("Mix of struct and list types not yet supported");
+    }
+
     std::shared_ptr<ChunkedArray> field;
     RETURN_NOT_OK(child->NextBatch(records_to_read, &field));
 
@@ -787,57 +664,85 @@ Status StructImpl::NextBatch(int64_t records_to_read,
 // ----------------------------------------------------------------------
 // File reader implementation
 
-Status FileReaderImpl::GetReaderForNode(int index, const Node* node,
-                                        const std::vector<int>& indices,
-                                        int16_t def_level,
-                                        FileColumnIteratorFactory iterator_factory,
-                                        std::unique_ptr<ColumnReaderImpl>* out) {
-  *out = nullptr;
-
-  if (schema::IsSimpleStruct(node)) {
-    const schema::GroupNode* group = static_cast<const schema::GroupNode*>(node);
-    std::vector<std::shared_ptr<ColumnReaderImpl>> children;
-    for (int i = 0; i < group->field_count(); i++) {
-      std::unique_ptr<ColumnReaderImpl> child_reader;
-      // TODO(itaiin): Remove the -1 index hack when all types of nested reads
-      // are supported. This currently just signals the lower level reader resolution
-      // to abort
-      RETURN_NOT_OK(GetReaderForNode(index, group->field(i).get(), indices,
-                                     static_cast<int16_t>(def_level + 1),
-                                     iterator_factory, &child_reader));
-      if (child_reader != nullptr) {
-        children.push_back(std::move(child_reader));
-      }
+Status SchemaField::GetReader(const ReaderContext& ctx,
+                              std::unique_ptr<ColumnReaderImpl>* out) const {
+  auto type_id = this->field->type()->id();
+  if (this->children.size() == 0) {
+    std::unique_ptr<FileColumnIterator> input(
+        ctx.iterator_factory(this->column_index, ctx.reader));
+    out->reset(new LeafReader(ctx, this->field, std::move(input)));
+  } else if (type_id == ::arrow::Type::LIST) {
+    // We can only read lists-of-lists or structs at the moment
+    auto list_field = this->field;
+    auto child = &this->children[0];
+    while (child->field->type()->id() == ::arrow::Type::LIST) {
+      child = &child->children[0];
     }
-
-    if (children.size() > 0) {
-      *out = std::unique_ptr<ColumnReaderImpl>(
-          new StructImpl(children, def_level, pool_, node));
+    if (child->field->type()->id() == ::arrow::Type::STRUCT) {
+      return Status::NotImplemented("Lists of structs not yet supported");
     }
-  } else {
-    // This should be a flat field case - translate the field index to
-    // the correct column index by walking down to the leaf node
-    const Node* walker = node;
-    while (!walker->is_primitive()) {
-      DCHECK(walker->is_group());
-      auto group = static_cast<const schema::GroupNode*>(walker);
-      if (group->field_count() != 1) {
-        return Status::NotImplemented("lists with structs are not supported.");
+    if (!ctx.IncludesLeaf(child->column_index)) {
+      *out = nullptr;
+      return Status::OK();
+    }
+    std::unique_ptr<ColumnReaderImpl> child_reader;
+    RETURN_NOT_OK(child->GetReader(ctx, &child_reader));
+    // Use the max definition/repetition level of the leaf here
+    out->reset(new NestedListReader(ctx, list_field, child->max_definition_level,
+                                    child->max_repetition_level,
+                                    std::move(child_reader)));
+  } else if (type_id == ::arrow::Type::STRUCT) {
+    std::vector<std::shared_ptr<Field>> child_fields;
+    std::vector<std::unique_ptr<ColumnReaderImpl>> child_readers;
+    for (const auto& child : this->children) {
+      if (child.is_leaf() && !ctx.IncludesLeaf(child.column_index)) {
+        // Excluded leaf
+        continue;
+      }
+      std::unique_ptr<ColumnReaderImpl> child_reader;
+      RETURN_NOT_OK(child.GetReader(ctx, &child_reader));
+      if (!child_reader) {
+        // If all children were pruned, then we do not try to read this field
+        continue;
       }
-      walker = group->field(0).get();
+      child_fields.push_back(child.field);
+      child_readers.emplace_back(std::move(child_reader));
     }
-    auto column_index = reader_->metadata()->schema()->ColumnIndex(*walker);
-
-    // If the index of the column is found then a reader for the column is needed.
-    // Otherwise *out keeps the nullptr value.
-    if (std::find(indices.begin(), indices.end(), column_index) != indices.end()) {
-      std::unique_ptr<ColumnReader> reader;
-      RETURN_NOT_OK(GetColumn(column_index, iterator_factory, &reader));
-      *out = std::unique_ptr<ColumnReaderImpl>(
-          static_cast<ColumnReaderImpl*>(reader.release()));
+    if (child_fields.size() == 0) {
+      *out = nullptr;
+      return Status::OK();
     }
+    auto filtered_field = ::arrow::field(
+        this->field->name(), ::arrow::struct_(child_fields), this->field->nullable());
+    out->reset(new StructReader(ctx, *this, filtered_field, std::move(child_readers)));
+  } else {
+    return Status::Invalid("Unsupported nested type: ", this->field->ToString());
   }
+  return Status::OK();
+}
 
+Status FileReaderImpl::GetRecordBatchReader(const std::vector<int>& row_group_indices,
+                                            const std::vector<int>& column_indices,
+                                            std::shared_ptr<RecordBatchReader>* out) {
+  // column indices check
+  for (auto row_group_index : row_group_indices) {
+    RETURN_NOT_OK(BoundsCheckRowGroup(row_group_index));
+  }
+  return RowGroupRecordBatchReader::Make(row_group_indices, column_indices, this,
+                                         reader_properties_.batch_size(), out);
+}
+
+Status FileReaderImpl::GetColumn(int i, FileColumnIteratorFactory iterator_factory,
+                                 std::unique_ptr<ColumnReader>* out) {
+  RETURN_NOT_OK(BoundsCheckColumn(i));
+  ReaderContext ctx;
+  ctx.reader = reader_.get();
+  ctx.pool = pool_;
+  ctx.iterator_factory = AllRowGroupsFactory();
+  ctx.filter_leaves = false;
+  std::unique_ptr<ColumnReaderImpl> result;
+  RETURN_NOT_OK(manifest_.schema_fields[i].GetReader(ctx, &result));
+  out->reset(result.release());
   return Status::OK();
 }
 
@@ -846,21 +751,20 @@ Status FileReaderImpl::ReadRowGroups(const std::vector<int>& row_groups,
                                      std::shared_ptr<Table>* out) {
   BEGIN_PARQUET_CATCH_EXCEPTIONS
 
-  std::shared_ptr<::arrow::Schema> schema;
-  RETURN_NOT_OK(GetSchema(indices, &schema));
-
   // We only need to read schema fields which have columns indicated
   // in the indices vector
   std::vector<int> field_indices;
-  if (!schema::ColumnIndicesToFieldIndices(*reader_->metadata()->schema(), indices,
-                                           &field_indices)) {
+  if (!manifest_.GetFieldIndices(indices, &field_indices)) {
     return Status::Invalid("Invalid column index");
   }
+
   int num_fields = static_cast<int>(field_indices.size());
+  std::vector<std::shared_ptr<Field>> fields(num_fields);
   std::vector<std::shared_ptr<ChunkedArray>> columns(num_fields);
 
   auto ReadColumnFunc = [&](int i) {
-    return ReadSchemaField(field_indices[i], indices, row_groups, &columns[i]);
+    return ReadSchemaField(field_indices[i], indices, row_groups, &fields[i],
+                           &columns[i]);
   };
 
   if (reader_properties_.use_threads()) {
@@ -883,49 +787,9 @@ Status FileReaderImpl::ReadRowGroups(const std::vector<int>& row_groups,
     }
   }
 
-  auto dict_indices = GetDictionaryIndices(indices);
-  if (!dict_indices.empty()) {
-    schema = FixSchema(*schema, dict_indices, columns);
-  }
-  std::shared_ptr<Table> table = Table::Make(schema, columns);
-  RETURN_NOT_OK(table->Validate());
-  *out = table;
-  return Status::OK();
-  END_PARQUET_CATCH_EXCEPTIONS
-}
-
-Status FileReaderImpl::GetColumn(int i, FileColumnIteratorFactory iterator_factory,
-                                 std::unique_ptr<ColumnReader>* out) {
-  if (i < 0 || i >= this->num_columns()) {
-    return Status::Invalid("Column index out of bounds (got ", i,
-                           ", should be "
-                           "between 0 and ",
-                           this->num_columns() - 1, ")");
-  }
-
-  std::unique_ptr<FileColumnIterator> input(iterator_factory(i, reader_.get()));
-  *out = std::unique_ptr<ColumnReader>(
-      new PrimitiveImpl(pool_, std::move(input), reader_properties_.read_dictionary(i)));
-  return Status::OK();
-}
-
-Status FileReaderImpl::ReadSchemaField(int i, const std::vector<int>& indices,
-                                       const std::vector<int>& row_groups,
-                                       std::shared_ptr<ChunkedArray>* out) {
-  BEGIN_PARQUET_CATCH_EXCEPTIONS
-  auto parquet_schema = reader_->metadata()->schema();
-  auto node = parquet_schema->group_node()->field(i).get();
-  std::unique_ptr<ColumnReaderImpl> reader_impl;
-  RETURN_NOT_OK(GetReaderForNode(i, node, indices, 1, SomeRowGroupsFactory(row_groups),
-                                 &reader_impl));
-  if (reader_impl == nullptr) {
-    *out = nullptr;
-    return Status::OK();
-  }
-  // TODO(wesm): This calculation doesn't make much sense when we have repeated
-  // schema nodes
-  int64_t records_to_read = GetTotalRecords(row_groups, i);
-  return reader_impl->NextBatch(records_to_read, out);
+  auto result_schema = ::arrow::schema(fields, reader_->metadata()->key_value_metadata());
+  *out = Table::Make(result_schema, columns);
+  return (*out)->Validate();
   END_PARQUET_CATCH_EXCEPTIONS
 }
 
diff --git a/cpp/src/parquet/arrow/reader.h b/cpp/src/parquet/arrow/reader.h
index 5b2f2c7..e73fcc0 100644
--- a/cpp/src/parquet/arrow/reader.h
+++ b/cpp/src/parquet/arrow/reader.h
@@ -28,6 +28,7 @@
 namespace arrow {
 
 class ChunkedArray;
+class KeyValueMetadata;
 class RecordBatchReader;
 class Schema;
 class Table;
@@ -39,6 +40,7 @@ namespace parquet {
 class FileMetaData;
 class ParquetFileReader;
 class ReaderProperties;
+class SchemaDescriptor;
 
 namespace arrow {
 
@@ -171,11 +173,6 @@ class PARQUET_EXPORT FileReader {
   /// \brief Return arrow schema for all the columns.
   virtual ::arrow::Status GetSchema(std::shared_ptr<::arrow::Schema>* out) = 0;
 
-  /// \brief Return arrow schema by apply selection of column indices.
-  /// \returns error status if passed wrong indices.
-  virtual ::arrow::Status GetSchema(const std::vector<int>& indices,
-                                    std::shared_ptr<::arrow::Schema>* out) = 0;
-
   // Read column as a whole into an Array.
   virtual ::arrow::Status ReadColumn(int i,
                                      std::shared_ptr<::arrow::ChunkedArray>* out) = 0;
@@ -311,6 +308,17 @@ PARQUET_EXPORT
                          const ArrowReaderProperties& properties,
                          std::unique_ptr<FileReader>* reader);
 
+PARQUET_EXPORT
+::arrow::Status FromParquetSchema(
+    const SchemaDescriptor* parquet_schema, const ArrowReaderProperties& properties,
+    const std::shared_ptr<const ::arrow::KeyValueMetadata>& key_value_metadata,
+    std::shared_ptr<::arrow::Schema>* out);
+
+PARQUET_EXPORT
+::arrow::Status FromParquetSchema(const SchemaDescriptor* parquet_schema,
+                                  const ArrowReaderProperties& properties,
+                                  std::shared_ptr<::arrow::Schema>* out);
+
 }  // namespace arrow
 }  // namespace parquet
 
diff --git a/cpp/src/parquet/arrow/reader_internal.cc b/cpp/src/parquet/arrow/reader_internal.cc
index de00173..f41e875 100644
--- a/cpp/src/parquet/arrow/reader_internal.cc
+++ b/cpp/src/parquet/arrow/reader_internal.cc
@@ -22,18 +22,24 @@
 #include <cstdint>
 #include <cstring>
 #include <memory>
+#include <string>
 #include <type_traits>
+#include <utility>
 #include <vector>
 
+#include <boost/algorithm/string/predicate.hpp>
+
 #include "arrow/array.h"
+#include "arrow/builder.h"
 #include "arrow/compute/kernel.h"
 #include "arrow/table.h"
 #include "arrow/type.h"
-#include "arrow/type_traits.h"
+#include "arrow/util/checked_cast.h"
 #include "arrow/util/int-util.h"
 #include "arrow/util/logging.h"
 #include "arrow/util/ubsan.h"
 
+#include "parquet/arrow/reader.h"
 #include "parquet/column_reader.h"
 #include "parquet/platform.h"
 #include "parquet/schema.h"
@@ -55,10 +61,15 @@ using arrow::TimestampArray;
 using arrow::compute::Datum;
 
 using ::arrow::BitUtil::FromBigEndian;
+using ::arrow::internal::checked_cast;
 using ::arrow::internal::SafeLeftShift;
 using ::arrow::util::SafeLoadAs;
 
 using parquet::internal::RecordReader;
+using parquet::schema::GroupNode;
+using parquet::schema::Node;
+using parquet::schema::PrimitiveNode;
+using ParquetType = parquet::Type;
 
 namespace parquet {
 namespace arrow {
@@ -67,6 +78,497 @@ template <typename ArrowType>
 using ArrayType = typename ::arrow::TypeTraits<ArrowType>::ArrayType;
 
 // ----------------------------------------------------------------------
+// Schema logic
+
+static Status MakeArrowDecimal(const LogicalType& logical_type,
+                               std::shared_ptr<DataType>* out) {
+  const auto& decimal = checked_cast<const DecimalLogicalType&>(logical_type);
+  *out = ::arrow::decimal(decimal.precision(), decimal.scale());
+  return Status::OK();
+}
+
+static Status MakeArrowInt(const LogicalType& logical_type,
+                           std::shared_ptr<DataType>* out) {
+  const auto& integer = checked_cast<const IntLogicalType&>(logical_type);
+  switch (integer.bit_width()) {
+    case 8:
+      *out = integer.is_signed() ? ::arrow::int8() : ::arrow::uint8();
+      break;
+    case 16:
+      *out = integer.is_signed() ? ::arrow::int16() : ::arrow::uint16();
+      break;
+    case 32:
+      *out = integer.is_signed() ? ::arrow::int32() : ::arrow::uint32();
+      break;
+    default:
+      return Status::TypeError(logical_type.ToString(),
+                               " can not annotate physical type Int32");
+  }
+  return Status::OK();
+}
+
+static Status MakeArrowInt64(const LogicalType& logical_type,
+                             std::shared_ptr<DataType>* out) {
+  const auto& integer = checked_cast<const IntLogicalType&>(logical_type);
+  switch (integer.bit_width()) {
+    case 64:
+      *out = integer.is_signed() ? ::arrow::int64() : ::arrow::uint64();
+      break;
+    default:
+      return Status::TypeError(logical_type.ToString(),
+                               " can not annotate physical type Int64");
+  }
+  return Status::OK();
+}
+
+static Status MakeArrowTime32(const LogicalType& logical_type,
+                              std::shared_ptr<DataType>* out) {
+  const auto& time = checked_cast<const TimeLogicalType&>(logical_type);
+  switch (time.time_unit()) {
+    case LogicalType::TimeUnit::MILLIS:
+      *out = ::arrow::time32(::arrow::TimeUnit::MILLI);
+      break;
+    default:
+      return Status::TypeError(logical_type.ToString(),
+                               " can not annotate physical type Time32");
+  }
+  return Status::OK();
+}
+
+static Status MakeArrowTime64(const LogicalType& logical_type,
+                              std::shared_ptr<DataType>* out) {
+  const auto& time = checked_cast<const TimeLogicalType&>(logical_type);
+  switch (time.time_unit()) {
+    case LogicalType::TimeUnit::MICROS:
+      *out = ::arrow::time64(::arrow::TimeUnit::MICRO);
+      break;
+    case LogicalType::TimeUnit::NANOS:
+      *out = ::arrow::time64(::arrow::TimeUnit::NANO);
+      break;
+    default:
+      return Status::TypeError(logical_type.ToString(),
+                               " can not annotate physical type Time64");
+  }
+  return Status::OK();
+}
+
+static Status MakeArrowTimestamp(const LogicalType& logical_type,
+                                 std::shared_ptr<DataType>* out) {
+  const auto& timestamp = checked_cast<const TimestampLogicalType&>(logical_type);
+  const bool utc_normalized =
+      timestamp.is_from_converted_type() ? false : timestamp.is_adjusted_to_utc();
+  static const char* utc_timezone = "UTC";
+  switch (timestamp.time_unit()) {
+    case LogicalType::TimeUnit::MILLIS:
+      *out = (utc_normalized ? ::arrow::timestamp(::arrow::TimeUnit::MILLI, utc_timezone)
+                             : ::arrow::timestamp(::arrow::TimeUnit::MILLI));
+      break;
+    case LogicalType::TimeUnit::MICROS:
+      *out = (utc_normalized ? ::arrow::timestamp(::arrow::TimeUnit::MICRO, utc_timezone)
+                             : ::arrow::timestamp(::arrow::TimeUnit::MICRO));
+      break;
+    case LogicalType::TimeUnit::NANOS:
+      *out = (utc_normalized ? ::arrow::timestamp(::arrow::TimeUnit::NANO, utc_timezone)
+                             : ::arrow::timestamp(::arrow::TimeUnit::NANO));
+      break;
+    default:
+      return Status::TypeError("Unrecognized time unit in timestamp logical_type: ",
+                               logical_type.ToString());
+  }
+  return Status::OK();
+}
+
+static Status FromByteArray(const LogicalType& logical_type,
+                            std::shared_ptr<DataType>* out) {
+  switch (logical_type.type()) {
+    case LogicalType::Type::STRING:
+      *out = ::arrow::utf8();
+      break;
+    case LogicalType::Type::DECIMAL:
+      RETURN_NOT_OK(MakeArrowDecimal(logical_type, out));
+      break;
+    case LogicalType::Type::NONE:
+    case LogicalType::Type::ENUM:
+    case LogicalType::Type::JSON:
+    case LogicalType::Type::BSON:
+      *out = ::arrow::binary();
+      break;
+    default:
+      return Status::NotImplemented("Unhandled logical logical_type ",
+                                    logical_type.ToString(), " for binary array");
+  }
+  return Status::OK();
+}
+
+static Status FromFLBA(const LogicalType& logical_type, int32_t physical_length,
+                       std::shared_ptr<DataType>* out) {
+  switch (logical_type.type()) {
+    case LogicalType::Type::DECIMAL:
+      RETURN_NOT_OK(MakeArrowDecimal(logical_type, out));
+      break;
+    case LogicalType::Type::NONE:
+    case LogicalType::Type::INTERVAL:
+    case LogicalType::Type::UUID:
+      *out = ::arrow::fixed_size_binary(physical_length);
+      break;
+    default:
+      return Status::NotImplemented("Unhandled logical logical_type ",
+                                    logical_type.ToString(),
+                                    " for fixed-length binary array");
+  }
+
+  return Status::OK();
+}
+
+static Status FromInt32(const LogicalType& logical_type, std::shared_ptr<DataType>* out) {
+  switch (logical_type.type()) {
+    case LogicalType::Type::INT:
+      RETURN_NOT_OK(MakeArrowInt(logical_type, out));
+      break;
+    case LogicalType::Type::DATE:
+      *out = ::arrow::date32();
+      break;
+    case LogicalType::Type::TIME:
+      RETURN_NOT_OK(MakeArrowTime32(logical_type, out));
+      break;
+    case LogicalType::Type::DECIMAL:
+      RETURN_NOT_OK(MakeArrowDecimal(logical_type, out));
+      break;
+    case LogicalType::Type::NONE:
+      *out = ::arrow::int32();
+      break;
+    default:
+      return Status::NotImplemented("Unhandled logical type ", logical_type.ToString(),
+                                    " for INT32");
+  }
+  return Status::OK();
+}
+
+static Status FromInt64(const LogicalType& logical_type, std::shared_ptr<DataType>* out) {
+  switch (logical_type.type()) {
+    case LogicalType::Type::INT:
+      RETURN_NOT_OK(MakeArrowInt64(logical_type, out));
+      break;
+    case LogicalType::Type::DECIMAL:
+      RETURN_NOT_OK(MakeArrowDecimal(logical_type, out));
+      break;
+    case LogicalType::Type::TIMESTAMP:
+      RETURN_NOT_OK(MakeArrowTimestamp(logical_type, out));
+      break;
+    case LogicalType::Type::TIME:
+      RETURN_NOT_OK(MakeArrowTime64(logical_type, out));
+      break;
+    case LogicalType::Type::NONE:
+      *out = ::arrow::int64();
+      break;
+    default:
+      return Status::NotImplemented("Unhandled logical type ", logical_type.ToString(),
+                                    " for INT64");
+  }
+  return Status::OK();
+}
+
+Status GetPrimitiveType(const schema::PrimitiveNode& primitive,
+                        std::shared_ptr<DataType>* out) {
+  const std::shared_ptr<const LogicalType>& logical_type = primitive.logical_type();
+  if (logical_type->is_invalid() || logical_type->is_null()) {
+    *out = ::arrow::null();
+    return Status::OK();
+  }
+
+  switch (primitive.physical_type()) {
+    case ParquetType::BOOLEAN:
+      *out = ::arrow::boolean();
+      break;
+    case ParquetType::INT32:
+      RETURN_NOT_OK(FromInt32(*logical_type, out));
+      break;
+    case ParquetType::INT64:
+      RETURN_NOT_OK(FromInt64(*logical_type, out));
+      break;
+    case ParquetType::INT96:
+      *out = ::arrow::timestamp(::arrow::TimeUnit::NANO);
+      break;
+    case ParquetType::FLOAT:
+      *out = ::arrow::float32();
+      break;
+    case ParquetType::DOUBLE:
+      *out = ::arrow::float64();
+      break;
+    case ParquetType::BYTE_ARRAY:
+      RETURN_NOT_OK(FromByteArray(*logical_type, out));
+      break;
+    case ParquetType::FIXED_LEN_BYTE_ARRAY:
+      RETURN_NOT_OK(FromFLBA(*logical_type, primitive.type_length(), out));
+      break;
+    default: {
+      // PARQUET-1565: This can occur if the file is corrupt
+      return Status::IOError("Invalid physical column type: ",
+                             TypeToString(primitive.physical_type()));
+    }
+  }
+  return Status::OK();
+}
+
+struct SchemaTreeContext {
+  SchemaManifest* manifest;
+  ArrowReaderProperties properties;
+  const SchemaDescriptor* schema;
+
+  void LinkParent(const SchemaField* child, const SchemaField* parent) {
+    manifest->child_to_parent[child] = parent;
+  }
+
+  void RecordLeaf(const SchemaField* leaf) {
+    manifest->column_index_to_field[leaf->column_index] = leaf;
+  }
+};
+
+Status GetTypeForNode(int column_index, const schema::PrimitiveNode& primitive_node,
+                      SchemaTreeContext* ctx, std::shared_ptr<DataType>* out) {
+  std::shared_ptr<DataType> storage_type;
+  RETURN_NOT_OK(GetPrimitiveType(primitive_node, &storage_type));
+  if (ctx->properties.read_dictionary(column_index)) {
+    *out = ::arrow::dictionary(::arrow::int32(), storage_type);
+  } else {
+    *out = storage_type;
+  }
+  return Status::OK();
+}
+
+Status NodeToSchemaField(const Node& node, int16_t max_def_level, int16_t max_rep_level,
+                         SchemaTreeContext* ctx, const SchemaField* parent,
+                         SchemaField* out);
+
+Status GroupToSchemaField(const GroupNode& node, int16_t max_def_level,
+                          int16_t max_rep_level, SchemaTreeContext* ctx,
+                          const SchemaField* parent, SchemaField* out);
+
+Status PopulateLeaf(int column_index, const std::shared_ptr<Field>& field,
+                    int16_t max_def_level, int16_t max_rep_level, SchemaTreeContext* ctx,
+                    const SchemaField* parent, SchemaField* out) {
+  out->field = field;
+  out->column_index = column_index;
+  out->max_definition_level = max_def_level;
+  out->max_repetition_level = max_rep_level;
+  ctx->RecordLeaf(out);
+  ctx->LinkParent(out, parent);
+  return Status::OK();
+}
+
+// Special case mentioned in the format spec:
+//   If the name is array or ends in _tuple, this should be a list of struct
+//   even for single child elements.
+bool HasStructListName(const GroupNode& node) {
+  return node.name() == "array" || boost::algorithm::ends_with(node.name(), "_tuple");
+}
+
+Status GroupToStruct(const GroupNode& node, int16_t max_def_level, int16_t max_rep_level,
+                     SchemaTreeContext* ctx, const SchemaField* parent,
+                     SchemaField* out) {
+  std::vector<std::shared_ptr<Field>> arrow_fields;
+  out->children.resize(node.field_count());
+  for (int i = 0; i < node.field_count(); i++) {
+    RETURN_NOT_OK(NodeToSchemaField(*node.field(i), max_def_level, max_rep_level, ctx,
+                                    out, &out->children[i]));
+    arrow_fields.push_back(out->children[i].field);
+  }
+  auto struct_type = ::arrow::struct_(arrow_fields);
+  out->field = ::arrow::field(node.name(), struct_type, node.is_optional());
+  out->max_definition_level = max_def_level;
+  out->max_repetition_level = max_rep_level;
+  return Status::OK();
+}
+
+Status ListToSchemaField(const GroupNode& group, int16_t max_def_level,
+                         int16_t max_rep_level, SchemaTreeContext* ctx,
+                         const SchemaField* parent, SchemaField* out) {
+  if (group.field_count() != 1) {
+    return Status::NotImplemented(
+        "Only LIST-annotated groups with a single child can be handled.");
+  }
+
+  out->children.resize(1);
+  SchemaField* child_field = &out->children[0];
+
+  ctx->LinkParent(out, parent);
+  ctx->LinkParent(child_field, out);
+
+  const Node& list_node = *group.field(0);
+
+  if (!list_node.is_repeated()) {
+    return Status::NotImplemented(
+        "Non-repeated nodes in a LIST-annotated group are not supported.");
+  }
+
+  ++max_def_level;
+  ++max_rep_level;
+  if (list_node.is_group()) {
+    // Resolve 3-level encoding
+    //
+    // required/optional group name=whatever {
+    //   repeated group name=list {
+    //     required/optional TYPE item;
+    //   }
+    // }
+    //
+    // yields list<item: TYPE ?nullable> ?nullable
+    //
+    // We distinguish the special base that we have
+    //
+    // required/optional group name=whatever {
+    //   repeated group name=array or $SOMETHING_tuple {
+    //     required/optional TYPE item;
+    //   }
+    // }
+    //
+    // In this latter case, the inner type of the list should be a struct
+    // rather than a primitive value
+    //
+    // yields list<item: struct<item: TYPE ?nullable> not null> ?nullable
+    const auto& list_group = static_cast<const GroupNode&>(list_node);
+    // Special case mentioned in the format spec:
+    //   If the name is array or ends in _tuple, this should be a list of struct
+    //   even for single child elements.
+    if (list_group.field_count() == 1 && !HasStructListName(list_group)) {
+      // List of primitive type
+      RETURN_NOT_OK(NodeToSchemaField(*list_group.field(0), max_def_level, max_rep_level,
+                                      ctx, out, child_field));
+    } else {
+      RETURN_NOT_OK(
+          GroupToStruct(list_group, max_def_level, max_rep_level, ctx, out, child_field));
+    }
+  } else {
+    // Two-level list encoding
+    //
+    // required/optional group LIST {
+    //   repeated TYPE;
+    // }
+    const auto& primitive_node = static_cast<const PrimitiveNode&>(list_node);
+    int column_index = ctx->schema->GetColumnIndex(primitive_node);
+    std::shared_ptr<DataType> type;
+    RETURN_NOT_OK(GetTypeForNode(column_index, primitive_node, ctx, &type));
+    auto item_field = ::arrow::field(list_node.name(), type, /*nullable=*/false);
+    RETURN_NOT_OK(PopulateLeaf(column_index, item_field, max_def_level, max_rep_level,
+                               ctx, out, child_field));
+  }
+  out->field = ::arrow::field(group.name(), ::arrow::list(child_field->field),
+                              group.is_optional());
+  out->max_definition_level = max_def_level;
+  out->max_repetition_level = max_rep_level;
+  return Status::OK();
+}
+
+Status GroupToSchemaField(const GroupNode& node, int16_t max_def_level,
+                          int16_t max_rep_level, SchemaTreeContext* ctx,
+                          const SchemaField* parent, SchemaField* out) {
+  if (node.logical_type()->is_list()) {
+    return ListToSchemaField(node, max_def_level, max_rep_level, ctx, parent, out);
+  }
+  std::shared_ptr<DataType> type;
+  if (node.is_repeated()) {
+    // Simple repeated struct
+    //
+    // repeated group $NAME {
+    //   r/o TYPE[0] f0
+    //   r/o TYPE[1] f1
+    // }
+    out->children.resize(1);
+    RETURN_NOT_OK(
+        GroupToStruct(node, max_def_level, max_rep_level, ctx, out, &out->children[0]));
+    out->field = ::arrow::field(node.name(), ::arrow::list(out->children[0].field),
+                                node.is_optional());
+    out->max_definition_level = max_def_level;
+    out->max_repetition_level = max_rep_level;
+    return Status::OK();
+  } else {
+    return GroupToStruct(node, max_def_level, max_rep_level, ctx, parent, out);
+  }
+}
+
+Status NodeToSchemaField(const Node& node, int16_t max_def_level, int16_t max_rep_level,
+                         SchemaTreeContext* ctx, const SchemaField* parent,
+                         SchemaField* out) {
+  if (node.is_optional()) {
+    ++max_def_level;
+  } else if (node.is_repeated()) {
+    // Repeated fields add a definition level. This is used to distinguish
+    // between an empty list and a list with an item in it.
+    ++max_rep_level;
+    ++max_def_level;
+  }
+
+  ctx->LinkParent(out, parent);
+
+  // Now, walk the schema and create a ColumnDescriptor for each leaf node
+  if (node.is_group()) {
+    return GroupToSchemaField(static_cast<const GroupNode&>(node), max_def_level,
+                              max_rep_level, ctx, parent, out);
+  } else {
+    const auto& primitive_node = static_cast<const PrimitiveNode&>(node);
+    int column_index = ctx->schema->GetColumnIndex(primitive_node);
+    std::shared_ptr<DataType> type;
+    RETURN_NOT_OK(GetTypeForNode(column_index, primitive_node, ctx, &type));
+    if (node.is_repeated()) {
+      // One-level list encoding, e.g.
+      // a: repeated int32;
+      out->children.resize(1);
+      auto child_field = ::arrow::field(node.name(), type, /*nullable=*/false);
+      RETURN_NOT_OK(PopulateLeaf(column_index, child_field, max_def_level, max_rep_level,
+                                 ctx, out, &out->children[0]));
+
+      out->field = ::arrow::field(node.name(), ::arrow::list(child_field),
+                                  /*nullable=*/false);
+      // Is this right?
+      out->max_definition_level = max_def_level;
+      out->max_repetition_level = max_rep_level;
+      return Status::OK();
+    } else {
+      return PopulateLeaf(column_index,
+                          ::arrow::field(node.name(), type, node.is_optional()),
+                          max_def_level, max_rep_level, ctx, parent, out);
+    }
+  }
+}
+
+Status BuildSchemaManifest(const SchemaDescriptor* schema,
+                           const ArrowReaderProperties& properties,
+                           SchemaManifest* manifest) {
+  SchemaTreeContext ctx;
+  ctx.manifest = manifest;
+  ctx.properties = properties;
+  ctx.schema = schema;
+  const GroupNode& schema_node = *schema->group_node();
+  manifest->descr = schema;
+  manifest->schema_fields.resize(schema_node.field_count());
+  for (int i = 0; i < static_cast<int>(schema_node.field_count()); ++i) {
+    RETURN_NOT_OK(NodeToSchemaField(*schema_node.field(i), 0, 0, &ctx,
+                                    /*parent=*/nullptr, &manifest->schema_fields[i]));
+  }
+  return Status::OK();
+}
+
+Status FromParquetSchema(
+    const SchemaDescriptor* schema, const ArrowReaderProperties& properties,
+    const std::shared_ptr<const KeyValueMetadata>& key_value_metadata,
+    std::shared_ptr<::arrow::Schema>* out) {
+  SchemaManifest manifest;
+  RETURN_NOT_OK(BuildSchemaManifest(schema, properties, &manifest));
+  std::vector<std::shared_ptr<Field>> fields(manifest.schema_fields.size());
+  for (int i = 0; i < static_cast<int>(fields.size()); i++) {
+    fields[i] = manifest.schema_fields[i].field;
+  }
+  *out = ::arrow::schema(fields, key_value_metadata);
+  return Status::OK();
+}
+
+Status FromParquetSchema(const SchemaDescriptor* parquet_schema,
+                         const ArrowReaderProperties& properties,
+                         std::shared_ptr<::arrow::Schema>* out) {
+  return FromParquetSchema(parquet_schema, properties, nullptr, out);
+}
+
+// ----------------------------------------------------------------------
 // Primitive types
 
 template <typename ArrowType, typename ParquetType>
@@ -183,11 +685,8 @@ Status TransferDictionary(RecordReader* reader,
   auto dict_reader = dynamic_cast<internal::DictionaryRecordReader*>(reader);
   DCHECK(dict_reader);
   *out = dict_reader->GetResult();
-
-  const auto& dict_type = static_cast<const ::arrow::DictionaryType&>(*(*out)->type());
-  if (!logical_value_type->Equals(*dict_type.value_type())) {
-    *out = CastChunksTo(**out,
-                        ::arrow::dictionary(dict_type.index_type(), logical_value_type));
+  if (!logical_value_type->Equals(*(*out)->type())) {
+    *out = CastChunksTo(**out, logical_value_type);
   }
   return Status::OK();
 }
@@ -196,7 +695,8 @@ Status TransferBinary(RecordReader* reader,
                       const std::shared_ptr<DataType>& logical_value_type,
                       std::shared_ptr<ChunkedArray>* out) {
   if (reader->read_dictionary()) {
-    return TransferDictionary(reader, logical_value_type, out);
+    return TransferDictionary(
+        reader, ::arrow::dictionary(::arrow::int32(), logical_value_type), out);
   }
   auto binary_reader = dynamic_cast<internal::BinaryRecordReader*>(reader);
   DCHECK(binary_reader);
@@ -519,7 +1019,12 @@ Status TransferColumnData(internal::RecordReader* reader,
                           const ColumnDescriptor* descr, MemoryPool* pool,
                           std::shared_ptr<ChunkedArray>* out) {
   Datum result;
+  std::shared_ptr<ChunkedArray> chunked_result;
   switch (value_type->id()) {
+    case ::arrow::Type::DICTIONARY: {
+      RETURN_NOT_OK(TransferDictionary(reader, value_type, &chunked_result));
+      result = chunked_result;
+    } break;
     case ::arrow::Type::NA: {
       result = std::make_shared<::arrow::NullArray>(reader->values_written());
       break;
@@ -548,9 +1053,8 @@ Status TransferColumnData(internal::RecordReader* reader,
     case ::arrow::Type::FIXED_SIZE_BINARY:
     case ::arrow::Type::BINARY:
     case ::arrow::Type::STRING: {
-      std::shared_ptr<ChunkedArray> out;
-      RETURN_NOT_OK(TransferBinary(reader, value_type, &out));
-      result = out;
+      RETURN_NOT_OK(TransferBinary(reader, value_type, &chunked_result));
+      result = chunked_result;
     } break;
     case ::arrow::Type::DECIMAL: {
       switch (descr->physical_type()) {
@@ -612,5 +1116,114 @@ Status TransferColumnData(internal::RecordReader* reader,
   return Status::OK();
 }
 
+Status ReconstructNestedList(const std::shared_ptr<Array>& arr,
+                             std::shared_ptr<Field> field, int16_t max_def_level,
+                             int16_t max_rep_level, const int16_t* def_levels,
+                             const int16_t* rep_levels, int64_t total_levels,
+                             ::arrow::MemoryPool* pool, std::shared_ptr<Array>* out) {
+  // Walk downwards to extract nullability
+  std::vector<bool> nullable;
+  std::vector<std::shared_ptr<::arrow::Int32Builder>> offset_builders;
+  std::vector<std::shared_ptr<::arrow::BooleanBuilder>> valid_bits_builders;
+  nullable.push_back(field->nullable());
+  while (field->type()->num_children() > 0) {
+    if (field->type()->num_children() > 1) {
+      return Status::NotImplemented("Fields with more than one child are not supported.");
+    } else {
+      if (field->type()->id() != ::arrow::Type::LIST) {
+        return Status::NotImplemented("Currently only nesting with Lists is supported.");
+      }
+      field = field->type()->child(0);
+    }
+    offset_builders.emplace_back(
+        std::make_shared<::arrow::Int32Builder>(::arrow::int32(), pool));
+    valid_bits_builders.emplace_back(
+        std::make_shared<::arrow::BooleanBuilder>(::arrow::boolean(), pool));
+    nullable.push_back(field->nullable());
+  }
+
+  int64_t list_depth = offset_builders.size();
+  // This describes the minimal definition that describes a level that
+  // reflects a value in the primitive values array.
+  int16_t values_def_level = max_def_level;
+  if (nullable[nullable.size() - 1]) {
+    values_def_level--;
+  }
+
+  // The definition levels that are needed so that a list is declared
+  // as empty and not null.
+  std::vector<int16_t> empty_def_level(list_depth);
+  int def_level = 0;
+  for (int i = 0; i < list_depth; i++) {
+    if (nullable[i]) {
+      def_level++;
+    }
+    empty_def_level[i] = static_cast<int16_t>(def_level);
+    def_level++;
+  }
+
+  int32_t values_offset = 0;
+  std::vector<int64_t> null_counts(list_depth, 0);
+  for (int64_t i = 0; i < total_levels; i++) {
+    int16_t rep_level = rep_levels[i];
+    if (rep_level < max_rep_level) {
+      for (int64_t j = rep_level; j < list_depth; j++) {
+        if (j == (list_depth - 1)) {
+          RETURN_NOT_OK(offset_builders[j]->Append(values_offset));
+        } else {
+          RETURN_NOT_OK(offset_builders[j]->Append(
+              static_cast<int32_t>(offset_builders[j + 1]->length())));
+        }
+
+        if (((empty_def_level[j] - 1) == def_levels[i]) && (nullable[j])) {
+          RETURN_NOT_OK(valid_bits_builders[j]->Append(false));
+          null_counts[j]++;
+          break;
+        } else {
+          RETURN_NOT_OK(valid_bits_builders[j]->Append(true));
+          if (empty_def_level[j] == def_levels[i]) {
+            break;
+          }
+        }
+      }
+    }
+    if (def_levels[i] >= values_def_level) {
+      values_offset++;
+    }
+  }
+  // Add the final offset to all lists
+  for (int64_t j = 0; j < list_depth; j++) {
+    if (j == (list_depth - 1)) {
+      RETURN_NOT_OK(offset_builders[j]->Append(values_offset));
+    } else {
+      RETURN_NOT_OK(offset_builders[j]->Append(
+          static_cast<int32_t>(offset_builders[j + 1]->length())));
+    }
+  }
+
+  std::vector<std::shared_ptr<Buffer>> offsets;
+  std::vector<std::shared_ptr<Buffer>> valid_bits;
+  std::vector<int64_t> list_lengths;
+  for (int64_t j = 0; j < list_depth; j++) {
+    list_lengths.push_back(offset_builders[j]->length() - 1);
+    std::shared_ptr<Array> array;
+    RETURN_NOT_OK(offset_builders[j]->Finish(&array));
+    offsets.emplace_back(std::static_pointer_cast<Int32Array>(array)->values());
+    RETURN_NOT_OK(valid_bits_builders[j]->Finish(&array));
+    valid_bits.emplace_back(std::static_pointer_cast<BooleanArray>(array)->values());
+  }
+
+  *out = arr;
+
+  // TODO(wesm): Use passed-in field
+  for (int64_t j = list_depth - 1; j >= 0; j--) {
+    auto list_type =
+        ::arrow::list(::arrow::field("item", (*out)->type(), nullable[j + 1]));
+    *out = std::make_shared<::arrow::ListArray>(list_type, list_lengths[j], offsets[j],
+                                                *out, valid_bits[j], null_counts[j]);
+  }
+  return Status::OK();
+}
+
 }  // namespace arrow
 }  // namespace parquet
diff --git a/cpp/src/parquet/arrow/reader_internal.h b/cpp/src/parquet/arrow/reader_internal.h
index 5c0fa5e..cbf44ec 100644
--- a/cpp/src/parquet/arrow/reader_internal.h
+++ b/cpp/src/parquet/arrow/reader_internal.h
@@ -17,17 +17,32 @@
 
 #pragma once
 
+#include <deque>
 #include <memory>
+#include <unordered_map>
+#include <unordered_set>
+#include <vector>
+
+#include "arrow/status.h"
+
+#include "parquet/column_reader.h"
+#include "parquet/file_reader.h"
+#include "parquet/metadata.h"
+#include "parquet/schema.h"
 
 namespace arrow {
 
+class Array;
 class ChunkedArray;
 class DataType;
+class Field;
 class MemoryPool;
-class Status;
+class Schema;
 
 }  // namespace arrow
 
+using arrow::Status;
+
 namespace parquet {
 
 class ColumnDescriptor;
@@ -40,11 +55,145 @@ class RecordReader;
 
 namespace arrow {
 
-::arrow::Status TransferColumnData(internal::RecordReader* reader,
-                                   std::shared_ptr<::arrow::DataType> value_type,
-                                   const ColumnDescriptor* descr,
-                                   ::arrow::MemoryPool* pool,
-                                   std::shared_ptr<::arrow::ChunkedArray>* out);
+class ArrowReaderProperties;
+class ColumnReaderImpl;
+
+// ----------------------------------------------------------------------
+// Iteration utilities
+
+// Abstraction to decouple row group iteration details from the ColumnReader,
+// so we can read only a single row group if we want
+class FileColumnIterator {
+ public:
+  explicit FileColumnIterator(int column_index, ParquetFileReader* reader,
+                              std::vector<int> row_groups)
+      : column_index_(column_index),
+        reader_(reader),
+        schema_(reader->metadata()->schema()),
+        row_groups_(row_groups.begin(), row_groups.end()) {}
+
+  virtual ~FileColumnIterator() {}
+
+  std::unique_ptr<::parquet::PageReader> NextChunk() {
+    if (row_groups_.empty()) {
+      return nullptr;
+    }
+
+    auto row_group_reader = reader_->RowGroup(row_groups_.front());
+    row_groups_.pop_front();
+    return row_group_reader->GetColumnPageReader(column_index_);
+  }
+
+  const SchemaDescriptor* schema() const { return schema_; }
+
+  const ColumnDescriptor* descr() const { return schema_->Column(column_index_); }
+
+  std::shared_ptr<FileMetaData> metadata() const { return reader_->metadata(); }
+
+  int column_index() const { return column_index_; }
+
+ protected:
+  int column_index_;
+  ParquetFileReader* reader_;
+  const SchemaDescriptor* schema_;
+  std::deque<int> row_groups_;
+};
+
+using FileColumnIteratorFactory =
+    std::function<FileColumnIterator*(int, ParquetFileReader*)>;
+
+Status TransferColumnData(::parquet::internal::RecordReader* reader,
+                          std::shared_ptr<::arrow::DataType> value_type,
+                          const ColumnDescriptor* descr, ::arrow::MemoryPool* pool,
+                          std::shared_ptr<::arrow::ChunkedArray>* out);
+
+Status ReconstructNestedList(const std::shared_ptr<::arrow::Array>& arr,
+                             std::shared_ptr<::arrow::Field> field, int16_t max_def_level,
+                             int16_t max_rep_level, const int16_t* def_levels,
+                             const int16_t* rep_levels, int64_t total_levels,
+                             ::arrow::MemoryPool* pool,
+                             std::shared_ptr<::arrow::Array>* out);
+
+struct ReaderContext {
+  ParquetFileReader* reader;
+  ::arrow::MemoryPool* pool;
+  FileColumnIteratorFactory iterator_factory;
+  bool filter_leaves;
+  std::unordered_set<int> included_leaves;
+
+  bool IncludesLeaf(int leaf_index) const {
+    return (!this->filter_leaves ||
+            (included_leaves.find(leaf_index) != included_leaves.end()));
+  }
+};
+
+struct PARQUET_EXPORT SchemaField {
+  std::shared_ptr<::arrow::Field> field;
+  std::vector<SchemaField> children;
+
+  // Only set for leaf nodes
+  int column_index = -1;
+
+  int16_t max_definition_level;
+  int16_t max_repetition_level;
+
+  bool is_leaf() const { return column_index != -1; }
+
+  Status GetReader(const ReaderContext& context,
+                   std::unique_ptr<ColumnReaderImpl>* out) const;
+};
+
+struct SchemaManifest {
+  const SchemaDescriptor* descr;
+  std::vector<SchemaField> schema_fields;
+
+  std::unordered_map<int, const SchemaField*> column_index_to_field;
+  std::unordered_map<const SchemaField*, const SchemaField*> child_to_parent;
+
+  Status GetColumnField(int column_index, const SchemaField** out) const {
+    auto it = column_index_to_field.find(column_index);
+    if (it == column_index_to_field.end()) {
+      return Status::KeyError("Column index ", column_index,
+                              " not found in schema manifest, may be malformed");
+    }
+    *out = it->second;
+    return Status::OK();
+  }
+
+  const SchemaField* GetParent(const SchemaField* field) const {
+    // Returns nullptr also if not found
+    auto it = child_to_parent.find(field);
+    if (it == child_to_parent.end()) {
+      return nullptr;
+    }
+    return it->second;
+  }
+
+  bool GetFieldIndices(const std::vector<int>& column_indices, std::vector<int>* out) {
+    // Coalesce a list of schema fields indices which are the roots of the
+    // columns referred by a list of column indices
+    const schema::GroupNode* group = descr->group_node();
+    std::unordered_set<int> already_added;
+    out->clear();
+    for (auto& column_idx : column_indices) {
+      auto field_node = descr->GetColumnRoot(column_idx);
+      auto field_idx = group->FieldIndex(*field_node);
+      if (field_idx < 0) {
+        return false;
+      }
+      auto insertion = already_added.insert(field_idx);
+      if (insertion.second) {
+        out->push_back(field_idx);
+      }
+    }
+    return true;
+  }
+};
+
+PARQUET_EXPORT
+Status BuildSchemaManifest(const SchemaDescriptor* schema,
+                           const ArrowReaderProperties& properties,
+                           SchemaManifest* manifest);
 
 }  // namespace arrow
 }  // namespace parquet
diff --git a/cpp/src/parquet/arrow/schema.cc b/cpp/src/parquet/arrow/schema.cc
index 6404338..82c8566 100644
--- a/cpp/src/parquet/arrow/schema.cc
+++ b/cpp/src/parquet/arrow/schema.cc
@@ -53,446 +53,8 @@ namespace parquet {
 
 namespace arrow {
 
-const auto TIMESTAMP_MS = ::arrow::timestamp(::arrow::TimeUnit::MILLI);
-const auto TIMESTAMP_US = ::arrow::timestamp(::arrow::TimeUnit::MICRO);
-const auto TIMESTAMP_NS = ::arrow::timestamp(::arrow::TimeUnit::NANO);
-
-static Status MakeArrowDecimal(const LogicalType& logical_type,
-                               std::shared_ptr<ArrowType>* out) {
-  const auto& decimal = checked_cast<const DecimalLogicalType&>(logical_type);
-  *out = ::arrow::decimal(decimal.precision(), decimal.scale());
-  return Status::OK();
-}
-
-static Status MakeArrowInt(const LogicalType& logical_type,
-                           std::shared_ptr<ArrowType>* out) {
-  const auto& integer = checked_cast<const IntLogicalType&>(logical_type);
-  switch (integer.bit_width()) {
-    case 8:
-      *out = integer.is_signed() ? ::arrow::int8() : ::arrow::uint8();
-      break;
-    case 16:
-      *out = integer.is_signed() ? ::arrow::int16() : ::arrow::uint16();
-      break;
-    case 32:
-      *out = integer.is_signed() ? ::arrow::int32() : ::arrow::uint32();
-      break;
-    default:
-      return Status::TypeError(logical_type.ToString(),
-                               " can not annotate physical type Int32");
-  }
-  return Status::OK();
-}
-
-static Status MakeArrowInt64(const LogicalType& logical_type,
-                             std::shared_ptr<ArrowType>* out) {
-  const auto& integer = checked_cast<const IntLogicalType&>(logical_type);
-  switch (integer.bit_width()) {
-    case 64:
-      *out = integer.is_signed() ? ::arrow::int64() : ::arrow::uint64();
-      break;
-    default:
-      return Status::TypeError(logical_type.ToString(),
-                               " can not annotate physical type Int64");
-  }
-  return Status::OK();
-}
-
-static Status MakeArrowTime32(const LogicalType& logical_type,
-                              std::shared_ptr<ArrowType>* out) {
-  const auto& time = checked_cast<const TimeLogicalType&>(logical_type);
-  switch (time.time_unit()) {
-    case LogicalType::TimeUnit::MILLIS:
-      *out = ::arrow::time32(::arrow::TimeUnit::MILLI);
-      break;
-    default:
-      return Status::TypeError(logical_type.ToString(),
-                               " can not annotate physical type Time32");
-  }
-  return Status::OK();
-}
-
-static Status MakeArrowTime64(const LogicalType& logical_type,
-                              std::shared_ptr<ArrowType>* out) {
-  const auto& time = checked_cast<const TimeLogicalType&>(logical_type);
-  switch (time.time_unit()) {
-    case LogicalType::TimeUnit::MICROS:
-      *out = ::arrow::time64(::arrow::TimeUnit::MICRO);
-      break;
-    case LogicalType::TimeUnit::NANOS:
-      *out = ::arrow::time64(::arrow::TimeUnit::NANO);
-      break;
-    default:
-      return Status::TypeError(logical_type.ToString(),
-                               " can not annotate physical type Time64");
-  }
-  return Status::OK();
-}
-
-static Status MakeArrowTimestamp(const LogicalType& logical_type,
-                                 std::shared_ptr<ArrowType>* out) {
-  const auto& timestamp = checked_cast<const TimestampLogicalType&>(logical_type);
-  const bool utc_normalized =
-      timestamp.is_from_converted_type() ? false : timestamp.is_adjusted_to_utc();
-  static const char* utc_timezone = "UTC";
-  switch (timestamp.time_unit()) {
-    case LogicalType::TimeUnit::MILLIS:
-      *out = (utc_normalized ? ::arrow::timestamp(::arrow::TimeUnit::MILLI, utc_timezone)
-                             : ::arrow::timestamp(::arrow::TimeUnit::MILLI));
-      break;
-    case LogicalType::TimeUnit::MICROS:
-      *out = (utc_normalized ? ::arrow::timestamp(::arrow::TimeUnit::MICRO, utc_timezone)
-                             : ::arrow::timestamp(::arrow::TimeUnit::MICRO));
-      break;
-    case LogicalType::TimeUnit::NANOS:
-      *out = (utc_normalized ? ::arrow::timestamp(::arrow::TimeUnit::NANO, utc_timezone)
-                             : ::arrow::timestamp(::arrow::TimeUnit::NANO));
-      break;
-    default:
-      return Status::TypeError("Unrecognized time unit in timestamp logical_type: ",
-                               logical_type.ToString());
-  }
-  return Status::OK();
-}
-
-static Status FromByteArray(const LogicalType& logical_type,
-                            std::shared_ptr<ArrowType>* out) {
-  switch (logical_type.type()) {
-    case LogicalType::Type::STRING:
-      *out = ::arrow::utf8();
-      break;
-    case LogicalType::Type::DECIMAL:
-      RETURN_NOT_OK(MakeArrowDecimal(logical_type, out));
-      break;
-    case LogicalType::Type::NONE:
-    case LogicalType::Type::ENUM:
-    case LogicalType::Type::JSON:
-    case LogicalType::Type::BSON:
-      *out = ::arrow::binary();
-      break;
-    default:
-      return Status::NotImplemented("Unhandled logical logical_type ",
-                                    logical_type.ToString(), " for binary array");
-  }
-  return Status::OK();
-}
-
-static Status FromFLBA(const LogicalType& logical_type, int32_t physical_length,
-                       std::shared_ptr<ArrowType>* out) {
-  switch (logical_type.type()) {
-    case LogicalType::Type::DECIMAL:
-      RETURN_NOT_OK(MakeArrowDecimal(logical_type, out));
-      break;
-    case LogicalType::Type::NONE:
-    case LogicalType::Type::INTERVAL:
-    case LogicalType::Type::UUID:
-      *out = ::arrow::fixed_size_binary(physical_length);
-      break;
-    default:
-      return Status::NotImplemented("Unhandled logical logical_type ",
-                                    logical_type.ToString(),
-                                    " for fixed-length binary array");
-  }
-
-  return Status::OK();
-}
-
-static Status FromInt32(const LogicalType& logical_type,
-                        std::shared_ptr<ArrowType>* out) {
-  switch (logical_type.type()) {
-    case LogicalType::Type::INT:
-      RETURN_NOT_OK(MakeArrowInt(logical_type, out));
-      break;
-    case LogicalType::Type::DATE:
-      *out = ::arrow::date32();
-      break;
-    case LogicalType::Type::TIME:
-      RETURN_NOT_OK(MakeArrowTime32(logical_type, out));
-      break;
-    case LogicalType::Type::DECIMAL:
-      RETURN_NOT_OK(MakeArrowDecimal(logical_type, out));
-      break;
-    case LogicalType::Type::NONE:
-      *out = ::arrow::int32();
-      break;
-    default:
-      return Status::NotImplemented("Unhandled logical type ", logical_type.ToString(),
-                                    " for INT32");
-  }
-  return Status::OK();
-}
-
-static Status FromInt64(const LogicalType& logical_type,
-                        std::shared_ptr<ArrowType>* out) {
-  switch (logical_type.type()) {
-    case LogicalType::Type::INT:
-      RETURN_NOT_OK(MakeArrowInt64(logical_type, out));
-      break;
-    case LogicalType::Type::DECIMAL:
-      RETURN_NOT_OK(MakeArrowDecimal(logical_type, out));
-      break;
-    case LogicalType::Type::TIMESTAMP:
-      RETURN_NOT_OK(MakeArrowTimestamp(logical_type, out));
-      break;
-    case LogicalType::Type::TIME:
-      RETURN_NOT_OK(MakeArrowTime64(logical_type, out));
-      break;
-    case LogicalType::Type::NONE:
-      *out = ::arrow::int64();
-      break;
-    default:
-      return Status::NotImplemented("Unhandled logical type ", logical_type.ToString(),
-                                    " for INT64");
-  }
-  return Status::OK();
-}
-
-Status FromPrimitive(const PrimitiveNode& primitive, std::shared_ptr<ArrowType>* out) {
-  const std::shared_ptr<const LogicalType>& logical_type = primitive.logical_type();
-  if (logical_type->is_invalid() || logical_type->is_null()) {
-    *out = ::arrow::null();
-    return Status::OK();
-  }
-
-  switch (primitive.physical_type()) {
-    case ParquetType::BOOLEAN:
-      *out = ::arrow::boolean();
-      break;
-    case ParquetType::INT32:
-      RETURN_NOT_OK(FromInt32(*logical_type, out));
-      break;
-    case ParquetType::INT64:
-      RETURN_NOT_OK(FromInt64(*logical_type, out));
-      break;
-    case ParquetType::INT96:
-      *out = TIMESTAMP_NS;
-      break;
-    case ParquetType::FLOAT:
-      *out = ::arrow::float32();
-      break;
-    case ParquetType::DOUBLE:
-      *out = ::arrow::float64();
-      break;
-    case ParquetType::BYTE_ARRAY:
-      RETURN_NOT_OK(FromByteArray(*logical_type, out));
-      break;
-    case ParquetType::FIXED_LEN_BYTE_ARRAY:
-      RETURN_NOT_OK(FromFLBA(*logical_type, primitive.type_length(), out));
-      break;
-    default: {
-      // PARQUET-1565: This can occur if the file is corrupt
-      return Status::IOError("Invalid physical column type: ",
-                             TypeToString(primitive.physical_type()));
-    }
-  }
-  return Status::OK();
-}
-
-// Forward declaration
-Status NodeToFieldInternal(const Node& node,
-                           const std::unordered_set<const Node*>* included_leaf_nodes,
-                           std::shared_ptr<Field>* out);
-
-/*
- * Auxilary function to test if a parquet schema node is a leaf node
- * that should be included in a resulting arrow schema
- */
-inline bool IsIncludedLeaf(const Node& node,
-                           const std::unordered_set<const Node*>* included_leaf_nodes) {
-  if (included_leaf_nodes == nullptr) {
-    return true;
-  }
-  auto search = included_leaf_nodes->find(&node);
-  return (search != included_leaf_nodes->end());
-}
-
-Status StructFromGroup(const GroupNode& group,
-                       const std::unordered_set<const Node*>* included_leaf_nodes,
-                       std::shared_ptr<ArrowType>* out) {
-  std::vector<std::shared_ptr<Field>> fields;
-  std::shared_ptr<Field> field;
-
-  *out = nullptr;
-
-  for (int i = 0; i < group.field_count(); i++) {
-    RETURN_NOT_OK(NodeToFieldInternal(*group.field(i), included_leaf_nodes, &field));
-    if (field != nullptr) {
-      fields.push_back(field);
-    }
-  }
-  if (fields.size() > 0) {
-    *out = std::make_shared<::arrow::StructType>(fields);
-  }
-  return Status::OK();
-}
-
-Status NodeToList(const GroupNode& group,
-                  const std::unordered_set<const Node*>* included_leaf_nodes,
-                  std::shared_ptr<ArrowType>* out) {
-  *out = nullptr;
-  if (group.field_count() == 1) {
-    // This attempts to resolve the preferred 3-level list encoding.
-    const Node& list_node = *group.field(0);
-    if (list_node.is_group() && list_node.is_repeated()) {
-      const auto& list_group = static_cast<const GroupNode&>(list_node);
-      // Special case mentioned in the format spec:
-      //   If the name is array or ends in _tuple, this should be a list of struct
-      //   even for single child elements.
-      if (list_group.field_count() == 1 && !schema::HasStructListName(list_group)) {
-        // List of primitive type
-        std::shared_ptr<Field> item_field;
-        RETURN_NOT_OK(
-            NodeToFieldInternal(*list_group.field(0), included_leaf_nodes, &item_field));
-
-        if (item_field != nullptr) {
-          *out = ::arrow::list(item_field);
-        }
-      } else {
-        // List of struct
-        std::shared_ptr<ArrowType> inner_type;
-        RETURN_NOT_OK(StructFromGroup(list_group, included_leaf_nodes, &inner_type));
-        if (inner_type != nullptr) {
-          auto item_field = std::make_shared<Field>(list_node.name(), inner_type, false);
-          *out = ::arrow::list(item_field);
-        }
-      }
-    } else if (list_node.is_repeated()) {
-      // repeated primitive node
-      std::shared_ptr<ArrowType> inner_type;
-      if (IsIncludedLeaf(static_cast<const Node&>(list_node), included_leaf_nodes)) {
-        RETURN_NOT_OK(
-            FromPrimitive(static_cast<const PrimitiveNode&>(list_node), &inner_type));
-        auto item_field = std::make_shared<Field>(list_node.name(), inner_type, false);
-        *out = ::arrow::list(item_field);
-      }
-    } else {
-      return Status::NotImplemented(
-          "Non-repeated groups in a LIST-annotated group are not supported.");
-    }
-  } else {
-    return Status::NotImplemented(
-        "Only LIST-annotated groups with a single child can be handled.");
-  }
-  return Status::OK();
-}
-
-Status NodeToField(const Node& node, std::shared_ptr<Field>* out) {
-  return NodeToFieldInternal(node, nullptr, out);
-}
-
-Status NodeToFieldInternal(const Node& node,
-                           const std::unordered_set<const Node*>* included_leaf_nodes,
-                           std::shared_ptr<Field>* out) {
-  std::shared_ptr<ArrowType> type = nullptr;
-  bool nullable = !node.is_required();
-
-  *out = nullptr;
-
-  if (node.is_repeated()) {
-    // 1-level LIST encoding fields are required
-    std::shared_ptr<ArrowType> inner_type;
-    if (node.is_group()) {
-      RETURN_NOT_OK(StructFromGroup(static_cast<const GroupNode&>(node),
-                                    included_leaf_nodes, &inner_type));
-    } else if (IsIncludedLeaf(node, included_leaf_nodes)) {
-      RETURN_NOT_OK(FromPrimitive(static_cast<const PrimitiveNode&>(node), &inner_type));
-    }
-    if (inner_type != nullptr) {
-      auto item_field = std::make_shared<Field>(node.name(), inner_type, false);
-      type = ::arrow::list(item_field);
-      nullable = false;
-    }
-  } else if (node.is_group()) {
-    const auto& group = static_cast<const GroupNode&>(node);
-    if (node.logical_type()->is_list()) {
-      RETURN_NOT_OK(NodeToList(group, included_leaf_nodes, &type));
-    } else {
-      RETURN_NOT_OK(StructFromGroup(group, included_leaf_nodes, &type));
-    }
-  } else {
-    // Primitive (leaf) node
-    if (IsIncludedLeaf(node, included_leaf_nodes)) {
-      RETURN_NOT_OK(FromPrimitive(static_cast<const PrimitiveNode&>(node), &type));
-    }
-  }
-  if (type != nullptr) {
-    *out = std::make_shared<Field>(node.name(), type, nullable);
-  }
-  return Status::OK();
-}
-
-std::shared_ptr<Field> ToDictionary32(const Field& field) {
-  auto new_ty = ::arrow::dictionary(::arrow::int32(), field.type());
-  return field.WithType(new_ty);
-}
-
-Status FromParquetSchema(
-    const SchemaDescriptor* parquet_schema, const ArrowReaderProperties& properties,
-    const std::shared_ptr<const KeyValueMetadata>& key_value_metadata,
-    std::shared_ptr<::arrow::Schema>* out) {
-  const GroupNode& schema_node = *parquet_schema->group_node();
-
-  int num_fields = static_cast<int>(schema_node.field_count());
-  std::vector<std::shared_ptr<Field>> fields(num_fields);
-  for (int i = 0; i < num_fields; i++) {
-    RETURN_NOT_OK(NodeToField(*schema_node.field(i), &fields[i]));
-  }
-  *out = std::make_shared<::arrow::Schema>(fields, key_value_metadata);
-  return Status::OK();
-}
-
-Status FromParquetSchema(
-    const SchemaDescriptor* parquet_schema, const std::vector<int>& column_indices,
-    const ArrowReaderProperties& properties,
-    const std::shared_ptr<const KeyValueMetadata>& key_value_metadata,
-    std::shared_ptr<::arrow::Schema>* out) {
-  // TODO(wesm): Consider adding an arrow::Schema name attribute, which comes
-  // from the root Parquet node
-
-  // Put the right leaf nodes in an unordered set
-  // Index in column_indices should be unique, duplicate indices are merged into one and
-  // ordering by its first appearing.
-  int num_columns = static_cast<int>(column_indices.size());
-  std::unordered_set<const Node*> top_nodes;  // to deduplicate the top nodes
-  std::vector<const Node*> base_nodes;        // to keep the ordering
-  std::unordered_set<const Node*> included_leaf_nodes(num_columns);
-  for (int i = 0; i < num_columns; i++) {
-    const ColumnDescriptor* column_desc = parquet_schema->Column(column_indices[i]);
-    const Node* node = column_desc->schema_node().get();
-
-    included_leaf_nodes.insert(node);
-    const Node* column_root = parquet_schema->GetColumnRoot(column_indices[i]);
-    auto it = top_nodes.insert(column_root);
-    if (it.second) {
-      base_nodes.push_back(column_root);
-    }
-  }
-
-  std::vector<std::shared_ptr<Field>> fields;
-  std::shared_ptr<Field> field;
-  for (auto node : base_nodes) {
-    RETURN_NOT_OK(NodeToFieldInternal(*node, &included_leaf_nodes, &field));
-    if (field != nullptr) {
-      fields.push_back(field);
-    }
-  }
-
-  *out = std::make_shared<::arrow::Schema>(fields, key_value_metadata);
-  return Status::OK();
-}
-
-Status FromParquetSchema(const SchemaDescriptor* parquet_schema,
-                         const std::vector<int>& column_indices,
-                         const ArrowReaderProperties& properties,
-                         std::shared_ptr<::arrow::Schema>* out) {
-  return FromParquetSchema(parquet_schema, column_indices, properties, nullptr, out);
-}
-
-Status FromParquetSchema(const SchemaDescriptor* parquet_schema,
-                         const ArrowReaderProperties& properties,
-                         std::shared_ptr<::arrow::Schema>* out) {
-  return FromParquetSchema(parquet_schema, properties, nullptr, out);
-}
+// ----------------------------------------------------------------------
+// Parquet to Arrow schema conversion
 
 Status ListToNode(const std::shared_ptr<::arrow::ListType>& type, const std::string& name,
                   bool nullable, const WriterProperties& properties,
diff --git a/cpp/src/parquet/arrow/schema.h b/cpp/src/parquet/arrow/schema.h
index 1133431..b3cc66b 100644
--- a/cpp/src/parquet/arrow/schema.h
+++ b/cpp/src/parquet/arrow/schema.h
@@ -39,50 +39,9 @@ class WriterProperties;
 
 namespace arrow {
 
-class ArrowReaderProperties;
 class ArrowWriterProperties;
 
 PARQUET_EXPORT
-::arrow::Status NodeToField(const schema::Node& node,
-                            std::shared_ptr<::arrow::Field>* out);
-
-/// Convert parquet schema to arrow schema with selected indices
-/// \param parquet_schema to be converted
-/// \param column_indices indices of leaf nodes in parquet schema tree. Appearing ordering
-///                       matters for the converted schema. Repeated indices are ignored
-///                       except for the first one
-/// \param properties reader options for FileReader
-/// \param key_value_metadata optional metadata, can be nullptr
-/// \param out the corresponding arrow schema
-/// \return Status::OK() on a successful conversion.
-PARQUET_EXPORT
-::arrow::Status FromParquetSchema(
-    const SchemaDescriptor* parquet_schema, const std::vector<int>& column_indices,
-    const ArrowReaderProperties& properties,
-    const std::shared_ptr<const KeyValueMetadata>& key_value_metadata,
-    std::shared_ptr<::arrow::Schema>* out);
-
-// Without indices
-PARQUET_EXPORT
-::arrow::Status FromParquetSchema(
-    const SchemaDescriptor* parquet_schema, const ArrowReaderProperties& properties,
-    const std::shared_ptr<const KeyValueMetadata>& key_value_metadata,
-    std::shared_ptr<::arrow::Schema>* out);
-
-// Without metadata
-PARQUET_EXPORT
-::arrow::Status FromParquetSchema(const SchemaDescriptor* parquet_schema,
-                                  const std::vector<int>& column_indices,
-                                  const ArrowReaderProperties& properties,
-                                  std::shared_ptr<::arrow::Schema>* out);
-
-// Without metadata or indices
-PARQUET_EXPORT
-::arrow::Status FromParquetSchema(const SchemaDescriptor* parquet_schema,
-                                  const ArrowReaderProperties& properties,
-                                  std::shared_ptr<::arrow::Schema>* out);
-
-PARQUET_EXPORT
 ::arrow::Status FieldToNode(const std::shared_ptr<::arrow::Field>& field,
                             const WriterProperties& properties,
                             const ArrowWriterProperties& arrow_properties,
diff --git a/cpp/src/parquet/arrow/writer.cc b/cpp/src/parquet/arrow/writer.cc
index bb2bab1..ee3b880 100644
--- a/cpp/src/parquet/arrow/writer.cc
+++ b/cpp/src/parquet/arrow/writer.cc
@@ -19,6 +19,7 @@
 
 #include <algorithm>
 #include <cstddef>
+#include <deque>
 #include <type_traits>
 #include <utility>
 #include <vector>
@@ -33,6 +34,7 @@
 #include "arrow/util/logging.h"
 
 #include "parquet/arrow/reader.h"
+#include "parquet/arrow/reader_internal.h"
 #include "parquet/arrow/schema.h"
 #include "parquet/column_writer.h"
 #include "parquet/exception.h"
@@ -78,7 +80,12 @@ namespace {
 
 class LevelBuilder {
  public:
-  explicit LevelBuilder(MemoryPool* pool) : def_levels_(pool), rep_levels_(pool) {}
+  explicit LevelBuilder(MemoryPool* pool, const SchemaField* schema_field,
+                        const SchemaManifest* schema_manifest)
+      : def_levels_(pool),
+        rep_levels_(pool),
+        schema_field_(schema_field),
+        schema_manifest_(schema_manifest) {}
 
   Status VisitInline(const Array& array);
 
@@ -121,8 +128,23 @@ class LevelBuilder {
 
 #undef NOT_IMPLEMENTED_VISIT
 
-  Status GenerateLevels(const Array& array, const std::shared_ptr<Field>& field,
-                        int64_t* values_offset, int64_t* num_values, int64_t* num_levels,
+  Status ExtractNullability() {
+    // Walk upwards to extract nullability
+    const SchemaField* current_field = schema_field_;
+    while (current_field != nullptr) {
+      nullable_.push_front(current_field->field->nullable());
+      if (current_field->field->type()->num_children() > 1) {
+        return Status::NotImplemented(
+            "Fields with more than one child are not supported.");
+      } else {
+        current_field = schema_manifest_->GetParent(current_field);
+      }
+    }
+    return Status::OK();
+  }
+
+  Status GenerateLevels(const Array& array, int64_t* values_offset, int64_t* num_values,
+                        int64_t* num_levels,
                         const std::shared_ptr<ResizableBuffer>& def_levels_scratch,
                         std::shared_ptr<Buffer>* def_levels_out,
                         std::shared_ptr<Buffer>* rep_levels_out,
@@ -135,18 +157,7 @@ class LevelBuilder {
     *values_offset = min_offset_idx_;
     *values_array = values_array_;
 
-    // Walk downwards to extract nullability
-    std::shared_ptr<Field> current_field = field;
-    nullable_.push_back(current_field->nullable());
-    while (current_field->type()->num_children() > 0) {
-      if (current_field->type()->num_children() > 1) {
-        return Status::NotImplemented(
-            "Fields with more than one child are not supported.");
-      } else {
-        current_field = current_field->type()->child(0);
-      }
-      nullable_.push_back(current_field->nullable());
-    }
+    RETURN_NOT_OK(ExtractNullability());
 
     // Generate the levels.
     if (nullable_.size() == 1) {
@@ -264,11 +275,14 @@ class LevelBuilder {
   Int16BufferBuilder def_levels_;
   Int16BufferBuilder rep_levels_;
 
+  const SchemaField* schema_field_;
+  const SchemaManifest* schema_manifest_;
+
   std::vector<int64_t> null_counts_;
   std::vector<const uint8_t*> valid_bitmaps_;
   std::vector<const int32_t*> offsets_;
   std::vector<int32_t> array_offsets_;
-  std::vector<bool> nullable_;
+  std::deque<bool> nullable_;
 
   int64_t min_offset_idx_;
   int64_t max_offset_idx_;
@@ -319,8 +333,12 @@ Status GetLeafType(const ::arrow::DataType& type, ::arrow::Type::type* leaf_type
 class ArrowColumnWriter {
  public:
   ArrowColumnWriter(ColumnWriterContext* ctx, ColumnWriter* column_writer,
-                    const std::shared_ptr<Field>& field)
-      : ctx_(ctx), writer_(column_writer), field_(field) {}
+                    const SchemaField* schema_field,
+                    const SchemaManifest* schema_manifest)
+      : ctx_(ctx),
+        writer_(column_writer),
+        schema_field_(schema_field),
+        schema_manifest_(schema_manifest) {}
 
   Status Write(const Array& data);
 
@@ -430,7 +448,8 @@ class ArrowColumnWriter {
 
   ColumnWriterContext* ctx_;
   ColumnWriter* writer_;
-  std::shared_ptr<Field> field_;
+  const SchemaField* schema_field_;
+  const SchemaManifest* schema_manifest_;
 };
 
 template <typename ParquetType, typename ArrowType>
@@ -945,11 +964,10 @@ Status ArrowColumnWriter::Write(const Array& data) {
   int64_t values_offset = 0;
   int64_t num_levels = 0;
   int64_t num_values = 0;
-  LevelBuilder level_builder(ctx_->memory_pool);
-
+  LevelBuilder level_builder(ctx_->memory_pool, schema_field_, schema_manifest_);
   std::shared_ptr<Buffer> def_levels_buffer, rep_levels_buffer;
   RETURN_NOT_OK(level_builder.GenerateLevels(
-      data, field_, &values_offset, &num_values, &num_levels, ctx_->def_levels_buffer,
+      data, &values_offset, &num_values, &num_levels, ctx_->def_levels_buffer,
       &def_levels_buffer, &rep_levels_buffer, &_values_array));
   const int16_t* def_levels = nullptr;
   if (def_levels_buffer) {
@@ -1021,6 +1039,11 @@ class FileWriter::Impl {
         arrow_properties_(arrow_properties),
         closed_(false) {}
 
+  Status Init() {
+    return BuildSchemaManifest(writer_->schema(), default_arrow_reader_properties(),
+                               &schema_manifest_);
+  }
+
   Status NewRowGroup(int64_t chunk_size) {
     if (row_group_writer_ != nullptr) {
       PARQUET_CATCH_NOT_OK(row_group_writer_->Close());
@@ -1075,17 +1098,11 @@ class FileWriter::Impl {
     ColumnWriter* column_writer;
     PARQUET_CATCH_NOT_OK(column_writer = row_group_writer_->NextColumn());
 
-    // TODO(wesm): This trick to construct a schema for one Parquet root node
-    // will not work for arbitrary nested data
-    int current_column_idx = row_group_writer_->current_column();
-    std::shared_ptr<::arrow::Schema> arrow_schema;
-    RETURN_NOT_OK(FromParquetSchema(writer_->schema(), {current_column_idx - 1},
-                                    default_arrow_reader_properties(),
-                                    writer_->key_value_metadata(), &arrow_schema));
-
-    ArrowColumnWriter arrow_writer(&column_write_context_, column_writer,
-                                   arrow_schema->field(0));
-
+    const SchemaField* schema_field;
+    RETURN_NOT_OK(schema_manifest_.GetColumnField(row_group_writer_->current_column(),
+                                                  &schema_field));
+    ArrowColumnWriter arrow_writer(&column_write_context_, column_writer, schema_field,
+                                   &schema_manifest_);
     RETURN_NOT_OK(arrow_writer.Write(*data, offset, size));
     return arrow_writer.Close();
   }
@@ -1101,6 +1118,8 @@ class FileWriter::Impl {
  private:
   friend class FileWriter;
 
+  SchemaManifest schema_manifest_;
+
   std::unique_ptr<ParquetFileWriter> writer_;
   RowGroupWriter* row_group_writer_;
   ColumnWriterContext column_write_context_;
@@ -1135,6 +1154,15 @@ const std::shared_ptr<FileMetaData> FileWriter::metadata() const {
 
 FileWriter::~FileWriter() {}
 
+Status FileWriter::Make(::arrow::MemoryPool* pool,
+                        std::unique_ptr<ParquetFileWriter> writer,
+                        const std::shared_ptr<::arrow::Schema>& schema,
+                        const std::shared_ptr<ArrowWriterProperties>& arrow_properties,
+                        std::unique_ptr<FileWriter>* out) {
+  out->reset(new FileWriter(pool, std::move(writer), schema, arrow_properties));
+  return (*out)->impl_->Init();
+}
+
 FileWriter::FileWriter(MemoryPool* pool, std::unique_ptr<ParquetFileWriter> writer,
                        const std::shared_ptr<::arrow::Schema>& schema,
                        const std::shared_ptr<ArrowWriterProperties>& arrow_properties)
@@ -1163,9 +1191,7 @@ Status FileWriter::Open(const ::arrow::Schema& schema, ::arrow::MemoryPool* pool
       ParquetFileWriter::Open(sink, schema_node, properties, schema.metadata());
 
   auto schema_ptr = std::make_shared<::arrow::Schema>(schema);
-  writer->reset(
-      new FileWriter(pool, std::move(base_writer), schema_ptr, arrow_properties));
-  return Status::OK();
+  return Make(pool, std::move(base_writer), schema_ptr, arrow_properties, writer);
 }
 
 Status WriteFileMetaData(const FileMetaData& file_metadata,
diff --git a/cpp/src/parquet/arrow/writer.h b/cpp/src/parquet/arrow/writer.h
index 2b9d892..8f7d499 100644
--- a/cpp/src/parquet/arrow/writer.h
+++ b/cpp/src/parquet/arrow/writer.h
@@ -128,10 +128,11 @@ std::shared_ptr<ArrowWriterProperties> PARQUET_EXPORT default_arrow_writer_prope
  */
 class PARQUET_EXPORT FileWriter {
  public:
-  FileWriter(::arrow::MemoryPool* pool, std::unique_ptr<ParquetFileWriter> writer,
-             const std::shared_ptr<::arrow::Schema>& schema,
-             const std::shared_ptr<ArrowWriterProperties>& arrow_properties =
-                 default_arrow_writer_properties());
+  static ::arrow::Status Make(
+      ::arrow::MemoryPool* pool, std::unique_ptr<ParquetFileWriter> writer,
+      const std::shared_ptr<::arrow::Schema>& schema,
+      const std::shared_ptr<ArrowWriterProperties>& arrow_properties,
+      std::unique_ptr<FileWriter>* out);
 
   static ::arrow::Status Open(const ::arrow::Schema& schema, ::arrow::MemoryPool* pool,
                               const std::shared_ptr<::arrow::io::OutputStream>& sink,
@@ -164,6 +165,10 @@ class PARQUET_EXPORT FileWriter {
   const std::shared_ptr<FileMetaData> metadata() const;
 
  private:
+  FileWriter(::arrow::MemoryPool* pool, std::unique_ptr<ParquetFileWriter> writer,
+             const std::shared_ptr<::arrow::Schema>& schema,
+             const std::shared_ptr<ArrowWriterProperties>& arrow_properties);
+
   class PARQUET_NO_EXPORT Impl;
   std::unique_ptr<Impl> impl_;
   std::shared_ptr<::arrow::Schema> schema_;
diff --git a/cpp/src/parquet/file_writer.cc b/cpp/src/parquet/file_writer.cc
index a741cfa..22c75fa 100644
--- a/cpp/src/parquet/file_writer.cc
+++ b/cpp/src/parquet/file_writer.cc
@@ -87,7 +87,7 @@ class RowGroupSerializer : public RowGroupWriter::Contents {
         properties_(properties),
         total_bytes_written_(0),
         closed_(false),
-        current_column_index_(0),
+        next_column_index_(0),
         num_rows_(0),
         buffered_row_group_(buffered_row_group) {
     if (buffered_row_group) {
@@ -122,7 +122,7 @@ class RowGroupSerializer : public RowGroupWriter::Contents {
       total_bytes_written_ += column_writers_[0]->Close();
     }
 
-    ++current_column_index_;
+    ++next_column_index_;
 
     const ColumnDescriptor* column_descr = col_meta->descr();
     std::unique_ptr<PageWriter> pager =
@@ -192,7 +192,7 @@ class RowGroupSerializer : public RowGroupWriter::Contents {
   const WriterProperties* properties_;
   int64_t total_bytes_written_;
   bool closed_;
-  int current_column_index_;
+  int next_column_index_;
   mutable int64_t num_rows_;
   bool buffered_row_group_;
 
@@ -203,7 +203,7 @@ class RowGroupSerializer : public RowGroupWriter::Contents {
       if (num_rows_ == 0) {
         num_rows_ = current_col_rows;
       } else if (num_rows_ != current_col_rows) {
-        ThrowRowsMisMatchError(current_column_index_, current_col_rows, num_rows_);
+        ThrowRowsMisMatchError(next_column_index_, current_col_rows, num_rows_);
       }
     } else if (buffered_row_group_ &&
                column_writers_.size() > 0) {  // when buffered_row_group = true
diff --git a/cpp/src/parquet/metadata.cc b/cpp/src/parquet/metadata.cc
index bee818d..5410dc8 100644
--- a/cpp/src/parquet/metadata.cc
+++ b/cpp/src/parquet/metadata.cc
@@ -778,32 +778,32 @@ class RowGroupMetaDataBuilder::RowGroupMetaDataBuilderImpl {
  public:
   explicit RowGroupMetaDataBuilderImpl(const std::shared_ptr<WriterProperties>& props,
                                        const SchemaDescriptor* schema, void* contents)
-      : properties_(props), schema_(schema), current_column_(0) {
+      : properties_(props), schema_(schema), next_column_(0) {
     row_group_ = reinterpret_cast<format::RowGroup*>(contents);
     InitializeColumns(schema->num_columns());
   }
 
   ColumnChunkMetaDataBuilder* NextColumnChunk() {
-    if (!(current_column_ < num_columns())) {
+    if (!(next_column_ < num_columns())) {
       std::stringstream ss;
       ss << "The schema only has " << num_columns()
-         << " columns, requested metadata for column: " << current_column_;
+         << " columns, requested metadata for column: " << next_column_;
       throw ParquetException(ss.str());
     }
-    auto column = schema_->Column(current_column_);
+    auto column = schema_->Column(next_column_);
     auto column_builder = ColumnChunkMetaDataBuilder::Make(
-        properties_, column, &row_group_->columns[current_column_++]);
+        properties_, column, &row_group_->columns[next_column_++]);
     auto column_builder_ptr = column_builder.get();
     column_builders_.push_back(std::move(column_builder));
     return column_builder_ptr;
   }
 
-  int current_column() { return current_column_; }
+  int current_column() { return next_column_ - 1; }
 
   void Finish(int64_t total_bytes_written) {
-    if (!(current_column_ == schema_->num_columns())) {
+    if (!(next_column_ == schema_->num_columns())) {
       std::stringstream ss;
-      ss << "Only " << current_column_ - 1 << " out of " << schema_->num_columns()
+      ss << "Only " << next_column_ - 1 << " out of " << schema_->num_columns()
          << " columns are initialized";
       throw ParquetException(ss.str());
     }
@@ -836,7 +836,7 @@ class RowGroupMetaDataBuilder::RowGroupMetaDataBuilderImpl {
   const std::shared_ptr<WriterProperties> properties_;
   const SchemaDescriptor* schema_;
   std::vector<std::unique_ptr<ColumnChunkMetaDataBuilder>> column_builders_;
-  int current_column_;
+  int next_column_;
 };
 
 std::unique_ptr<RowGroupMetaDataBuilder> RowGroupMetaDataBuilder::Make(
diff --git a/cpp/src/parquet/schema-internal.h b/cpp/src/parquet/schema-internal.h
index ad5ae44..9bcdc5d 100644
--- a/cpp/src/parquet/schema-internal.h
+++ b/cpp/src/parquet/schema-internal.h
@@ -39,57 +39,6 @@ class SchemaElement;
 
 namespace schema {
 
-inline bool str_endswith_tuple(const std::string& str) {
-  if (str.size() >= 6) {
-    return str.substr(str.size() - 6, 6) == "_tuple";
-  }
-  return false;
-}
-
-// Special case mentioned in the format spec:
-//   If the name is array or ends in _tuple, this should be a list of struct
-//   even for single child elements.
-inline bool HasStructListName(const GroupNode& node) {
-  return (node.name() == "array" || str_endswith_tuple(node.name()));
-}
-
-// TODO(itaiin): This aux. function is to be deleted once repeated structs are supported
-inline bool IsSimpleStruct(const Node* node) {
-  if (!node->is_group()) return false;
-  if (node->is_repeated()) return false;
-  if (node->converted_type() == ConvertedType::LIST) return false;
-  // Special case mentioned in the format spec:
-  //   If the name is array or ends in _tuple, this should be a list of struct
-  //   even for single child elements.
-  auto group = static_cast<const GroupNode*>(node);
-  if (group->field_count() == 1 && HasStructListName(*group)) return false;
-
-  return true;
-}
-
-// Coalesce a list of schema fields indices which are the roots of the
-// columns referred by a list of column indices
-inline bool ColumnIndicesToFieldIndices(const SchemaDescriptor& descr,
-                                        const std::vector<int>& column_indices,
-                                        std::vector<int>* out) {
-  const GroupNode* group = descr.group_node();
-  std::unordered_set<int> already_added;
-  out->clear();
-  for (auto& column_idx : column_indices) {
-    auto field_node = descr.GetColumnRoot(column_idx);
-    auto field_idx = group->FieldIndex(*field_node);
-    if (field_idx < 0) {
-      return false;
-    }
-    auto insertion = already_added.insert(field_idx);
-    if (insertion.second) {
-      out->push_back(field_idx);
-    }
-  }
-
-  return true;
-}
-
 // ----------------------------------------------------------------------
 // Conversion from Parquet Thrift metadata
 
diff --git a/cpp/src/parquet/schema.cc b/cpp/src/parquet/schema.cc
index 71aa919..e961127 100644
--- a/cpp/src/parquet/schema.cc
+++ b/cpp/src/parquet/schema.cc
@@ -857,6 +857,9 @@ void SchemaDescriptor::BuildTree(const NodePtr& node, int16_t max_def_level,
       BuildTree(group->field(i), max_def_level, max_rep_level, base);
     }
   } else {
+    node_to_leaf_index_[static_cast<const PrimitiveNode*>(node.get())] =
+        static_cast<int>(leaves_.size());
+
     // Primitive node, append to leaves
     leaves_.push_back(ColumnDescriptor(node, max_def_level, max_rep_level, this));
     leaf_to_base_.emplace(static_cast<int>(leaves_.size()) - 1, base);
@@ -865,6 +868,14 @@ void SchemaDescriptor::BuildTree(const NodePtr& node, int16_t max_def_level,
   }
 }
 
+int SchemaDescriptor::GetColumnIndex(const PrimitiveNode& node) const {
+  auto it = node_to_leaf_index_.find(&node);
+  if (it == node_to_leaf_index_.end()) {
+    return -1;
+  }
+  return it->second;
+}
+
 ColumnDescriptor::ColumnDescriptor(const schema::NodePtr& node,
                                    int16_t max_definition_level,
                                    int16_t max_repetition_level,
diff --git a/cpp/src/parquet/schema.h b/cpp/src/parquet/schema.h
index 566d5fb..8fb3a54 100644
--- a/cpp/src/parquet/schema.h
+++ b/cpp/src/parquet/schema.h
@@ -430,6 +430,10 @@ class PARQUET_EXPORT SchemaDescriptor {
 
   void updateColumnOrders(const std::vector<ColumnOrder>& column_orders);
 
+  /// \brief Return column index corresponding to a particular
+  /// PrimitiveNode. Returns -1 if not found
+  int GetColumnIndex(const schema::PrimitiveNode& node) const;
+
  private:
   friend class ColumnDescriptor;
 
@@ -444,6 +448,8 @@ class PARQUET_EXPORT SchemaDescriptor {
   // Result of leaf node / tree analysis
   std::vector<ColumnDescriptor> leaves_;
 
+  std::unordered_map<const schema::PrimitiveNode*, int> node_to_leaf_index_;
+
   // Mapping between leaf nodes and root group of leaf (first node
   // below the schema's root group)
   //
diff --git a/python/pyarrow/_parquet.pxd b/python/pyarrow/_parquet.pxd
index 270bb61..3a9dc9b 100644
--- a/python/pyarrow/_parquet.pxd
+++ b/python/pyarrow/_parquet.pxd
@@ -390,14 +390,14 @@ cdef extern from "parquet/arrow/reader.h" namespace "parquet::arrow" nogil:
 
         void set_use_threads(c_bool use_threads)
 
-
-cdef extern from "parquet/arrow/schema.h" namespace "parquet::arrow" nogil:
     CStatus FromParquetSchema(
         const SchemaDescriptor* parquet_schema,
         const ArrowReaderProperties& properties,
         const shared_ptr[const CKeyValueMetadata]& key_value_metadata,
         shared_ptr[CSchema]* out)
 
+cdef extern from "parquet/arrow/schema.h" namespace "parquet::arrow" nogil:
+
     CStatus ToParquetSchema(
         const CSchema* arrow_schema,
         const ArrowReaderProperties& properties,
diff --git a/r/R/arrowExports.R b/r/R/arrowExports.R
index 2031fea..7af16a7 100644
--- a/r/R/arrowExports.R
+++ b/r/R/arrowExports.R
@@ -752,12 +752,8 @@ write_parquet_file <- function(table, filename){
     invisible(.Call(`_arrow_write_parquet_file` , table, filename))
 }
 
-parquet___arrow___FileReader__GetSchema2 <- function(reader, indices){
-    .Call(`_arrow_parquet___arrow___FileReader__GetSchema2` , reader, indices)
-}
-
-parquet___arrow___FileReader__GetSchema1 <- function(reader){
-    .Call(`_arrow_parquet___arrow___FileReader__GetSchema1` , reader)
+parquet___arrow___FileReader__GetSchema <- function(reader){
+    .Call(`_arrow_parquet___arrow___FileReader__GetSchema` , reader)
 }
 
 RecordBatch__num_columns <- function(x){
diff --git a/r/R/parquet.R b/r/R/parquet.R
index 2647536..b75f93e 100644
--- a/r/R/parquet.R
+++ b/r/R/parquet.R
@@ -25,18 +25,13 @@
       if(quo_is_null(col_select)) {
         shared_ptr(`arrow::Table`, parquet___arrow___FileReader__ReadTable1(self))
       } else {
-        all_vars <- shared_ptr(`arrow::Schema`, parquet___arrow___FileReader__GetSchema1(self))$names
+        all_vars <- shared_ptr(`arrow::Schema`, parquet___arrow___FileReader__GetSchema(self))$names
         indices <- match(vars_select(all_vars, !!col_select), all_vars) - 1L
         shared_ptr(`arrow::Table`, parquet___arrow___FileReader__ReadTable2(self, indices))
       }
     },
-    GetSchema = function(column_indices = NULL) {
-      if (is.null(column_indices)) {
-        shared_ptr(`arrow::Schema`, parquet___arrow___FileReader__GetSchema1(self))
-      } else {
-        shared_ptr(`arrow::Schema`, parquet___arrow___FileReader__GetSchema2(self, column_indices))
-      }
-
+    GetSchema = function() {
+      shared_ptr(`arrow::Schema`, parquet___arrow___FileReader__GetSchema(self))
     }
   )
 )
diff --git a/r/src/arrowExports.cpp b/r/src/arrowExports.cpp
index bcb0ac5..8cfaa77 100644
--- a/r/src/arrowExports.cpp
+++ b/r/src/arrowExports.cpp
@@ -2900,32 +2900,16 @@ RcppExport SEXP _arrow_write_parquet_file(SEXP table_sexp, SEXP filename_sexp){
 
 // parquet.cpp
 #if defined(ARROW_R_WITH_ARROW)
-std::shared_ptr<arrow::Schema> parquet___arrow___FileReader__GetSchema2(const std::unique_ptr<parquet::arrow::FileReader>& reader, const std::vector<int>& indices);
-RcppExport SEXP _arrow_parquet___arrow___FileReader__GetSchema2(SEXP reader_sexp, SEXP indices_sexp){
+std::shared_ptr<arrow::Schema> parquet___arrow___FileReader__GetSchema(const std::unique_ptr<parquet::arrow::FileReader>& reader);
+RcppExport SEXP _arrow_parquet___arrow___FileReader__GetSchema(SEXP reader_sexp){
 BEGIN_RCPP
 	Rcpp::traits::input_parameter<const std::unique_ptr<parquet::arrow::FileReader>&>::type reader(reader_sexp);
-	Rcpp::traits::input_parameter<const std::vector<int>&>::type indices(indices_sexp);
-	return Rcpp::wrap(parquet___arrow___FileReader__GetSchema2(reader, indices));
+	return Rcpp::wrap(parquet___arrow___FileReader__GetSchema(reader));
 END_RCPP
 }
 #else
-RcppExport SEXP _arrow_parquet___arrow___FileReader__GetSchema2(SEXP reader_sexp, SEXP indices_sexp){
-	Rf_error("Cannot call parquet___arrow___FileReader__GetSchema2(). Please use arrow::install_arrow() to install required runtime libraries. ");
-}
-#endif
-
-// parquet.cpp
-#if defined(ARROW_R_WITH_ARROW)
-std::shared_ptr<arrow::Schema> parquet___arrow___FileReader__GetSchema1(const std::unique_ptr<parquet::arrow::FileReader>& reader);
-RcppExport SEXP _arrow_parquet___arrow___FileReader__GetSchema1(SEXP reader_sexp){
-BEGIN_RCPP
-	Rcpp::traits::input_parameter<const std::unique_ptr<parquet::arrow::FileReader>&>::type reader(reader_sexp);
-	return Rcpp::wrap(parquet___arrow___FileReader__GetSchema1(reader));
-END_RCPP
-}
-#else
-RcppExport SEXP _arrow_parquet___arrow___FileReader__GetSchema1(SEXP reader_sexp){
-	Rf_error("Cannot call parquet___arrow___FileReader__GetSchema1(). Please use arrow::install_arrow() to install required runtime libraries. ");
+RcppExport SEXP _arrow_parquet___arrow___FileReader__GetSchema(SEXP reader_sexp){
+	Rf_error("Cannot call parquet___arrow___FileReader__GetSchema(). Please use arrow::install_arrow() to install required runtime libraries. ");
 }
 #endif
 
@@ -3904,8 +3888,7 @@ static const R_CallMethodDef CallEntries[] = {
 		{ "_arrow_parquet___arrow___FileReader__ReadTable1", (DL_FUNC) &_arrow_parquet___arrow___FileReader__ReadTable1, 1}, 
 		{ "_arrow_parquet___arrow___FileReader__ReadTable2", (DL_FUNC) &_arrow_parquet___arrow___FileReader__ReadTable2, 2}, 
 		{ "_arrow_write_parquet_file", (DL_FUNC) &_arrow_write_parquet_file, 2}, 
-		{ "_arrow_parquet___arrow___FileReader__GetSchema2", (DL_FUNC) &_arrow_parquet___arrow___FileReader__GetSchema2, 2}, 
-		{ "_arrow_parquet___arrow___FileReader__GetSchema1", (DL_FUNC) &_arrow_parquet___arrow___FileReader__GetSchema1, 1}, 
+		{ "_arrow_parquet___arrow___FileReader__GetSchema", (DL_FUNC) &_arrow_parquet___arrow___FileReader__GetSchema, 1}, 
 		{ "_arrow_RecordBatch__num_columns", (DL_FUNC) &_arrow_RecordBatch__num_columns, 1}, 
 		{ "_arrow_RecordBatch__num_rows", (DL_FUNC) &_arrow_RecordBatch__num_rows, 1}, 
 		{ "_arrow_RecordBatch__schema", (DL_FUNC) &_arrow_RecordBatch__schema, 1}, 
diff --git a/r/src/parquet.cpp b/r/src/parquet.cpp
index 9ad1438..5124e9e 100644
--- a/r/src/parquet.cpp
+++ b/r/src/parquet.cpp
@@ -94,26 +94,11 @@ void write_parquet_file(const std::shared_ptr<arrow::Table>& table,
 }
 
 // [[arrow::export]]
-std::shared_ptr<arrow::Schema> parquet___arrow___FileReader__GetSchema2(
-    const std::unique_ptr<parquet::arrow::FileReader>& reader,
-    const std::vector<int>& indices) {
+std::shared_ptr<arrow::Schema> parquet___arrow___FileReader__GetSchema(
+    const std::unique_ptr<parquet::arrow::FileReader>& reader) {
   std::shared_ptr<arrow::Schema> schema;
-  STOP_IF_NOT_OK(reader->GetSchema(indices, &schema));
+  STOP_IF_NOT_OK(reader->GetSchema(&schema));
   return schema;
 }
 
-// [[arrow::export]]
-std::shared_ptr<arrow::Schema> parquet___arrow___FileReader__GetSchema1(
-    const std::unique_ptr<parquet::arrow::FileReader>& reader) {
-  // FileReader does not have this exposed
-  // std::shared_ptr<arrow::Schema> schema;
-  // STOP_IF_NOT_OK(reader->GetSchema(&schema));
-
-  // so going indirectly about it
-  std::shared_ptr<arrow::RecordBatchReader> record_batch_reader;
-  STOP_IF_NOT_OK(reader->GetRecordBatchReader({}, &record_batch_reader));
-
-  return record_batch_reader->schema();
-}
-
 #endif