Posted to commits@parquet.apache.org by uw...@apache.org on 2017/06/22 16:14:41 UTC

parquet-cpp git commit: PARQUET-911: [C++] Support nested structs in parquet_arrow

Repository: parquet-cpp
Updated Branches:
  refs/heads/master 99759a38b -> 29ed01ea7


PARQUET-911: [C++] Support nested structs in parquet_arrow

Support for simple StructArray reads.
Not supported in conjunction with lists yet.

Author: Itai Incze <it...@gmail.com>

Closes #312 from itaiin/PARQUET-911 and squashes the following commits:

beb16f5 [Itai Incze] document ReadSchemaField API and fix Appveyor errors
9fe3038 [Itai Incze] review changes + BooleanArray bugfix
420bb76 [Itai Incze] fix struct field type bug + minor changes
c23f3fb [Itai Incze] Refactor per code reviews
11a12c3 [Itai Incze] fix msvc compiler errors
283f08d [Itai Incze] fix schema field ordering on partial read
aeb1384 [Itai Incze] fix per code review
8bd47e8 [Itai Incze] Fix column index bug and extend tests
01d69ee [Itai Incze] Fix osx compiler c-array init error
14cbef8 [Itai Incze] support for simple StructArray reads. Not supported in conjunction with lists yet.
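
For readers who want to try the feature, here is a minimal usage sketch of the
new nested-read path. The file name and schema are hypothetical, RETURN_NOT_OK
is the status-checking macro from arrow/status.h, and error handling is elided;
this is an illustration of the API added below, not code from the patch:

#include <memory>
#include "arrow/api.h"
#include "arrow/io/file.h"
#include "parquet/api/reader.h"
#include "parquet/arrow/reader.h"

::arrow::Status ReadNestedExample() {
  // Assumes "nested.parquet" has a struct as its first top-level schema field
  std::shared_ptr<::arrow::io::ReadableFile> file;
  RETURN_NOT_OK(::arrow::io::ReadableFile::Open("nested.parquet", &file));

  std::unique_ptr<parquet::arrow::FileReader> reader;
  RETURN_NOT_OK(parquet::arrow::OpenFile(file, ::arrow::default_memory_pool(),
      ::parquet::default_reader_properties(), nullptr, &reader));

  // Whole-file read; struct columns now come back as arrow::StructArray chunks
  std::shared_ptr<::arrow::Table> table;
  RETURN_NOT_OK(reader->ReadTable(&table));

  // Or read only the first top-level schema field (the struct) as an Array
  std::shared_ptr<::arrow::Array> struct_array;
  RETURN_NOT_OK(reader->ReadSchemaField(0, &struct_array));
  return ::arrow::Status::OK();
}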


Project: http://git-wip-us.apache.org/repos/asf/parquet-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-cpp/commit/29ed01ea
Tree: http://git-wip-us.apache.org/repos/asf/parquet-cpp/tree/29ed01ea
Diff: http://git-wip-us.apache.org/repos/asf/parquet-cpp/diff/29ed01ea

Branch: refs/heads/master
Commit: 29ed01ea70fb5eba22b6df547c33fca047e08031
Parents: 99759a3
Author: Itai Incze <it...@gmail.com>
Authored: Thu Jun 22 18:14:35 2017 +0200
Committer: Uwe L. Korn <uw...@apache.org>
Committed: Thu Jun 22 18:14:35 2017 +0200

----------------------------------------------------------------------
 src/parquet/arrow/arrow-reader-writer-test.cc | 327 +++++++++++++++++--
 src/parquet/arrow/reader.cc                   | 350 ++++++++++++++++++---
 src/parquet/arrow/reader.h                    |  40 ++-
 src/parquet/arrow/schema.cc                   |   9 +-
 src/parquet/arrow/test-util.h                 |   2 +-
 src/parquet/column/column-reader-test.cc      |  13 +-
 src/parquet/column/reader.h                   |  52 ++-
 src/parquet/schema-test.cc                    |  26 ++
 src/parquet/schema.cc                         |  80 ++++-
 src/parquet/schema.h                          |  22 ++
 src/parquet/util/schema-util.h                |  84 +++++
 11 files changed, 904 insertions(+), 101 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/29ed01ea/src/parquet/arrow/arrow-reader-writer-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/arrow-reader-writer-test.cc b/src/parquet/arrow/arrow-reader-writer-test.cc
index b9c77f1..16dddb0 100644
--- a/src/parquet/arrow/arrow-reader-writer-test.cc
+++ b/src/parquet/arrow/arrow-reader-writer-test.cc
@@ -50,6 +50,7 @@ using arrow::PrimitiveArray;
 using arrow::Status;
 using arrow::Table;
 using arrow::TimeUnit;
+using arrow::ArrayVisitor;
 
 using ArrowId = ::arrow::Type;
 using ParquetType = parquet::Type;
@@ -387,7 +388,7 @@ class TestParquetIO : public ::testing::Test {
     // Also test that slice offsets are respected
     values = values->Slice(5, values->length() - 5);
     std::shared_ptr<ListArray> lists;
-    ASSERT_OK(MakeListArary(
+    ASSERT_OK(MakeListArray(
         values, size, nullable_lists ? null_count : 0, nullable_elements, &lists));
     *out = MakeSimpleTable(lists->Slice(3, size - 6), nullable_lists);
   }
@@ -399,10 +400,10 @@ class TestParquetIO : public ::testing::Test {
     ASSERT_OK(NullableArray<TestType>(
         size * 6, nullable_elements ? null_count : 0, kDefaultSeed, &values));
     std::shared_ptr<ListArray> lists;
-    ASSERT_OK(MakeListArary(
+    ASSERT_OK(MakeListArray(
         values, size * 3, nullable_lists ? null_count : 0, nullable_elements, &lists));
     std::shared_ptr<ListArray> parent_lists;
-    ASSERT_OK(MakeListArary(lists, size, nullable_parent_lists ? null_count : 0,
+    ASSERT_OK(MakeListArray(lists, size, nullable_parent_lists ? null_count : 0,
         nullable_lists, &parent_lists));
     *out = MakeSimpleTable(parent_lists, nullable_parent_lists);
   }
@@ -1080,22 +1081,17 @@ TEST(TestArrowWrite, CheckChunkSize) {
       Invalid, WriteTable(*table, ::arrow::default_memory_pool(), sink, chunk_size));
 }
 
-class TestNestedSchemaRead : public ::testing::Test {
+class TestNestedSchemaRead : public ::testing::TestWithParam<Repetition::type> {
  protected:
-  virtual void SetUp() {
-    // We are using parquet low-level file api to create the nested parquet
-    CreateNestedParquet();
-    InitReader(&reader_);
-  }
+  // multiply by 3 so the row count is evenly divisible by 3
+  const int NUM_SIMPLE_TEST_ROWS = SMALL_SIZE * 3;
+  std::shared_ptr<::arrow::Int32Array> values_array_ = nullptr;
 
-  void InitReader(std::shared_ptr<FileReader>* out) {
+  void InitReader() {
     std::shared_ptr<Buffer> buffer = nested_parquet_->GetBuffer();
-    std::unique_ptr<FileReader> reader;
     ASSERT_OK_NO_THROW(
         OpenFile(std::make_shared<BufferReader>(buffer), ::arrow::default_memory_pool(),
-            ::parquet::default_reader_properties(), nullptr, &reader));
-
-    *out = std::move(reader);
+            ::parquet::default_reader_properties(), nullptr, &reader_));
   }
 
   void InitNewParquetFile(const std::shared_ptr<GroupNode>& schema, int num_rows) {
@@ -1110,61 +1106,330 @@ class TestNestedSchemaRead : public ::testing::Test {
     writer_->Close();
   }
 
-  void CreateNestedParquet() {
+  void MakeValues(int num_rows) {
+    std::shared_ptr<Array> arr;
+    ASSERT_OK(NullableArray<::arrow::Int32Type>(num_rows, 0, kDefaultSeed, &arr));
+    values_array_ = std::dynamic_pointer_cast<::arrow::Int32Array>(arr);
+  }
+
+  void WriteColumnData(size_t num_rows, int16_t* def_levels,
+      int16_t* rep_levels, int32_t* values) {
+    auto typed_writer = static_cast<TypedColumnWriter<Int32Type>*>(
+      row_group_writer_->NextColumn());
+    typed_writer->WriteBatch(num_rows, def_levels, rep_levels, values);
+  }
+
+  void ValidateArray(const Array& array, size_t expected_nulls) {
+    ASSERT_EQ(array.length(), values_array_->length());
+    ASSERT_EQ(array.null_count(), expected_nulls);
+    // Also independently count the nulls
+    auto local_null_count = 0;
+    for (int i = 0; i < array.length(); i++) {
+      if (array.IsNull(i)) {
+        local_null_count++;
+      }
+    }
+    ASSERT_EQ(local_null_count, expected_nulls);
+  }
+
+  void ValidateColumnArray(const ::arrow::Int32Array& array,
+      size_t expected_nulls) {
+    ValidateArray(array, expected_nulls);
+
+    int j = 0;
+    for (int i = 0; i < values_array_->length(); i++) {
+      if (array.IsNull(i)) {
+        continue;
+      }
+      ASSERT_EQ(array.Value(i), values_array_->Value(j));
+      j++;
+    }
+  }
+
+  void ValidateTableArrayTypes(const Table& table) {
+    for (int i = 0; i < table.num_columns(); i++) {
+      const std::shared_ptr<::arrow::Field> schema_field = table.schema()->field(i);
+      const std::shared_ptr<Column> column = table.column(i);
+      // Compare with the column field
+      ASSERT_TRUE(schema_field->Equals(column->field()));
+      // Compare with the array type
+      ASSERT_TRUE(schema_field->type()->Equals(column->data()->chunk(0)->type()));
+    }
+  }
+
+  // A parquet with a simple nested schema
+  void CreateSimpleNestedParquet(Repetition::type struct_repetition) {
     std::vector<NodePtr> parquet_fields;
-    std::shared_ptr<Array> values;
+    // TODO(itaiin): We are using the parquet low-level file API to create the
+    // nested parquet file; this needs to change once nested writes are implemented
 
     // create the schema:
-    // required group group1 {
+    // <struct_repetition> group group1 {
     //   required int32 leaf1;
-    //   required int32 leaf2;
+    //   optional int32 leaf2;
     // }
     // required int32 leaf3;
 
-    parquet_fields.push_back(GroupNode::Make("group1", Repetition::REQUIRED,
+    parquet_fields.push_back(GroupNode::Make("group1", struct_repetition,
         {PrimitiveNode::Make("leaf1", Repetition::REQUIRED, ParquetType::INT32),
-            PrimitiveNode::Make("leaf2", Repetition::REQUIRED, ParquetType::INT32)}));
+         PrimitiveNode::Make("leaf2", Repetition::OPTIONAL, ParquetType::INT32)}));
     parquet_fields.push_back(
         PrimitiveNode::Make("leaf3", Repetition::REQUIRED, ParquetType::INT32));
 
-    const int num_columns = 3;
     auto schema_node = GroupNode::Make("schema", Repetition::REQUIRED, parquet_fields);
 
-    InitNewParquetFile(std::static_pointer_cast<GroupNode>(schema_node), 0);
+    // Create definition levels for the different columns that contain interleaved
+    // nulls and values at all nesting levels
+
+    // Definition levels for the optional fields
+    std::vector<int16_t> leaf1_def_levels(NUM_SIMPLE_TEST_ROWS);
+    std::vector<int16_t> leaf2_def_levels(NUM_SIMPLE_TEST_ROWS);
+    std::vector<int16_t> leaf3_def_levels(NUM_SIMPLE_TEST_ROWS);
+    for (int i = 0; i < NUM_SIMPLE_TEST_ROWS; i++) {
+      // leaf1 is required within the optional group1, so it is only null
+      // when the group is null
+      leaf1_def_levels[i] = (i % 3 == 0) ? 0 : 1;
+      // leaf2 is optional, can be null in the primitive (def-level 1) or
+      // struct level (def-level 0)
+      leaf2_def_levels[i] = i % 3;
+      // leaf3 is required
+      leaf3_def_levels[i] = 0;
+    }
 
-    for (int i = 0; i < num_columns; i++) {
-      auto column_writer = row_group_writer_->NextColumn();
-      auto typed_writer = reinterpret_cast<TypedColumnWriter<Int32Type>*>(column_writer);
-      typed_writer->WriteBatch(0, nullptr, nullptr, nullptr);
+    std::vector<int16_t> rep_levels(NUM_SIMPLE_TEST_ROWS, 0);
+
+    // Produce values for the columns
+    MakeValues(NUM_SIMPLE_TEST_ROWS);
+    int32_t* values = reinterpret_cast<int32_t*>(values_array_->data()->mutable_data());
+
+    // Create the actual parquet file
+    InitNewParquetFile(std::static_pointer_cast<GroupNode>(schema_node),
+      NUM_SIMPLE_TEST_ROWS);
+
+    // leaf1 column
+    WriteColumnData(NUM_SIMPLE_TEST_ROWS, leaf1_def_levels.data(),
+      rep_levels.data(), values);
+    // leaf2 column
+    WriteColumnData(NUM_SIMPLE_TEST_ROWS, leaf2_def_levels.data(),
+      rep_levels.data(), values);
+    // leaf3 column
+    WriteColumnData(NUM_SIMPLE_TEST_ROWS, leaf3_def_levels.data(),
+      rep_levels.data(), values);
+
+    FinalizeParquetFile();
+    InitReader();
+  }
+
+  NodePtr CreateSingleTypedNestedGroup(int index, int depth, int num_children,
+      Repetition::type node_repetition, ParquetType::type leaf_type) {
+    std::vector<NodePtr> children;
+
+    for (int i = 0; i < num_children; i++) {
+      if (depth <= 1) {
+        children.push_back(PrimitiveNode::Make("leaf",
+          node_repetition, leaf_type));
+      } else {
+        children.push_back(CreateSingleTypedNestedGroup(i, depth - 1, num_children,
+          node_repetition, leaf_type));
+      }
     }
 
+    std::stringstream ss;
+    ss << "group-" << depth << "-" << index;
+    return NodePtr(GroupNode::Make(ss.str(), node_repetition, children));
+  }
+
+  // A deeply nested schema
+  void CreateMultiLevelNestedParquet(int num_trees, int tree_depth,
+      int num_children, int num_rows, Repetition::type node_repetition) {
+    // Create the schema
+    std::vector<NodePtr> parquet_fields;
+    for (int i = 0; i < num_trees; i++) {
+      parquet_fields.push_back(CreateSingleTypedNestedGroup(i, tree_depth, num_children,
+        node_repetition, ParquetType::INT32));
+    }
+    auto schema_node = GroupNode::Make("schema", Repetition::REQUIRED, parquet_fields);
+
+    int num_columns = num_trees * static_cast<int>((std::pow(num_children, tree_depth)));
+
+    std::vector<int16_t> def_levels(num_rows);
+    std::vector<int16_t> rep_levels(num_rows);
+    for (int i = 0; i < num_rows; i++) {
+      if (node_repetition == Repetition::REQUIRED) {
+        def_levels[i] = 0;  // everything is required
+      } else {
+        def_levels[i] = i % tree_depth;  // everything is optional
+      }
+      rep_levels[i] = 0;  // nothing is repeated
+    }
+
+    // Produce values for the columns
+    MakeValues(num_rows);
+    int32_t* values = reinterpret_cast<int32_t*>(values_array_->data()->mutable_data());
+
+    // Create the actual parquet file
+    InitNewParquetFile(std::static_pointer_cast<GroupNode>(schema_node), num_rows);
+
+    for (int i = 0; i < num_columns; i++) {
+      WriteColumnData(num_rows, def_levels.data(), rep_levels.data(), values);
+    }
     FinalizeParquetFile();
+    InitReader();
   }
 
+  class DeepParquetTestVisitor : public ArrayVisitor {
+   public:
+    DeepParquetTestVisitor(Repetition::type node_repetition,
+      std::shared_ptr<::arrow::Int32Array> expected) :
+      node_repetition_(node_repetition), expected_(expected) {}
+
+    Status Validate(std::shared_ptr<Array> tree) {
+      return tree->Accept(this);
+    }
+
+    virtual Status Visit(const ::arrow::Int32Array& array) {
+      if (node_repetition_ == Repetition::REQUIRED) {
+        if (!array.Equals(expected_)) {
+          return Status::Invalid("leaf array data mismatch");
+        }
+      } else if (node_repetition_ == Repetition::OPTIONAL) {
+        if (array.length() != expected_->length()) {
+          return Status::Invalid("Bad leaf array length");
+        }
+        // expect only one value in every `depth` rows
+        if (array.null_count() != SMALL_SIZE) {
+          return Status::Invalid("Unexpected null count");
+        }
+      } else {
+        return Status::NotImplemented("Unsupported repetition");
+      }
+      return Status::OK();
+    }
+
+    virtual Status Visit(const ::arrow::StructArray& array) {
+      for (auto& child : array.fields()) {
+        if (node_repetition_ == Repetition::REQUIRED) {
+          RETURN_NOT_OK(child->Accept(this));
+        } else if (node_repetition_ == Repetition::OPTIONAL) {
+          // Null count must be a multiple of SMALL_SIZE
+          if (array.null_count() % SMALL_SIZE != 0) {
+            return Status::Invalid("Unexpected struct null count");
+          }
+        } else {
+          return Status::NotImplemented("Unsupported repetition");
+        }
+      }
+      return Status::OK();
+    }
+
+   private:
+    Repetition::type node_repetition_;
+    std::shared_ptr<::arrow::Int32Array> expected_;
+  };
+
   std::shared_ptr<InMemoryOutputStream> nested_parquet_;
-  std::shared_ptr<FileReader> reader_;
+  std::unique_ptr<FileReader> reader_;
   std::unique_ptr<ParquetFileWriter> writer_;
   RowGroupWriter* row_group_writer_;
 };
 
 TEST_F(TestNestedSchemaRead, ReadIntoTableFull) {
+  CreateSimpleNestedParquet(Repetition::OPTIONAL);
+
   std::shared_ptr<Table> table;
-  ASSERT_RAISES(NotImplemented, reader_->ReadTable(&table));
+  ASSERT_OK_NO_THROW(reader_->ReadTable(&table));
+  ASSERT_EQ(table->num_rows(), NUM_SIMPLE_TEST_ROWS);
+  ASSERT_EQ(table->num_columns(), 2);
+  ASSERT_EQ(table->schema()->field(0)->type()->num_children(), 2);
+  ValidateTableArrayTypes(*table);
+
+  auto struct_field_array = std::static_pointer_cast<::arrow::StructArray>(
+    table->column(0)->data()->chunk(0));
+  auto leaf1_array = std::static_pointer_cast<::arrow::Int32Array>(
+    struct_field_array->field(0));
+  auto leaf2_array = std::static_pointer_cast<::arrow::Int32Array>(
+    struct_field_array->field(1));
+  auto leaf3_array = std::static_pointer_cast<::arrow::Int32Array>(
+    table->column(1)->data()->chunk(0));
+
+  // Validate the struct and leaf arrays
+  ValidateArray(*struct_field_array, NUM_SIMPLE_TEST_ROWS / 3);
+  // validate leaf1
+  ValidateColumnArray(*leaf1_array, NUM_SIMPLE_TEST_ROWS / 3);
+  // validate leaf2
+  ValidateColumnArray(*leaf2_array, NUM_SIMPLE_TEST_ROWS * 2 / 3);
+  // validate leaf3
+  ValidateColumnArray(*leaf3_array, 0);
 }
 
 TEST_F(TestNestedSchemaRead, ReadTablePartial) {
+  CreateSimpleNestedParquet(Repetition::OPTIONAL);
   std::shared_ptr<Table> table;
 
-  ASSERT_RAISES(NotImplemented, reader_->ReadTable({0, 2}, &table));
-  ASSERT_RAISES(NotImplemented, reader_->ReadTable({0, 1}, &table));
+  // columns: {group1.leaf1, leaf3}
+  ASSERT_OK_NO_THROW(reader_->ReadTable({0, 2}, &table));
+  ASSERT_EQ(table->num_rows(), NUM_SIMPLE_TEST_ROWS);
+  ASSERT_EQ(table->num_columns(), 2);
+  ASSERT_EQ(table->schema()->field(0)->name(), "group1");
+  ASSERT_EQ(table->schema()->field(1)->name(), "leaf3");
+  ASSERT_EQ(table->schema()->field(0)->type()->num_children(), 1);
+  ValidateTableArrayTypes(*table);
+
+  // columns: {group1.leaf1, group1.leaf2}
+  ASSERT_OK_NO_THROW(reader_->ReadTable({0, 1}, &table));
+  ASSERT_EQ(table->num_rows(), NUM_SIMPLE_TEST_ROWS);
+  ASSERT_EQ(table->num_columns(), 1);
+  ASSERT_EQ(table->schema()->field(0)->name(), "group1");
+  ASSERT_EQ(table->schema()->field(0)->type()->num_children(), 2);
+  ValidateTableArrayTypes(*table);
 
   // columns: {leaf3}
   ASSERT_OK_NO_THROW(reader_->ReadTable({2}, &table));
-  ASSERT_EQ(table->num_rows(), 0);
+  ASSERT_EQ(table->num_rows(), NUM_SIMPLE_TEST_ROWS);
   ASSERT_EQ(table->num_columns(), 1);
+  ASSERT_EQ(table->schema()->field(0)->name(), "leaf3");
   ASSERT_EQ(table->schema()->field(0)->type()->num_children(), 0);
+  ValidateTableArrayTypes(*table);
+
+  // Test with different ordering
+  ASSERT_OK_NO_THROW(reader_->ReadTable({2, 0}, &table));
+  ASSERT_EQ(table->num_rows(), NUM_SIMPLE_TEST_ROWS);
+  ASSERT_EQ(table->num_columns(), 2);
+  ASSERT_EQ(table->schema()->field(0)->name(), "leaf3");
+  ASSERT_EQ(table->schema()->field(1)->name(), "group1");
+  ASSERT_EQ(table->schema()->field(1)->type()->num_children(), 1);
+  ValidateTableArrayTypes(*table);
+}
+
+TEST_F(TestNestedSchemaRead, StructAndListTogetherUnsupported) {
+  CreateSimpleNestedParquet(Repetition::REPEATED);
+  std::shared_ptr<Table> table;
+  ASSERT_RAISES(NotImplemented, reader_->ReadTable(&table));
+}
+
+TEST_P(TestNestedSchemaRead, DeepNestedSchemaRead) {
+  const int num_trees = 10;
+  const int depth = 5;
+  const int num_children = 3;
+  int num_rows = SMALL_SIZE * depth;
+  CreateMultiLevelNestedParquet(num_trees, depth, num_children, num_rows, GetParam());
+  std::shared_ptr<Table> table;
+  ASSERT_OK_NO_THROW(reader_->ReadTable(&table));
+  ASSERT_EQ(table->num_columns(), num_trees);
+  ASSERT_EQ(table->num_rows(), num_rows);
+
+  DeepParquetTestVisitor visitor(GetParam(), values_array_);
+  for (int i = 0; i < table->num_columns(); i++) {
+    auto tree = table->column(i)->data()->chunk(0);
+    ASSERT_OK_NO_THROW(visitor.Validate(tree));
+  }
 }
 
+INSTANTIATE_TEST_CASE_P(Repetition_type, TestNestedSchemaRead,
+  ::testing::Values(Repetition::REQUIRED, Repetition::OPTIONAL));
+
 TEST(TestArrowReaderAdHoc, Int96BadMemoryAccess) {
   // PARQUET-995
   const char* data_dir = std::getenv("PARQUET_TEST_DATA");
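
As a reading aid for the tests above: the definition levels that
CreateSimpleNestedParquet writes cycle with period 3, so with group1 OPTIONAL
the first rows decode as follows (v denotes a non-null value):

  row  leaf1  leaf2  leaf3  decoded row
  0    0      0      0      group1 = null,                    leaf3 = v
  1    1      1      0      group1 = {leaf1: v, leaf2: null}, leaf3 = v
  2    1      2      0      group1 = {leaf1: v, leaf2: v},    leaf3 = v

This is where the expected null counts in ReadIntoTableFull come from:
NUM_SIMPLE_TEST_ROWS / 3 for the struct and leaf1, two thirds of the rows for
leaf2, and none for the required leaf3.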

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/29ed01ea/src/parquet/arrow/reader.cc
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/reader.cc b/src/parquet/arrow/reader.cc
index a531454..7c1b381 100644
--- a/src/parquet/arrow/reader.cc
+++ b/src/parquet/arrow/reader.cc
@@ -28,6 +28,7 @@
 
 #include "parquet/arrow/schema.h"
 #include "parquet/util/bit-util.h"
+#include "parquet/util/schema-util.h"
 
 #include "arrow/api.h"
 
@@ -37,11 +38,14 @@ using arrow::Column;
 using arrow::Field;
 using arrow::Int32Array;
 using arrow::ListArray;
+using arrow::StructArray;
 using arrow::MemoryPool;
 using arrow::PoolBuffer;
 using arrow::Status;
 using arrow::Table;
 
+using parquet::schema::NodePtr;
+
 // Help reduce verbosity
 using ParquetReader = parquet::ParquetFileReader;
 
@@ -179,7 +183,13 @@ class FileReader::Impl {
   virtual ~Impl() {}
 
   Status GetColumn(int i, std::unique_ptr<ColumnReader>* out);
+  Status ReadSchemaField(int i, std::shared_ptr<Array>* out);
+  Status ReadSchemaField(int i, const std::vector<int>& indices,
+      std::shared_ptr<Array>* out);
+  Status GetReaderForNode(int index, const NodePtr& node, const std::vector<int>& indices,
+      int16_t def_level, std::unique_ptr<ColumnReader::Impl>* out);
   Status ReadColumn(int i, std::shared_ptr<Array>* out);
+  Status GetSchema(std::shared_ptr<::arrow::Schema>* out);
   Status GetSchema(
       const std::vector<int>& indices, std::shared_ptr<::arrow::Schema>* out);
   Status ReadRowGroup(int row_group_index, const std::vector<int>& indices,
@@ -204,9 +214,21 @@ class FileReader::Impl {
   int num_threads_;
 };
 
+typedef const int16_t* ValueLevelsPtr;
+
 class ColumnReader::Impl {
  public:
-  Impl(MemoryPool* pool, std::unique_ptr<FileColumnIterator> input)
+  virtual ~Impl() {}
+  virtual Status NextBatch(int batch_size, std::shared_ptr<Array>* out) = 0;
+  virtual Status GetDefLevels(ValueLevelsPtr* data, size_t* length) = 0;
+  virtual Status GetRepLevels(ValueLevelsPtr* data, size_t* length) = 0;
+  virtual const std::shared_ptr<Field> field() = 0;
+};
+
+// Reader implementation for primitive arrays
+class PrimitiveImpl: public ColumnReader::Impl {
+ public:
+  PrimitiveImpl(MemoryPool* pool, std::unique_ptr<FileColumnIterator> input)
       : pool_(pool),
         input_(std::move(input)),
         descr_(input_->descr()),
@@ -217,9 +239,9 @@ class ColumnReader::Impl {
     NextRowGroup();
   }
 
-  virtual ~Impl() {}
+  virtual ~PrimitiveImpl() {}
 
-  Status NextBatch(int batch_size, std::shared_ptr<Array>* out);
+  Status NextBatch(int batch_size, std::shared_ptr<Array>* out) override;
 
   template <typename ArrowType, typename ParquetType>
   Status TypedReadBatch(int batch_size, std::shared_ptr<Array>* out);
@@ -243,6 +265,11 @@ class ColumnReader::Impl {
   Status WrapIntoListArray(const int16_t* def_levels, const int16_t* rep_levels,
       int64_t total_values_read, std::shared_ptr<Array>* array);
 
+  Status GetDefLevels(ValueLevelsPtr* data, size_t* length) override;
+  Status GetRepLevels(ValueLevelsPtr* data, size_t* length) override;
+
+  const std::shared_ptr<Field> field() override { return field_; }
+
  private:
   void NextRowGroup();
 
@@ -272,6 +299,36 @@ class ColumnReader::Impl {
   int64_t null_count_;
 };
 
+// Reader implementation for struct array
+class StructImpl: public ColumnReader::Impl {
+ public:
+  explicit StructImpl(const std::vector<std::shared_ptr<Impl>>& children,
+      int16_t struct_def_level, MemoryPool* pool, const NodePtr& node)
+      : children_(children), struct_def_level_(struct_def_level), pool_(pool),
+        def_levels_buffer_(pool) {
+    InitField(node, children);
+  }
+
+  virtual ~StructImpl() {}
+
+  Status NextBatch(int batch_size, std::shared_ptr<Array>* out) override;
+  Status GetDefLevels(ValueLevelsPtr* data, size_t* length) override;
+  Status GetRepLevels(ValueLevelsPtr* data, size_t* length) override;
+  const std::shared_ptr<Field> field() override { return field_; }
+
+ private:
+  std::vector<std::shared_ptr<Impl>> children_;
+  int16_t struct_def_level_;
+  MemoryPool* pool_;
+  std::shared_ptr<Field> field_;
+  PoolBuffer def_levels_buffer_;
+
+  Status DefLevelsToNullArray(std::shared_ptr<MutableBuffer>* null_bitmap,
+      int64_t* null_count);
+  void InitField(const NodePtr& node,
+      const std::vector<std::shared_ptr<Impl>>& children);
+};
+
 FileReader::FileReader(MemoryPool* pool, std::unique_ptr<ParquetFileReader> reader)
     : impl_(new FileReader::Impl(pool, std::move(reader))) {}
 
@@ -281,11 +338,96 @@ Status FileReader::Impl::GetColumn(int i, std::unique_ptr<ColumnReader>* out) {
   std::unique_ptr<FileColumnIterator> input(new AllRowGroupsIterator(i, reader_.get()));
 
   std::unique_ptr<ColumnReader::Impl> impl(
-      new ColumnReader::Impl(pool_, std::move(input)));
+      new PrimitiveImpl(pool_, std::move(input)));
   *out = std::unique_ptr<ColumnReader>(new ColumnReader(std::move(impl)));
   return Status::OK();
 }
 
+Status FileReader::Impl::GetReaderForNode(int index, const NodePtr& node,
+    const std::vector<int>& indices, int16_t def_level,
+    std::unique_ptr<ColumnReader::Impl>* out) {
+
+  *out = nullptr;
+
+  if (IsSimpleStruct(node)) {
+    const schema::GroupNode* group = static_cast<const schema::GroupNode*>(node.get());
+    std::vector<std::shared_ptr<ColumnReader::Impl>> children;
+    for (int i = 0; i < group->field_count(); i++) {
+      std::unique_ptr<ColumnReader::Impl> child_reader;
+      // TODO(itaiin): Remove the -1 index hack when all types of nested reads
+      // are supported. This currently just signals the lower-level reader
+      // resolution to abort.
+      RETURN_NOT_OK(GetReaderForNode(index, group->field(i), indices,
+        def_level + 1, &child_reader));
+      if (child_reader != nullptr) {
+        children.push_back(std::move(child_reader));
+      }
+    }
+
+    if (children.size() > 0) {
+      *out = std::unique_ptr<ColumnReader::Impl>(
+        new StructImpl(children, def_level, pool_, node));
+    }
+  } else {
+    // This should be a flat field case - translate the field index to
+    // the correct column index by walking down to the leaf node
+    NodePtr walker = node;
+    while (!walker->is_primitive()) {
+      DCHECK(walker->is_group());
+      auto group = static_cast<GroupNode*>(walker.get());
+      if (group->field_count() != 1) {
+        return Status::NotImplemented(
+          "lists with structs are not supported.");
+      }
+      walker = group->field(0);
+    }
+    auto column_index = reader_->metadata()->schema()->ColumnIndex(*walker.get());
+
+    // If the column index is found, then a reader for the column is needed.
+    // Otherwise *out keeps the nullptr value.
+    if (std::find(indices.begin(), indices.end(), column_index) != indices.end()) {
+      std::unique_ptr<ColumnReader> reader;
+      RETURN_NOT_OK(GetColumn(column_index, &reader));
+      *out = std::move(reader->impl_);
+    }
+  }
+
+  return Status::OK();
+}
+
+Status FileReader::Impl::ReadSchemaField(int i, std::shared_ptr<Array>* out) {
+  std::vector<int> indices(reader_->metadata()->num_columns());
+
+  for (size_t j = 0; j < indices.size(); ++j) {
+    indices[j] = static_cast<int>(j);
+  }
+
+  return ReadSchemaField(i, indices, out);
+}
+
+Status FileReader::Impl::ReadSchemaField(int i, const std::vector<int>& indices,
+    std::shared_ptr<Array>* out) {
+  auto parquet_schema = reader_->metadata()->schema();
+
+  auto node = parquet_schema->group_node()->field(i);
+  std::unique_ptr<ColumnReader::Impl> reader_impl;
+
+  RETURN_NOT_OK(GetReaderForNode(i, node, indices, 1, &reader_impl));
+  if (reader_impl == nullptr) {
+    *out = nullptr;
+    return Status::OK();
+  }
+
+  std::unique_ptr<ColumnReader> reader(new ColumnReader(std::move(reader_impl)));
+
+  int64_t batch_size = 0;
+  for (int j = 0; j < reader_->metadata()->num_row_groups(); j++) {
+    batch_size += reader_->metadata()->RowGroup(j)->ColumnChunk(i)->num_values();
+  }
+
+  return reader->NextBatch(static_cast<int>(batch_size), out);
+}
+
 Status FileReader::Impl::ReadColumn(int i, std::shared_ptr<Array>* out) {
   std::unique_ptr<ColumnReader> flat_column_reader;
   RETURN_NOT_OK(GetColumn(i, &flat_column_reader));
@@ -327,7 +469,7 @@ Status FileReader::Impl::ReadRowGroup(int row_group_index,
         new SingleRowGroupIterator(column_index, row_group_index, reader_.get()));
 
     std::unique_ptr<ColumnReader::Impl> impl(
-        new ColumnReader::Impl(pool_, std::move(input)));
+        new PrimitiveImpl(pool_, std::move(input)));
     ColumnReader flat_column_reader(std::move(impl));
 
     std::shared_ptr<Array> array;
@@ -357,9 +499,17 @@ Status FileReader::Impl::ReadTable(
   int nthreads = std::min<int>(num_threads_, num_fields);
   std::vector<std::shared_ptr<Column>> columns(num_fields);
 
-  auto ReadColumnFunc = [&indices, &schema, &columns, this](int i) {
+  // We only need to read the schema fields whose columns are
+  // listed in the indices vector
+  std::vector<int> field_indices;
+  if (!ColumnIndicesToFieldIndices(*reader_->metadata()->schema(),
+        indices, &field_indices)) {
+    return Status::Invalid("Invalid column index");
+  }
+
+  auto ReadColumnFunc = [&indices, &field_indices, &schema, &columns, this](int i) {
     std::shared_ptr<Array> array;
-    RETURN_NOT_OK(ReadColumn(indices[i], &array));
+    RETURN_NOT_OK(ReadSchemaField(field_indices[i], indices, &array));
     columns[i] = std::make_shared<Column>(schema->field(i), array);
     return Status::OK();
   };
@@ -424,6 +574,14 @@ Status FileReader::ReadColumn(int i, std::shared_ptr<Array>* out) {
   }
 }
 
+Status FileReader::ReadSchemaField(int i, std::shared_ptr<Array>* out) {
+  try {
+    return impl_->ReadSchemaField(i, out);
+  } catch (const ::parquet::ParquetException& e) {
+    return ::arrow::Status::IOError(e.what());
+  }
+}
+
 Status FileReader::ReadTable(std::shared_ptr<Table>* out) {
   try {
     return impl_->ReadTable(out);
@@ -471,7 +629,8 @@ const ParquetFileReader* FileReader::parquet_reader() const {
 }
 
 template <typename ArrowType, typename ParquetType>
-Status ColumnReader::Impl::ReadNonNullableBatch(TypedColumnReader<ParquetType>* reader,
+Status PrimitiveImpl::ReadNonNullableBatch(
+    TypedColumnReader<ParquetType>* reader,
     int64_t values_to_read, int64_t* levels_read) {
   using ArrowCType = typename ArrowType::c_type;
   using ParquetCType = typename ParquetType::c_type;
@@ -491,7 +650,7 @@ Status ColumnReader::Impl::ReadNonNullableBatch(TypedColumnReader<ParquetType>*
 
 #define NONNULLABLE_BATCH_FAST_PATH(ArrowType, ParquetType, CType)               \
   template <>                                                                    \
-  Status ColumnReader::Impl::ReadNonNullableBatch<ArrowType, ParquetType>(       \
+  Status PrimitiveImpl::ReadNonNullableBatch<ArrowType, ParquetType>(            \
       TypedColumnReader<ParquetType> * reader, int64_t values_to_read,           \
       int64_t * levels_read) {                                                   \
     int64_t values_read;                                                         \
@@ -515,7 +674,7 @@ NONNULLABLE_BATCH_FAST_PATH(::arrow::Time32Type, Int32Type, int32_t)
 NONNULLABLE_BATCH_FAST_PATH(::arrow::Time64Type, Int64Type, int64_t)
 
 template <>
-Status ColumnReader::Impl::ReadNonNullableBatch<::arrow::TimestampType, Int96Type>(
+Status PrimitiveImpl::ReadNonNullableBatch<::arrow::TimestampType, Int96Type>(
     TypedColumnReader<Int96Type>* reader, int64_t values_to_read, int64_t* levels_read) {
   RETURN_NOT_OK(values_buffer_.Resize(values_to_read * sizeof(Int96), false));
   auto values = reinterpret_cast<Int96*>(values_buffer_.mutable_data());
@@ -533,7 +692,7 @@ Status ColumnReader::Impl::ReadNonNullableBatch<::arrow::TimestampType, Int96Typ
 }
 
 template <>
-Status ColumnReader::Impl::ReadNonNullableBatch<::arrow::Date64Type, Int32Type>(
+Status PrimitiveImpl::ReadNonNullableBatch<::arrow::Date64Type, Int32Type>(
     TypedColumnReader<Int32Type>* reader, int64_t values_to_read, int64_t* levels_read) {
   RETURN_NOT_OK(values_buffer_.Resize(values_to_read * sizeof(int32_t), false));
   auto values = reinterpret_cast<int32_t*>(values_buffer_.mutable_data());
@@ -551,8 +710,8 @@ Status ColumnReader::Impl::ReadNonNullableBatch<::arrow::Date64Type, Int32Type>(
 }
 
 template <>
-Status ColumnReader::Impl::ReadNonNullableBatch<::arrow::BooleanType, BooleanType>(
-    TypedColumnReader<BooleanType>* reader, int64_t values_to_read,
+Status PrimitiveImpl::ReadNonNullableBatch<::arrow::BooleanType,
+    BooleanType>(TypedColumnReader<BooleanType>* reader, int64_t values_to_read,
     int64_t* levels_read) {
   RETURN_NOT_OK(values_buffer_.Resize(values_to_read * sizeof(bool), false));
   auto values = reinterpret_cast<bool*>(values_buffer_.mutable_data());
@@ -569,7 +728,8 @@ Status ColumnReader::Impl::ReadNonNullableBatch<::arrow::BooleanType, BooleanTyp
 }
 
 template <typename ArrowType, typename ParquetType>
-Status ColumnReader::Impl::ReadNullableBatch(TypedColumnReader<ParquetType>* reader,
+Status PrimitiveImpl::ReadNullableBatch(
+    TypedColumnReader<ParquetType>* reader,
     int16_t* def_levels, int16_t* rep_levels, int64_t values_to_read,
     int64_t* levels_read, int64_t* values_read) {
   using ArrowCType = typename ArrowType::c_type;
@@ -599,7 +759,7 @@ Status ColumnReader::Impl::ReadNullableBatch(TypedColumnReader<ParquetType>* rea
 
 #define NULLABLE_BATCH_FAST_PATH(ArrowType, ParquetType, CType)                    \
   template <>                                                                      \
-  Status ColumnReader::Impl::ReadNullableBatch<ArrowType, ParquetType>(            \
+  Status PrimitiveImpl::ReadNullableBatch<ArrowType, ParquetType>(                 \
       TypedColumnReader<ParquetType> * reader, int16_t * def_levels,               \
       int16_t * rep_levels, int64_t values_to_read, int64_t * levels_read,         \
       int64_t * values_read) {                                                     \
@@ -625,7 +785,7 @@ NULLABLE_BATCH_FAST_PATH(::arrow::Time32Type, Int32Type, int32_t)
 NULLABLE_BATCH_FAST_PATH(::arrow::Time64Type, Int64Type, int64_t)
 
 template <>
-Status ColumnReader::Impl::ReadNullableBatch<::arrow::TimestampType, Int96Type>(
+Status PrimitiveImpl::ReadNullableBatch<::arrow::TimestampType, Int96Type>(
     TypedColumnReader<Int96Type>* reader, int16_t* def_levels, int16_t* rep_levels,
     int64_t values_to_read, int64_t* levels_read, int64_t* values_read) {
   RETURN_NOT_OK(values_buffer_.Resize(values_to_read * sizeof(Int96), false));
@@ -650,7 +810,7 @@ Status ColumnReader::Impl::ReadNullableBatch<::arrow::TimestampType, Int96Type>(
 }
 
 template <>
-Status ColumnReader::Impl::ReadNullableBatch<::arrow::Date64Type, Int32Type>(
+Status PrimitiveImpl::ReadNullableBatch<::arrow::Date64Type, Int32Type>(
     TypedColumnReader<Int32Type>* reader, int16_t* def_levels, int16_t* rep_levels,
     int64_t values_to_read, int64_t* levels_read, int64_t* values_read) {
   RETURN_NOT_OK(values_buffer_.Resize(values_to_read * sizeof(int32_t), false));
@@ -675,7 +835,7 @@ Status ColumnReader::Impl::ReadNullableBatch<::arrow::Date64Type, Int32Type>(
 }
 
 template <>
-Status ColumnReader::Impl::ReadNullableBatch<::arrow::BooleanType, BooleanType>(
+Status PrimitiveImpl::ReadNullableBatch<::arrow::BooleanType, BooleanType>(
     TypedColumnReader<BooleanType>* reader, int16_t* def_levels, int16_t* rep_levels,
     int64_t values_to_read, int64_t* levels_read, int64_t* values_read) {
   RETURN_NOT_OK(values_buffer_.Resize(values_to_read * sizeof(bool), false));
@@ -699,7 +859,7 @@ Status ColumnReader::Impl::ReadNullableBatch<::arrow::BooleanType, BooleanType>(
 }
 
 template <typename ArrowType>
-Status ColumnReader::Impl::InitDataBuffer(int batch_size) {
+Status PrimitiveImpl::InitDataBuffer(int batch_size) {
   using ArrowCType = typename ArrowType::c_type;
   data_buffer_ = std::make_shared<PoolBuffer>(pool_);
   RETURN_NOT_OK(data_buffer_->Resize(batch_size * sizeof(ArrowCType), false));
@@ -709,7 +869,7 @@ Status ColumnReader::Impl::InitDataBuffer(int batch_size) {
 }
 
 template <>
-Status ColumnReader::Impl::InitDataBuffer<::arrow::BooleanType>(int batch_size) {
+Status PrimitiveImpl::InitDataBuffer<::arrow::BooleanType>(int batch_size) {
   data_buffer_ = std::make_shared<PoolBuffer>(pool_);
   RETURN_NOT_OK(data_buffer_->Resize(::arrow::BitUtil::CeilByte(batch_size) / 8, false));
   data_buffer_ptr_ = data_buffer_->mutable_data();
@@ -718,7 +878,7 @@ Status ColumnReader::Impl::InitDataBuffer<::arrow::BooleanType>(int batch_size)
   return Status::OK();
 }
 
-Status ColumnReader::Impl::InitValidBits(int batch_size) {
+Status PrimitiveImpl::InitValidBits(int batch_size) {
   valid_bits_idx_ = 0;
   if (descr_->max_definition_level() > 0) {
     int valid_bits_size =
@@ -732,17 +892,13 @@ Status ColumnReader::Impl::InitValidBits(int batch_size) {
   return Status::OK();
 }
 
-Status ColumnReader::Impl::WrapIntoListArray(const int16_t* def_levels,
+Status PrimitiveImpl::WrapIntoListArray(const int16_t* def_levels,
     const int16_t* rep_levels, int64_t total_levels_read, std::shared_ptr<Array>* array) {
   std::shared_ptr<::arrow::Schema> arrow_schema;
   RETURN_NOT_OK(FromParquetSchema(input_->schema(), {input_->column_index()},
       input_->metadata()->key_value_metadata(), &arrow_schema));
   std::shared_ptr<Field> current_field = arrow_schema->field(0);
 
-  if (current_field->type()->id() == ::arrow::Type::STRUCT) {
-    return Status::NotImplemented("Structs are not yet supported.");
-  }
-
   if (descr_->max_repetition_level() > 0) {
     // Walk downwards to extract nullability
     std::vector<bool> nullable;
@@ -843,7 +999,8 @@ Status ColumnReader::Impl::WrapIntoListArray(const int16_t* def_levels,
 }
 
 template <typename ArrowType, typename ParquetType>
-Status ColumnReader::Impl::TypedReadBatch(int batch_size, std::shared_ptr<Array>* out) {
+Status PrimitiveImpl::TypedReadBatch(
+    int batch_size, std::shared_ptr<Array>* out) {
   using ArrowCType = typename ArrowType::c_type;
 
   int values_to_read = batch_size;
@@ -901,7 +1058,7 @@ Status ColumnReader::Impl::TypedReadBatch(int batch_size, std::shared_ptr<Array>
 }
 
 template <>
-Status ColumnReader::Impl::TypedReadBatch<::arrow::BooleanType, BooleanType>(
+Status PrimitiveImpl::TypedReadBatch<::arrow::BooleanType, BooleanType>(
     int batch_size, std::shared_ptr<Array>* out) {
   int values_to_read = batch_size;
   int total_levels_read = 0;
@@ -917,9 +1074,6 @@ Status ColumnReader::Impl::TypedReadBatch<::arrow::BooleanType, BooleanType>(
   int16_t* rep_levels = reinterpret_cast<int16_t*>(rep_levels_buffer_.mutable_data());
 
   while ((values_to_read > 0) && column_reader_) {
-    if (descr_->max_definition_level() > 0) {
-      RETURN_NOT_OK(def_levels_buffer_.Resize(values_to_read * sizeof(int16_t), false));
-    }
     auto reader = dynamic_cast<TypedColumnReader<BooleanType>*>(column_reader_.get());
     int64_t values_read;
     int64_t levels_read;
@@ -974,7 +1128,7 @@ Status ColumnReader::Impl::TypedReadBatch<::arrow::BooleanType, BooleanType>(
 }
 
 template <typename ArrowType>
-Status ColumnReader::Impl::ReadByteArrayBatch(
+Status PrimitiveImpl::ReadByteArrayBatch(
     int batch_size, std::shared_ptr<Array>* out) {
   using BuilderType = typename ::arrow::TypeTraits<ArrowType>::BuilderType;
 
@@ -1031,7 +1185,7 @@ Status ColumnReader::Impl::ReadByteArrayBatch(
 }
 
 template <typename ArrowType>
-Status ColumnReader::Impl::ReadFLBABatch(
+Status PrimitiveImpl::ReadFLBABatch(
     int batch_size, int byte_width, std::shared_ptr<Array>* out) {
   using BuilderType = typename ::arrow::TypeTraits<ArrowType>::BuilderType;
   int total_levels_read = 0;
@@ -1083,13 +1237,13 @@ Status ColumnReader::Impl::ReadFLBABatch(
 }
 
 template <>
-Status ColumnReader::Impl::TypedReadBatch<::arrow::BinaryType, ByteArrayType>(
+Status PrimitiveImpl::TypedReadBatch<::arrow::BinaryType, ByteArrayType>(
     int batch_size, std::shared_ptr<Array>* out) {
   return ReadByteArrayBatch<::arrow::BinaryType>(batch_size, out);
 }
 
 template <>
-Status ColumnReader::Impl::TypedReadBatch<::arrow::StringType, ByteArrayType>(
+Status PrimitiveImpl::TypedReadBatch<::arrow::StringType, ByteArrayType>(
     int batch_size, std::shared_ptr<Array>* out) {
   return ReadByteArrayBatch<::arrow::StringType>(batch_size, out);
 }
@@ -1099,7 +1253,8 @@ Status ColumnReader::Impl::TypedReadBatch<::arrow::StringType, ByteArrayType>(
     return TypedReadBatch<ArrowType, ParquetType>(batch_size, out); \
     break;
 
-Status ColumnReader::Impl::NextBatch(int batch_size, std::shared_ptr<Array>* out) {
+Status PrimitiveImpl::NextBatch(
+    int batch_size, std::shared_ptr<Array>* out) {
   if (!column_reader_) {
     // Exhausted all row groups.
     *out = nullptr;
@@ -1155,10 +1310,22 @@ Status ColumnReader::Impl::NextBatch(int batch_size, std::shared_ptr<Array>* out
   }
 }
 
-void ColumnReader::Impl::NextRowGroup() {
+void PrimitiveImpl::NextRowGroup() {
   column_reader_ = input_->Next();
 }
 
+Status PrimitiveImpl::GetDefLevels(ValueLevelsPtr* data, size_t* length) {
+  *data = reinterpret_cast<ValueLevelsPtr>(def_levels_buffer_.data());
+  *length = def_levels_buffer_.size() / sizeof(int16_t);
+  return Status::OK();
+}
+
+Status PrimitiveImpl::GetRepLevels(ValueLevelsPtr* data, size_t* length) {
+  *data = reinterpret_cast<ValueLevelsPtr>(rep_levels_buffer_.data());
+  *length = rep_levels_buffer_.size() / sizeof(int16_t);
+  return Status::OK();
+}
+
 ColumnReader::ColumnReader(std::unique_ptr<Impl> impl) : impl_(std::move(impl)) {}
 
 ColumnReader::~ColumnReader() {}
@@ -1167,5 +1334,114 @@ Status ColumnReader::NextBatch(int batch_size, std::shared_ptr<Array>* out) {
   return impl_->NextBatch(batch_size, out);
 }
 
+// StructImpl methods
+
+Status StructImpl::DefLevelsToNullArray(
+    std::shared_ptr<MutableBuffer>* null_bitmap_out,
+    int64_t* null_count_out) {
+  std::shared_ptr<MutableBuffer> null_bitmap;
+  auto null_count = 0;
+  ValueLevelsPtr def_levels_data;
+  size_t def_levels_length;
+  RETURN_NOT_OK(GetDefLevels(&def_levels_data, &def_levels_length));
+  RETURN_NOT_OK(GetEmptyBitmap(pool_,
+    def_levels_length, &null_bitmap));
+  uint8_t* null_bitmap_ptr = null_bitmap->mutable_data();
+  for (size_t i = 0; i < def_levels_length; i++) {
+    if (def_levels_data[i] < struct_def_level_) {
+      // Mark null
+      null_count += 1;
+    } else {
+      DCHECK_EQ(def_levels_data[i], struct_def_level_);
+      ::arrow::BitUtil::SetBit(null_bitmap_ptr, i);
+    }
+  }
+
+  *null_count_out = null_count;
+  *null_bitmap_out = (null_count == 0) ? nullptr : null_bitmap;
+  return Status::OK();
+}
+
+// TODO(itaiin): Consider caching the results of this calculation -
+//   note that this is only used once for each read for now
+Status StructImpl::GetDefLevels(ValueLevelsPtr* data, size_t* length) {
+  *data = nullptr;
+  if (children_.size() == 0) {
+    // Empty struct
+    *length = 0;
+    return Status::OK();
+  }
+
+  // We have at least one child
+  ValueLevelsPtr child_def_levels;
+  size_t child_length;
+  RETURN_NOT_OK(children_[0]->GetDefLevels(&child_def_levels, &child_length));
+  auto size = child_length * sizeof(int16_t);
+  RETURN_NOT_OK(def_levels_buffer_.Resize(size));
+  // Initialize with the minimal def level
+  std::memset(def_levels_buffer_.mutable_data(), -1, size);
+  auto result_levels = reinterpret_cast<int16_t*>(def_levels_buffer_.mutable_data());
+
+  // When a struct is defined, all of its children's def levels are at least
+  // the nesting level, and the struct's def level equals the nesting level.
+  // When a struct is not defined, all of its children's def levels are less
+  // than the nesting level, and the struct's def level equals
+  // max(children's def levels). All other possibilities are malformed data.
+  for (auto& child : children_) {
+    size_t current_child_length;
+    RETURN_NOT_OK(child->GetDefLevels(&child_def_levels, &current_child_length));
+    DCHECK_EQ(child_length, current_child_length);
+    for (size_t i = 0; i < child_length; i++) {
+      // Check that value is either uninitialized, or current
+      // and previous children def levels agree on the struct level
+      DCHECK((result_levels[i] == -1) ||
+             ((result_levels[i] >= struct_def_level_) ==
+              (child_def_levels[i] >= struct_def_level_)));
+      result_levels[i] = std::max(result_levels[i],
+        std::min(child_def_levels[i], struct_def_level_));
+    }
+  }
+  *data = reinterpret_cast<ValueLevelsPtr>(def_levels_buffer_.data());
+  *length = child_length;
+  return Status::OK();
+}
+
+void StructImpl::InitField(const NodePtr& node,
+    const std::vector<std::shared_ptr<Impl>>& children) {
+  // Make a shallow node-to-field conversion from the children's fields
+  std::vector<std::shared_ptr<::arrow::Field>> fields(children.size());
+  for (size_t i = 0; i < children.size(); i++) {
+    fields[i] = children[i]->field();
+  }
+  auto type = std::make_shared<::arrow::StructType>(fields);
+  field_ = std::make_shared<Field>(node->name(), type);
+}
+
+Status StructImpl::GetRepLevels(ValueLevelsPtr* data, size_t* length) {
+  return Status::NotImplemented("GetRepLevels is not implemented for struct");
+}
+
+Status StructImpl::NextBatch(int batch_size, std::shared_ptr<Array>* out) {
+  std::vector<std::shared_ptr<Array>> children_arrays;
+  std::shared_ptr<MutableBuffer> null_bitmap;
+  int64_t null_count;
+
+  // Gather children arrays and def levels
+  for (auto& child : children_) {
+    std::shared_ptr<Array> child_array;
+
+    RETURN_NOT_OK(child->NextBatch(batch_size, &child_array));
+
+    children_arrays.push_back(child_array);
+  }
+
+  RETURN_NOT_OK(DefLevelsToNullArray(&null_bitmap, &null_count));
+
+  *out = std::make_shared<StructArray>(field()->type(), batch_size, children_arrays,
+    null_bitmap, null_count);
+
+  return Status::OK();
+}
+
 }  // namespace arrow
 }  // namespace parquet
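
To make the struct def-level reconstruction above concrete, take a struct at
struct_def_level_ == 1 with two children (the level values here are made up
for illustration). GetDefLevels merges per entry as
max(result, min(child_def_level, struct_def_level_)):

  child 1 def levels:  0  1  2  1
  child 2 def levels:  0  2  2  1
  merged struct:       0  1  1  1

DefLevelsToNullArray then marks an entry null exactly when its merged level is
below struct_def_level_, so only entry 0 becomes a null struct slot here.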

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/29ed01ea/src/parquet/arrow/reader.h
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/reader.h b/src/parquet/arrow/reader.h
index 24601b8..3a97c83 100644
--- a/src/parquet/arrow/reader.h
+++ b/src/parquet/arrow/reader.h
@@ -99,10 +99,44 @@ class PARQUET_EXPORT FileReader {
   // Read column as a whole into an Array.
   ::arrow::Status ReadColumn(int i, std::shared_ptr<::arrow::Array>* out);
 
-  // Read a table of flat columns into a Table.
+  // NOTE: Experimental API
+  // Reads a specific top level schema field into an Array
+  // The index i refers to the index of the top-level schema field, which may
+  // be nested or flat - e.g.
+  //
+  // 0 foo.bar
+  //   foo.bar.baz
+  //   foo.qux
+  // 1 foo2
+  // 2 foo3
+  //
+  // i=0 will read the entire foo struct, i=1 the foo2 primitive column, etc.
+  ::arrow::Status ReadSchemaField(int i, std::shared_ptr<::arrow::Array>* out);
+
+  // NOTE: Experimental API
+  // Reads a specific top level schema field into an Array, while keeping only chosen
+  // leaf columns.
+  // The index i refers to the index of the top-level schema field, which may be
+  // nested or flat, and the indices vector refers to the leaf column indices - e.g.
+  //
+  // i  indices
+  // 0  0        foo.bar
+  // 0  1        foo.bar.baz
+  // 0  2        foo.qux
+  // 1  3        foo2
+  // 2  4        foo3
+  //
+  // i=0 indices={0,2} will read a partial struct with foo.bar and foo.qux columns
+  // i=1 indices={3} will read foo2 column
+  // i=1 indices={2} will result in out=nullptr
+  // Leaf indices which are unrelated to the schema field are ignored
+  ::arrow::Status ReadSchemaField(int i, const std::vector<int>& indices,
+      std::shared_ptr<::arrow::Array>* out);
+
+  // Read a table of columns into a Table
   ::arrow::Status ReadTable(std::shared_ptr<::arrow::Table>* out);
 
-  // Read a table of flat columns into a Table. Read only the indicated column
+  // Read a table of columns into a Table. Read only the indicated column
   // indices (relative to the schema)
   ::arrow::Status ReadTable(
       const std::vector<int>& column_indices, std::shared_ptr<::arrow::Table>* out);
@@ -154,6 +188,8 @@ class PARQUET_EXPORT ColumnReader {
   explicit ColumnReader(std::unique_ptr<Impl> impl);
 
   friend class FileReader;
+  friend class PrimitiveImpl;
+  friend class StructImpl;
 };
 
 // Helper function to create a file reader from an implementation of an Arrow
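
A hypothetical call pattern against the foo/foo2/foo3 example schema documented
in the comments above (assumes the parquet/arrow and arrow headers are
included; a sketch, not code from the patch):

::arrow::Status ReadPartialFoo(parquet::arrow::FileReader* reader,
    std::shared_ptr<::arrow::Array>* out) {
  // i=0 selects the top-level "foo" field; leaf indices {0, 2} keep foo.bar
  // and foo.qux while dropping foo.bar.baz, per the index table above.
  return reader->ReadSchemaField(0, {0, 2}, out);
}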

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/29ed01ea/src/parquet/arrow/schema.cc
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/schema.cc b/src/parquet/arrow/schema.cc
index 83968bc..a78a23b 100644
--- a/src/parquet/arrow/schema.cc
+++ b/src/parquet/arrow/schema.cc
@@ -22,6 +22,7 @@
 #include <vector>
 
 #include "parquet/api/schema.h"
+#include "parquet/util/schema-util.h"
 
 #include "arrow/api.h"
 
@@ -224,11 +225,6 @@ Status StructFromGroup(const GroupNode* group,
   return Status::OK();
 }
 
-bool str_endswith_tuple(const std::string& str) {
-  if (str.size() >= 6) { return str.substr(str.size() - 6, 6) == "_tuple"; }
-  return false;
-}
-
 Status NodeToList(const GroupNode* group,
     const std::unordered_set<NodePtr>* included_leaf_nodes, TypePtr* out) {
   *out = nullptr;
@@ -240,8 +236,7 @@ Status NodeToList(const GroupNode* group,
       // Special case mentioned in the format spec:
       //   If the name is array or ends in _tuple, this should be a list of struct
       //   even for single child elements.
-      if (list_group->field_count() == 1 && list_node->name() != "array" &&
-          !str_endswith_tuple(list_node->name())) {
+      if (list_group->field_count() == 1 && !HasStructListName(*list_group)) {
         // List of primitive type
         std::shared_ptr<Field> item_field;
         RETURN_NOT_OK(

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/29ed01ea/src/parquet/arrow/test-util.h
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/test-util.h b/src/parquet/arrow/test-util.h
index a5337cf..e44fcb6 100644
--- a/src/parquet/arrow/test-util.h
+++ b/src/parquet/arrow/test-util.h
@@ -271,7 +271,7 @@ typename std::enable_if<is_arrow_bool<ArrowType>::value, Status>::type NullableA
 /// Wrap an Array into a ListArray by splitting it up into size lists.
 ///
 /// This helper function only supports (size/2) nulls.
-Status MakeListArary(const std::shared_ptr<Array>& values, int64_t size,
+Status MakeListArray(const std::shared_ptr<Array>& values, int64_t size,
     int64_t null_count, bool nullable_values, std::shared_ptr<::arrow::ListArray>* out) {
   // We always include an empty list
   int64_t non_null_entries = size - null_count - 1;

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/29ed01ea/src/parquet/column/column-reader-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/column-reader-test.cc b/src/parquet/column/column-reader-test.cc
index a31c817..e34ac4c 100644
--- a/src/parquet/column/column-reader-test.cc
+++ b/src/parquet/column/column-reader-test.cc
@@ -44,7 +44,8 @@ namespace test {
 
 template <typename T>
 static inline bool vector_equal_with_def_levels(const vector<T>& left,
-    const vector<int16_t> def_levels, int16_t max_def_levels, const vector<T>& right) {
+    const vector<int16_t>& def_levels, int16_t max_def_levels, int16_t max_rep_levels,
+    const vector<T>& right) {
   size_t i_left = 0;
   size_t i_right = 0;
   for (size_t i = 0; i < def_levels.size(); i++) {
@@ -57,9 +58,14 @@ static inline bool vector_equal_with_def_levels(const vector<T>& left,
       }
       i_left++;
       i_right++;
-    } else if (def_levels[i] == (max_def_levels - 1)) {
+    } else if (def_levels[i] == (max_def_levels -1)) {
       // Null entry on the lowest nested level
       i_right++;
+    } else if (def_levels[i] < (max_def_levels - 1)) {
+      // Null entry on a higher nesting level, only supported for non-repeating data
+      if (max_rep_levels == 0) {
+        i_right++;
+      }
     }
   }
 
@@ -142,7 +148,8 @@ class TestPrimitiveReader : public ::testing::Test {
     if (max_def_level_ > 0) {
       ASSERT_TRUE(vector_equal(def_levels_, dresult));
       ASSERT_TRUE(
-          vector_equal_with_def_levels(values_, dresult, max_def_level_, vresult));
+          vector_equal_with_def_levels(values_, dresult, max_def_level_,
+            max_rep_level_, vresult));
     } else {
       ASSERT_TRUE(vector_equal(values_, vresult));
     }

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/29ed01ea/src/parquet/column/reader.h
----------------------------------------------------------------------
diff --git a/src/parquet/column/reader.h b/src/parquet/column/reader.h
index 9749e56..724773d 100644
--- a/src/parquet/column/reader.h
+++ b/src/parquet/column/reader.h
@@ -24,6 +24,7 @@
 #include <memory>
 #include <unordered_map>
 #include <vector>
+#include <iostream>
 
 #include <arrow/util/bit-util.h>
 
@@ -263,20 +264,35 @@ inline int64_t TypedColumnReader<DType>::ReadBatch(int64_t batch_size,
 }
 
 inline void DefinitionLevelsToBitmap(const int16_t* def_levels, int64_t num_def_levels,
-    int16_t max_definition_level, int64_t* values_read, int64_t* null_count,
+    int16_t max_definition_level, int16_t max_repetition_level,
+    int64_t* values_read, int64_t* null_count,
     uint8_t* valid_bits, int64_t valid_bits_offset) {
   int byte_offset = static_cast<int>(valid_bits_offset) / 8;
   int bit_offset = static_cast<int>(valid_bits_offset) % 8;
   uint8_t bitset = valid_bits[byte_offset];
 
+  // TODO(itaiin): As an interim solution we are splitting the code path here
+  // between repeated+flat column reads, and non-repeated+nested reads.
+  // Those paths need to be merged in the future
   for (int i = 0; i < num_def_levels; ++i) {
     if (def_levels[i] == max_definition_level) {
       bitset |= (1 << bit_offset);
-    } else if (def_levels[i] == (max_definition_level - 1)) {
-      bitset &= ~(1 << bit_offset);
-      *null_count += 1;
+    } else if (max_repetition_level > 0) {
+      // repetition+flat case
+      if (def_levels[i] == (max_definition_level - 1)) {
+        bitset &= ~(1 << bit_offset);
+        *null_count += 1;
+      } else {
+        continue;
+      }
     } else {
-      continue;
+      // non-repeated+nested case
+      if (def_levels[i] < max_definition_level) {
+        bitset &= ~(1 << bit_offset);
+        *null_count += 1;
+      } else {
+        throw ParquetException("definition level exceeds maximum");
+      }
     }
 
     bit_offset++;
@@ -322,9 +338,28 @@ inline int64_t TypedColumnReader<DType>::ReadBatchSpaced(int64_t batch_size,
       }
     }
 
+    // TODO(itaiin): another code path split to merge when the general case is done
+    bool has_spaced_values;
+    if (descr_->max_repetition_level() > 0) {
+      // repeated+flat case
+      has_spaced_values = !descr_->schema_node()->is_required();
+    } else {
+      // non-repeated+nested case
+      // Find out whether a node along the hierarchy forces nulls at the lowest level
+      const schema::Node* node = descr_->schema_node().get();
+      has_spaced_values = false;
+      while (node) {
+        auto parent = node->parent();
+        if (node->is_optional()) {
+          has_spaced_values = true;
+          break;
+        }
+        node = parent;
+      }
+    }
+
     int64_t null_count = 0;
-    if (descr_->schema_node()->is_required()) {
-      // Node is required so there are no null entries on the lowest nesting level.
+    if (!has_spaced_values) {
       int values_to_read = 0;
       for (int64_t i = 0; i < num_def_levels; ++i) {
         if (def_levels[i] == descr_->max_definition_level()) { ++values_to_read; }
@@ -336,8 +371,9 @@ inline int64_t TypedColumnReader<DType>::ReadBatchSpaced(int64_t batch_size,
       *values_read = total_values;
     } else {
       int16_t max_definition_level = descr_->max_definition_level();
+      int16_t max_repetition_level = descr_->max_repetition_level();
       DefinitionLevelsToBitmap(def_levels, num_def_levels, max_definition_level,
-          values_read, &null_count, valid_bits, valid_bits_offset);
+          max_repetition_level, values_read, &null_count, valid_bits, valid_bits_offset);
       total_values = ReadValuesSpaced(*values_read, values, static_cast<int>(null_count),
           valid_bits, valid_bits_offset);
     }
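
A self-contained sketch of the two decoding paths that DefinitionLevelsToBitmap
now distinguishes (an illustration using a byte-per-value validity vector, not
the library's actual bitmap implementation):

#include <cstdint>
#include <stdexcept>
#include <vector>

void LevelsToValidBytes(const std::vector<int16_t>& def_levels,
    int16_t max_def_level, int16_t max_rep_level,
    std::vector<uint8_t>* valid, int64_t* null_count) {
  *null_count = 0;
  for (int16_t def : def_levels) {
    if (def == max_def_level) {
      valid->push_back(1);  // value present at the innermost level
    } else if (max_rep_level > 0) {
      // repeated+flat path: only (max - 1) denotes a null value slot;
      // smaller levels belong to empty/null lists and yield no slot
      if (def == max_def_level - 1) {
        valid->push_back(0);
        ++*null_count;
      }
    } else {
      // non-repeated+nested path: any level below the maximum is a null
      // somewhere up the struct hierarchy and still occupies a slot
      if (def < max_def_level) {
        valid->push_back(0);
        ++*null_count;
      } else {
        throw std::runtime_error("definition level exceeds maximum");
      }
    }
  }
}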

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/29ed01ea/src/parquet/schema-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/schema-test.cc b/src/parquet/schema-test.cc
index f8a7205..58f23df 100644
--- a/src/parquet/schema-test.cc
+++ b/src/parquet/schema-test.cc
@@ -333,6 +333,21 @@ TEST_F(TestGroupNode, Equals) {
   ASSERT_FALSE(group5.Equals(&group4));
 }
 
+TEST_F(TestGroupNode, FieldIndex) {
+  NodeVector fields = Fields1();
+  GroupNode group("group", Repetition::REQUIRED, fields);
+  for (size_t i = 0; i < fields.size(); i++) {
+    auto field = group.field(static_cast<int>(i));
+    ASSERT_EQ(i, group.FieldIndex(*field.get()));
+  }
+
+  // Test non-field nodes
+  auto non_field_alien = Int32("alien", Repetition::REQUIRED);    // different name
+  auto non_field_familiar = Int32("one", Repetition::REPEATED);   // different node
+  ASSERT_TRUE(group.FieldIndex(*non_field_alien.get()) < 0);
+  ASSERT_TRUE(group.FieldIndex(*non_field_familiar.get()) < 0);
+}
+
 // ----------------------------------------------------------------------
 // Test convert group
 
@@ -648,6 +663,17 @@ TEST_F(TestSchemaDescriptor, BuildTree) {
   ASSERT_EQ(descr_.Column(4)->path()->ToDotString(), "bag.records.item2");
   ASSERT_EQ(descr_.Column(5)->path()->ToDotString(), "bag.records.item3");
 
+  for (int i = 0; i < nleaves; ++i) {
+    auto col = descr_.Column(i);
+    ASSERT_EQ(i, descr_.ColumnIndex(*col->schema_node().get()));
+  }
+
+  // Test lookup of non-column nodes
+  NodePtr non_column_alien = Int32("alien", Repetition::REQUIRED);  // path not in schema
+  NodePtr non_column_familiar = Int32("a", Repetition::REPEATED);   // same path, different node
+  ASSERT_TRUE(descr_.ColumnIndex(*non_column_alien.get()) < 0);
+  ASSERT_TRUE(descr_.ColumnIndex(*non_column_familiar.get()) < 0);
+
   ASSERT_EQ(inta.get(), descr_.GetColumnRoot(0).get());
   ASSERT_EQ(bag.get(), descr_.GetColumnRoot(3).get());
   ASSERT_EQ(bag.get(), descr_.GetColumnRoot(4).get());

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/29ed01ea/src/parquet/schema.cc
----------------------------------------------------------------------
diff --git a/src/parquet/schema.cc b/src/parquet/schema.cc
index a3debd0..1209ad1 100644
--- a/src/parquet/schema.cc
+++ b/src/parquet/schema.cc
@@ -44,6 +44,21 @@ std::shared_ptr<ColumnPath> ColumnPath::FromDotString(const std::string& dotstri
   return std::shared_ptr<ColumnPath>(new ColumnPath(std::move(path)));
 }
 
+std::shared_ptr<ColumnPath> ColumnPath::FromNode(const Node& node) {
+  // Build the path in reverse order as we traverse the nodes to the top
+  std::vector<std::string> rpath;
+  const Node* cursor = &node;
+  // The schema node is not part of the ColumnPath
+  while (cursor->parent()) {
+    rpath.push_back(cursor->name());
+    cursor = cursor->parent();
+  }
+
+  // Build ColumnPath in correct order
+  std::vector<std::string> path(rpath.crbegin(), rpath.crend());
+  return std::make_shared<ColumnPath>(std::move(path));
+}
+
 std::shared_ptr<ColumnPath> ColumnPath::extend(const std::string& node_name) const {
   std::vector<std::string> path;
   path.reserve(path_.size() + 1);
@@ -70,6 +85,12 @@ const std::vector<std::string>& ColumnPath::ToDotVector() const {
 // ----------------------------------------------------------------------
 // Base node
 
+const std::shared_ptr<ColumnPath> Node::path() const {
+  // TODO(itaiin): Cache the result, or more precisely, cache ToDotString(),
+  //    since that is what is used to look up the leaf nodes
+  return ColumnPath::FromNode(*this);
+}
+
 bool Node::EqualsInternal(const Node* other) const {
   return type_ == other->type_ && name_ == other->name_ &&
          repetition_ == other->repetition_ && logical_type_ == other->logical_type_;
@@ -229,6 +250,28 @@ bool GroupNode::Equals(const Node* other) const {
   return EqualsInternal(static_cast<const GroupNode*>(other));
 }
 
+int GroupNode::FieldIndex(const std::string& name) const {
+  auto search = field_name_to_idx_.find(name);
+  if (search == field_name_to_idx_.end()) {
+    // Not found
+    return -1;
+  }
+  return search->second;
+}
+
+int GroupNode::FieldIndex(const Node& node) const {
+  int result = FieldIndex(node.name());
+  if (result < 0) {
+    return -1;
+  }
+  DCHECK(result < field_count());
+  if (!node.Equals(field(result).get())) {
+    // Same name but not the same node
+    return -1;
+  }
+  return result;
+}
+
 void GroupNode::Visit(Node::Visitor* visitor) {
   visitor->Visit(this);
 }
@@ -595,6 +638,8 @@ void SchemaDescriptor::BuildTree(const NodePtr& node, int16_t max_def_level,
     // Primitive node, append to leaves
     leaves_.push_back(ColumnDescriptor(node, max_def_level, max_rep_level, this));
     leaf_to_base_.emplace(static_cast<int>(leaves_.size()) - 1, base);
+    leaf_to_idx_.emplace(
+        node->path()->ToDotString(), static_cast<int>(leaves_.size()) - 1);
   }
 }
 
@@ -620,6 +665,28 @@ const ColumnDescriptor* SchemaDescriptor::Column(int i) const {
   return &leaves_[i];
 }
 
+int SchemaDescriptor::ColumnIndex(const std::string& node_path) const {
+  auto search = leaf_to_idx_.find(node_path);
+  if (search == leaf_to_idx_.end()) {
+    // Not found
+    return -1;
+  }
+  return search->second;
+}
+
+int SchemaDescriptor::ColumnIndex(const Node& node) const {
+  int result = ColumnIndex(node.path()->ToDotString());
+  if (result < 0) {
+    return -1;
+  }
+  DCHECK(result < num_columns());
+  if (!node.Equals(Column(result)->schema_node().get())) {
+    // Same path but not the same node
+    return -1;
+  }
+  return result;
+}
+
 const schema::NodePtr& SchemaDescriptor::GetColumnRoot(int i) const {
   DCHECK(i >= 0 && i < static_cast<int>(leaves_.size()));
   return leaf_to_base_.find(i)->second;
@@ -638,18 +705,7 @@ int ColumnDescriptor::type_length() const {
 }
 
 const std::shared_ptr<ColumnPath> ColumnDescriptor::path() const {
-  // Build the path in reverse order as we traverse the nodes to the top
-  std::vector<std::string> rpath_;
-  const Node* node = primitive_node_;
-  // The schema node is not part of the ColumnPath
-  while (node->parent()) {
-    rpath_.push_back(node->name());
-    node = node->parent();
-  }
-
-  // Build ColumnPath in correct order
-  std::vector<std::string> path_(rpath_.crbegin(), rpath_.crend());
-  return std::make_shared<ColumnPath>(std::move(path_));
+  return primitive_node_->path();
 }
 
 }  // namespace parquet
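
A minimal sketch of the new lookup paths (assuming the factory helpers
PrimitiveNode::Make and GroupNode::Make plus SchemaDescriptor::Init already in
parquet/schema.h; the schema here is illustrative):

    #include <cassert>
    #include "parquet/schema.h"

    using parquet::Repetition;
    using parquet::SchemaDescriptor;
    using parquet::Type;
    using parquet::schema::GroupNode;
    using parquet::schema::NodePtr;
    using parquet::schema::PrimitiveNode;

    int main() {
      NodePtr item = PrimitiveNode::Make("item", Repetition::OPTIONAL, Type::INT32);
      NodePtr record = GroupNode::Make("record", Repetition::OPTIONAL, {item});
      SchemaDescriptor descr;
      descr.Init(GroupNode::Make("schema", Repetition::REQUIRED, {record}));

      // Node -> dotstring path, via the new ColumnPath::FromNode / Node::path
      assert(item->path()->ToDotString() == "record.item");
      // Dotstring path or node -> leaf index; negative means not found
      assert(descr.ColumnIndex("record.item") == 0);
      assert(descr.ColumnIndex(*item) == 0);
      assert(descr.ColumnIndex("record.other") < 0);
      return 0;
    }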

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/29ed01ea/src/parquet/schema.h
----------------------------------------------------------------------
diff --git a/src/parquet/schema.h b/src/parquet/schema.h
index 1615798..856f72d 100644
--- a/src/parquet/schema.h
+++ b/src/parquet/schema.h
@@ -38,6 +38,8 @@ class SchemaDescriptor;
 
 namespace schema {
 
+class Node;
+
 // List encodings: using the terminology from Impala to define different styles
 // of representing logical lists (a.k.a. ARRAY types) in Parquet schemas. Since
 // the converted type named in the Parquet metadata is ConvertedType::LIST we
@@ -87,6 +89,7 @@ class PARQUET_EXPORT ColumnPath {
   explicit ColumnPath(std::vector<std::string>&& path) : path_(path) {}
 
   static std::shared_ptr<ColumnPath> FromDotString(const std::string& dotstring);
+  static std::shared_ptr<ColumnPath> FromNode(const Node& node);
 
   std::shared_ptr<ColumnPath> extend(const std::string& node_name) const;
   std::string ToDotString() const;
@@ -139,6 +142,8 @@ class PARQUET_EXPORT Node {
 
   const Node* parent() const { return parent_; }
 
+  const std::shared_ptr<ColumnPath> path() const;
+
   // ToParquet returns an opaque void* to avoid exporting
   // parquet::SchemaElement into the public API
   virtual void ToParquet(void* opaque_element) const = 0;
@@ -249,6 +254,8 @@ class PARQUET_EXPORT GroupNode : public Node {
   bool Equals(const Node* other) const override;
 
   const NodePtr& field(int i) const { return fields_[i]; }
+  int FieldIndex(const std::string& name) const;
+  int FieldIndex(const Node& node) const;
 
   int field_count() const { return static_cast<int>(fields_.size()); }
 
@@ -261,16 +268,23 @@ class PARQUET_EXPORT GroupNode : public Node {
       const NodeVector& fields, LogicalType::type logical_type = LogicalType::NONE,
       int id = -1)
       : Node(Node::GROUP, name, repetition, logical_type, id), fields_(fields) {
+    field_name_to_idx_.clear();
+    auto field_idx = 0;
     for (NodePtr& field : fields_) {
       field->SetParent(this);
+      field_name_to_idx_[field->name()] = field_idx++;
     }
   }
 
   NodeVector fields_;
   bool EqualsInternal(const GroupNode* other) const;
 
+  // Mapping from field name to field index
+  std::unordered_map<std::string, int> field_name_to_idx_;
+
   FRIEND_TEST(TestGroupNode, Attrs);
   FRIEND_TEST(TestGroupNode, Equals);
+  FRIEND_TEST(TestGroupNode, FieldIndex);
 };
 
 // ----------------------------------------------------------------------
@@ -362,6 +376,11 @@ class PARQUET_EXPORT SchemaDescriptor {
 
   const ColumnDescriptor* Column(int i) const;
 
+  // Get the index of a column by its dotstring path, or a negative value if not found
+  int ColumnIndex(const std::string& node_path) const;
+  // Get the index of a column by its node, or a negative value if not found
+  int ColumnIndex(const schema::Node& node) const;
+
   bool Equals(const SchemaDescriptor& other) const;
 
   // The number of physical columns appearing in the file
@@ -398,6 +417,9 @@ class PARQUET_EXPORT SchemaDescriptor {
   // -- -- -- c  |
   // -- -- -- -- d
   std::unordered_map<int, const schema::NodePtr> leaf_to_base_;
+
+  // Mapping from a ColumnPath dotstring to the leaf index
+  std::unordered_map<std::string, int> leaf_to_idx_;
 };
 
 }  // namespace parquet
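
The FieldIndex overloads are deliberately strict: a look-alike node with a
matching name but different repetition or type resolves to a negative index,
as exercised in the schema tests. A small sketch (illustrative schema, not
part of the patch):

    #include "parquet/schema.h"

    using parquet::Repetition;
    using parquet::Type;
    using parquet::schema::GroupNode;
    using parquet::schema::NodePtr;
    using parquet::schema::PrimitiveNode;

    int main() {
      NodePtr one = PrimitiveNode::Make("one", Repetition::REQUIRED, Type::INT32);
      NodePtr two = PrimitiveNode::Make("two", Repetition::OPTIONAL, Type::DOUBLE);
      NodePtr group = GroupNode::Make("group", Repetition::REQUIRED, {one, two});
      auto group_node = static_cast<const GroupNode*>(group.get());

      int by_name = group_node->FieldIndex("one");  // 0
      int by_node = group_node->FieldIndex(*two);   // 1

      // Same name, different repetition: not a field of this group
      NodePtr look_alike =
          PrimitiveNode::Make("one", Repetition::REPEATED, Type::INT32);
      int miss = group_node->FieldIndex(*look_alike);  // negative
      return (by_name == 0 && by_node == 1 && miss < 0) ? 0 : 1;
    }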

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/29ed01ea/src/parquet/util/schema-util.h
----------------------------------------------------------------------
diff --git a/src/parquet/util/schema-util.h b/src/parquet/util/schema-util.h
new file mode 100644
index 0000000..618d21e
--- /dev/null
+++ b/src/parquet/util/schema-util.h
@@ -0,0 +1,84 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef PARQUET_SCHEMA_UTIL_H
+#define PARQUET_SCHEMA_UTIL_H
+
+#include <string>
+#include <vector>
+#include <unordered_set>
+
+#include "parquet/exception.h"
+#include "parquet/schema.h"
+#include "parquet/types.h"
+#include "parquet/util/logging.h"
+
+using parquet::ParquetException;
+using parquet::SchemaDescriptor;
+using parquet::schema::GroupNode;
+using parquet::schema::NodePtr;
+using parquet::schema::Node;
+using parquet::LogicalType;
+
+inline bool str_endswith_tuple(const std::string& str) {
+  if (str.size() >= 6) { return str.substr(str.size() - 6, 6) == "_tuple"; }
+  return false;
+}
+
+// Special case mentioned in the format spec:
+//   If the name is array or ends in _tuple, this should be a list of struct
+//   even for single child elements.
+inline bool HasStructListName(const GroupNode& node) {
+  return (node.name() == "array" ||
+          str_endswith_tuple(node.name()));
+}
+
+// TODO(itaiin): This helper function can be deleted once repeated structs are supported
+inline bool IsSimpleStruct(const NodePtr& node) {
+  if (!node->is_group()) return false;
+  if (node->is_repeated()) return false;
+  if (node->logical_type() == LogicalType::LIST) return false;
+  // Special case mentioned in the format spec:
+  //   If the name is array or ends in _tuple, this should be a list of struct
+  //   even for single child elements.
+  auto group = static_cast<const GroupNode*>(node.get());
+  if (group->field_count() == 1 && HasStructListName(*group)) return false;
+
+  return true;
+}
+
+// Coalesce a list of schema field indices which are the roots of the
+// columns referred to by a list of column indices
+inline bool ColumnIndicesToFieldIndices(const SchemaDescriptor& descr,
+    const std::vector<int>& column_indices, std::vector<int>* out) {
+  const GroupNode* group = descr.group_node();
+  std::unordered_set<int> already_added;
+  out->clear();
+  for (auto& column_idx : column_indices) {
+    auto field_node = descr.GetColumnRoot(column_idx);
+    auto field_idx = group->FieldIndex(field_node->name());
+    if (field_idx < 0) {
+      return false;
+    }
+    auto insertion = already_added.insert(field_idx);
+    if (insertion.second) { out->push_back(field_idx); }
+  }
+
+  return true;
+}
+
+#endif  // PARQUET_SCHEMA_UTIL_H
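
For example, with a file schema  { a: int32, s: { b: int32, c: int32 } }  the
leaves are columns 0..2, and selecting columns {1, 2} coalesces to the single
top-level field index 1. A sketch of that call (illustrative schema, built
with the factory helpers from parquet/schema.h):

    #include <vector>
    #include "parquet/schema.h"
    #include "parquet/util/schema-util.h"

    using parquet::Repetition;
    using parquet::SchemaDescriptor;
    using parquet::Type;
    using parquet::schema::GroupNode;
    using parquet::schema::PrimitiveNode;

    int main() {
      auto a = PrimitiveNode::Make("a", Repetition::OPTIONAL, Type::INT32);
      auto b = PrimitiveNode::Make("b", Repetition::OPTIONAL, Type::INT32);
      auto c = PrimitiveNode::Make("c", Repetition::OPTIONAL, Type::INT32);
      auto s = GroupNode::Make("s", Repetition::OPTIONAL, {b, c});

      SchemaDescriptor descr;
      descr.Init(GroupNode::Make("schema", Repetition::REQUIRED, {a, s}));

      std::vector<int> field_indices;
      // Columns 1 ("s.b") and 2 ("s.c") share the column root "s" (field 1),
      // so only one field index is emitted.
      bool ok = ColumnIndicesToFieldIndices(descr, {1, 2}, &field_indices);
      return (ok && field_indices.size() == 1 && field_indices[0] == 1) ? 0 : 1;
    }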