Posted to commits@parquet.apache.org by we...@apache.org on 2017/04/10 18:25:42 UTC

parquet-cpp git commit: PARQUET-918: FromParquetSchema API crashes on nested schemas

Repository: parquet-cpp
Updated Branches:
  refs/heads/master 6b22b4685 -> b5e8cc430


PARQUET-918: FromParquetSchema API crashes on nested schemas

This is #275 with an Arrow API fix. Passing build: https://travis-ci.org/wesm/parquet-cpp/builds/220597810

Closes #275
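
For orientation (not part of the commit), a minimal sketch of the two FromParquetSchema entry points the new tests exercise. It assumes "descr" is a parquet::SchemaDescriptor already initialized with a nested schema; status handling is reduced to a simple assignment.

  #include <memory>
  #include <vector>
  #include "arrow/api.h"
  #include "parquet/api/schema.h"
  #include "parquet/arrow/schema.h"

  void ConvertNestedSchema(const parquet::SchemaDescriptor& descr) {
    // Full conversion: nested Parquet groups become Arrow struct fields.
    std::shared_ptr<::arrow::Schema> full_schema;
    ::arrow::Status st = parquet::arrow::FromParquetSchema(&descr, &full_schema);

    // Partial conversion: only the listed leaf columns are kept, and their
    // enclosing groups are pruned down to the selected children.
    std::shared_ptr<::arrow::Schema> partial_schema;
    st = parquet::arrow::FromParquetSchema(&descr, {0, 2}, &partial_schema);
  }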

Author: Itai Incze <it...@gmail.com>
Author: Wes McKinney <we...@twosigma.com>

Closes #295 from wesm/PARQUET-918 and squashes the following commits:

02f55fd [Wes McKinney] Fix Arrow APIs
a259750 [Itai Incze] Fixed: repeated group schema conversion bug
5fe3a01 [Itai Incze] fixed typos
1f7dec2 [Itai Incze] changed ReadTable tests to use API-fabricated parquet
69cc7a6 [Itai Incze] Improved FromParquetSchema tests and naming
34236b7 [Itai Incze] linting and readability
5ee1f44 [Itai Incze] Fix for [PARQUET-918]


Project: http://git-wip-us.apache.org/repos/asf/parquet-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-cpp/commit/b5e8cc43
Tree: http://git-wip-us.apache.org/repos/asf/parquet-cpp/tree/b5e8cc43
Diff: http://git-wip-us.apache.org/repos/asf/parquet-cpp/diff/b5e8cc43

Branch: refs/heads/master
Commit: b5e8cc4308fbf5565ce318a707d0c442f939a960
Parents: 6b22b46
Author: Itai Incze <it...@gmail.com>
Authored: Mon Apr 10 14:25:34 2017 -0400
Committer: Wes McKinney <we...@twosigma.com>
Committed: Mon Apr 10 14:25:34 2017 -0400

----------------------------------------------------------------------
 src/parquet/arrow/arrow-reader-writer-test.cc | 105 ++++++++++++++++
 src/parquet/arrow/arrow-schema-test.cc        | 135 +++++++++++++++++++++
 src/parquet/arrow/reader.cc                   |  10 +-
 src/parquet/arrow/schema.cc                   | 133 +++++++++++++++-----
 4 files changed, 346 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/b5e8cc43/src/parquet/arrow/arrow-reader-writer-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/arrow-reader-writer-test.cc b/src/parquet/arrow/arrow-reader-writer-test.cc
index dd46893..2f8f421 100644
--- a/src/parquet/arrow/arrow-reader-writer-test.cc
+++ b/src/parquet/arrow/arrow-reader-writer-test.cc
@@ -25,6 +25,9 @@
 #include "parquet/arrow/reader.h"
 #include "parquet/arrow/test-util.h"
 #include "parquet/arrow/writer.h"
+#include "parquet/arrow/schema.h"
+
+#include "parquet/file/writer.h"
 
 #include "arrow/api.h"
 #include "arrow/test-util.h"
@@ -45,6 +48,7 @@ using ParquetType = parquet::Type;
 using parquet::schema::GroupNode;
 using parquet::schema::NodePtr;
 using parquet::schema::PrimitiveNode;
+using parquet::arrow::FromParquetSchema;
 
 namespace parquet {
 namespace arrow {
@@ -875,5 +879,106 @@ TEST(TestArrowReadWrite, ReadColumnSubset) {
   ASSERT_TRUE(result->Equals(expected));
 }
 
+class TestNestedSchemaRead : public ::testing::Test {
+ protected:
+  virtual void SetUp() {
+    // Use the low-level Parquet file API to create the nested test file
+    CreateNestedParquet();
+    InitReader(&reader_);
+  }
+
+  void InitReader(std::shared_ptr<FileReader>* out) {
+    std::shared_ptr<Buffer> buffer = nested_parquet_->GetBuffer();
+    std::unique_ptr<FileReader> reader;
+    ASSERT_OK_NO_THROW(OpenFile(
+      std::make_shared<BufferReader>(buffer), ::arrow::default_memory_pool(),
+      ::parquet::default_reader_properties(), nullptr, &reader));
+
+    *out = std::move(reader);
+  }
+
+  void InitNewParquetFile(const std::shared_ptr<GroupNode>& schema, int num_rows) {
+    nested_parquet_ = std::make_shared<InMemoryOutputStream>();
+    writer_ = parquet::ParquetFileWriter::Open(nested_parquet_,
+       schema, default_writer_properties());
+    row_group_writer_ = writer_->AppendRowGroup(num_rows);
+  }
+
+  void FinalizeParquetFile() {
+    row_group_writer_->Close();
+    writer_->Close();
+  }
+
+  void CreateNestedParquet() {
+    std::vector<NodePtr> parquet_fields;
+    std::shared_ptr<Array> values;
+
+    // create the schema:
+    // required group group1 {
+    //   required int32 leaf1;
+    //   required int32 leaf2;
+    // }
+    // required int32 leaf3;
+
+    parquet_fields.push_back(
+        GroupNode::Make("group1", Repetition::REQUIRED, {
+          PrimitiveNode::Make(
+            "leaf1", Repetition::REQUIRED, ParquetType::INT32),
+          PrimitiveNode::Make(
+            "leaf2", Repetition::REQUIRED, ParquetType::INT32)}));
+    parquet_fields.push_back(PrimitiveNode::Make(
+        "leaf3", Repetition::REQUIRED, ParquetType::INT32));
+
+    const int num_columns = 3;
+    auto schema_node = GroupNode::Make("schema", Repetition::REQUIRED, parquet_fields);
+
+    InitNewParquetFile(std::static_pointer_cast<GroupNode>(schema_node), 0);
+
+    for (int i = 0; i < num_columns; i++) {
+      auto column_writer = row_group_writer_->NextColumn();
+      auto typed_writer = reinterpret_cast<TypedColumnWriter<Int32Type>*>(column_writer);
+      typed_writer->WriteBatch(0, nullptr, nullptr, nullptr);
+    }
+
+    FinalizeParquetFile();
+  }
+
+  std::shared_ptr<InMemoryOutputStream> nested_parquet_;
+  std::shared_ptr<FileReader> reader_;
+  std::unique_ptr<ParquetFileWriter> writer_;
+  RowGroupWriter* row_group_writer_;
+};
+
+TEST_F(TestNestedSchemaRead, ReadIntoTableFull) {
+  std::shared_ptr<Table> table;
+  ASSERT_OK_NO_THROW(reader_->ReadTable(&table));
+  ASSERT_EQ(table->num_rows(), 0);
+  ASSERT_EQ(table->num_columns(), 2);
+  ASSERT_EQ(table->schema()->field(0)->type()->num_children(), 2);
+}
+
+TEST_F(TestNestedSchemaRead, ReadTablePartial) {
+  std::shared_ptr<Table> table;
+
+  // columns: {group1.leaf1, leaf3}
+  ASSERT_OK_NO_THROW(reader_->ReadTable({0, 2}, &table));
+  ASSERT_EQ(table->num_rows(), 0);
+  ASSERT_EQ(table->num_columns(), 2);
+  ASSERT_EQ(table->schema()->field(0)->type()->num_children(), 1);
+
+  // columns: {group1.leaf1, group1.leaf2}
+  ASSERT_OK_NO_THROW(reader_->ReadTable({0, 1}, &table));
+  ASSERT_EQ(table->num_rows(), 0);
+  ASSERT_EQ(table->num_columns(), 1);
+  ASSERT_EQ(table->schema()->field(0)->type()->num_children(), 2);
+
+  // columns: {leaf3}
+  ASSERT_OK_NO_THROW(reader_->ReadTable({2}, &table));
+  ASSERT_EQ(table->num_rows(), 0);
+  ASSERT_EQ(table->num_columns(), 1);
+  ASSERT_EQ(table->schema()->field(0)->type()->num_children(), 0);
+}
+
 }  // namespace arrow
+
 }  // namespace parquet

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/b5e8cc43/src/parquet/arrow/arrow-schema-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/arrow-schema-test.cc b/src/parquet/arrow/arrow-schema-test.cc
index 83100d3..96de92e 100644
--- a/src/parquet/arrow/arrow-schema-test.cc
+++ b/src/parquet/arrow/arrow-schema-test.cc
@@ -73,6 +73,13 @@ class TestConvertParquetSchema : public ::testing::Test {
     return FromParquetSchema(&descr_, &result_schema_);
   }
 
+  ::arrow::Status ConvertSchema(const std::vector<NodePtr>& nodes,
+        const std::vector<int>& column_indices) {
+    NodePtr schema = GroupNode::Make("schema", Repetition::REPEATED, nodes);
+    descr_.Init(schema);
+    return FromParquetSchema(&descr_, column_indices, &result_schema_);
+  }
+
  protected:
   SchemaDescriptor descr_;
   std::shared_ptr<::arrow::Schema> result_schema_;
@@ -348,6 +355,134 @@ TEST_F(TestConvertParquetSchema, UnsupportedThings) {
   }
 }
 
+TEST_F(TestConvertParquetSchema, ParquetNestedSchema) {
+  std::vector<NodePtr> parquet_fields;
+  std::vector<std::shared_ptr<Field>> arrow_fields;
+
+  // required group group1 {
+  //   required bool leaf1;
+  //   required int32 leaf2;
+  // }
+  // required int64 leaf3;
+  {
+    parquet_fields.push_back(
+        GroupNode::Make("group1", Repetition::REQUIRED, {
+          PrimitiveNode::Make(
+            "leaf1", Repetition::REQUIRED, ParquetType::BOOLEAN),
+          PrimitiveNode::Make(
+            "leaf2", Repetition::REQUIRED, ParquetType::INT32)}));
+    parquet_fields.push_back(PrimitiveNode::Make(
+        "leaf3", Repetition::REQUIRED, ParquetType::INT64));
+
+    auto group1_fields = {
+      std::make_shared<Field>("leaf1", BOOL, false),
+      std::make_shared<Field>("leaf2", INT32, false)};
+    auto arrow_group1_type = std::make_shared<::arrow::StructType>(group1_fields);
+    arrow_fields.push_back(std::make_shared<Field>("group1", arrow_group1_type, false));
+    arrow_fields.push_back(std::make_shared<Field>("leaf3", INT64, false));
+  }
+
+  auto arrow_schema = std::make_shared<::arrow::Schema>(arrow_fields);
+  ASSERT_OK(ConvertSchema(parquet_fields));
+
+  CheckFlatSchema(arrow_schema);
+}
+
+TEST_F(TestConvertParquetSchema, ParquetNestedSchemaPartial) {
+  std::vector<NodePtr> parquet_fields;
+  std::vector<std::shared_ptr<Field>> arrow_fields;
+
+  // Full Parquet Schema:
+  // required group group1 {
+  //   required int64 leaf1;
+  //   required int64 leaf2;
+  // }
+  // required group group2 {
+  //   required int64 leaf3;
+  //   required int64 leaf4;
+  // }
+  // required int64 leaf5;
+  //
+  // Expected partial arrow schema (columns 0, 3, 4):
+  // required group group1 {
+  //   required int64 leaf1;
+  // }
+  // required group group2 {
+  //   required int64 leaf4;
+  // }
+  // required int64 leaf5;
+  {
+    parquet_fields.push_back(
+        GroupNode::Make("group1", Repetition::REQUIRED, {
+          PrimitiveNode::Make(
+            "leaf1", Repetition::REQUIRED, ParquetType::INT64),
+          PrimitiveNode::Make(
+            "leaf2", Repetition::REQUIRED, ParquetType::INT64)}));
+    parquet_fields.push_back(
+        GroupNode::Make("group2", Repetition::REQUIRED, {
+          PrimitiveNode::Make(
+            "leaf3", Repetition::REQUIRED, ParquetType::INT64),
+          PrimitiveNode::Make(
+            "leaf4", Repetition::REQUIRED, ParquetType::INT64)}));
+    parquet_fields.push_back(PrimitiveNode::Make(
+        "leaf5", Repetition::REQUIRED, ParquetType::INT64));
+
+    auto group1_fields = {std::make_shared<Field>("leaf1", INT64, false)};
+    auto arrow_group1_type = std::make_shared<::arrow::StructType>(group1_fields);
+    auto group2_fields = {std::make_shared<Field>("leaf4", INT64, false)};
+    auto arrow_group2_type = std::make_shared<::arrow::StructType>(group2_fields);
+
+    arrow_fields.push_back(std::make_shared<Field>("group1", arrow_group1_type, false));
+    arrow_fields.push_back(std::make_shared<Field>("group2", arrow_group2_type, false));
+    arrow_fields.push_back(std::make_shared<Field>("leaf5", INT64, false));
+  }
+
+  auto arrow_schema = std::make_shared<::arrow::Schema>(arrow_fields);
+  ASSERT_OK(ConvertSchema(parquet_fields, {0, 3, 4}));
+
+  CheckFlatSchema(arrow_schema);
+}
+
+TEST_F(TestConvertParquetSchema, ParquetRepeatedNestedSchema) {
+  std::vector<NodePtr> parquet_fields;
+  std::vector<std::shared_ptr<Field>> arrow_fields;
+  {
+    //   optional int32 leaf1;
+    //   repeated group outerGroup {
+    //     optional int32 leaf2;
+    //     repeated group innerGroup {
+    //       optional int32 leaf3;
+    //     }
+    //   }
+    parquet_fields.push_back(
+        PrimitiveNode::Make("leaf1", Repetition::OPTIONAL, ParquetType::INT32));
+    parquet_fields.push_back(
+      GroupNode::Make("outerGroup", Repetition::REPEATED, {
+        PrimitiveNode::Make(
+          "leaf2", Repetition::OPTIONAL, ParquetType::INT32),
+        GroupNode::Make("innerGroup", Repetition::REPEATED, {
+          PrimitiveNode::Make(
+            "leaf3", Repetition::OPTIONAL, ParquetType::INT32)})}));
+
+    auto inner_group_fields = {std::make_shared<Field>("leaf3", INT32, true)};
+    auto inner_group_type = std::make_shared<::arrow::StructType>(inner_group_fields);
+    auto outer_group_fields = {
+        std::make_shared<Field>("leaf2", INT32, true),
+        std::make_shared<Field>("innerGroup", ::arrow::list(
+            std::make_shared<Field>("innerGroup", inner_group_type, false)), false)};
+    auto outer_group_type = std::make_shared<::arrow::StructType>(outer_group_fields);
+
+    arrow_fields.push_back(std::make_shared<Field>("leaf1", INT32, true));
+    arrow_fields.push_back(
+        std::make_shared<Field>("outerGroup", ::arrow::list(
+            std::make_shared<Field>("outerGroup", outer_group_type, false)), false));
+  }
+  auto arrow_schema = std::make_shared<::arrow::Schema>(arrow_fields);
+  ASSERT_OK(ConvertSchema(parquet_fields));
+
+  CheckFlatSchema(arrow_schema);
+}
+
 class TestConvertArrowSchema : public ::testing::Test {
  public:
   virtual void SetUp() {}

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/b5e8cc43/src/parquet/arrow/reader.cc
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/reader.cc b/src/parquet/arrow/reader.cc
index 2ca9207..38d5583 100644
--- a/src/parquet/arrow/reader.cc
+++ b/src/parquet/arrow/reader.cc
@@ -347,9 +347,9 @@ Status FileReader::Impl::ReadTable(
   std::shared_ptr<::arrow::Schema> schema;
   RETURN_NOT_OK(GetSchema(indices, &schema));
 
-  int num_columns = static_cast<int>(indices.size());
-  int nthreads = std::min<int>(num_threads_, num_columns);
-  std::vector<std::shared_ptr<Column>> columns(num_columns);
+  int num_fields = static_cast<int>(schema->num_fields());
+  int nthreads = std::min<int>(num_threads_, num_fields);
+  std::vector<std::shared_ptr<Column>> columns(num_fields);
 
   auto ReadColumnFunc = [&indices, &schema, &columns, this](int i) {
     std::shared_ptr<Array> array;
@@ -359,11 +359,11 @@ Status FileReader::Impl::ReadTable(
   };
 
   if (nthreads == 1) {
-    for (int i = 0; i < num_columns; i++) {
+    for (int i = 0; i < num_fields; i++) {
       RETURN_NOT_OK(ReadColumnFunc(i));
     }
   } else {
-    RETURN_NOT_OK(ParallelFor(nthreads, num_columns, ReadColumnFunc));
+    RETURN_NOT_OK(ParallelFor(nthreads, num_fields, ReadColumnFunc));
   }
 
   *table = std::make_shared<Table>(schema, columns);
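
The reader.cc change above sizes the output column vector by the number of fields in the converted Arrow schema rather than by the number of requested leaf indices, because with nested schemas several selected leaves can collapse into a single top-level struct field. A hedged illustration using the schema from the new test ("reader" is assumed to be an open parquet::arrow::FileReader; status checks are elided):

  // Schema of the test file:
  //   required group group1 { required int32 leaf1; required int32 leaf2; }
  //   required int32 leaf3;
  std::shared_ptr<::arrow::Table> table;
  ::arrow::Status st = reader->ReadTable({0, 1}, &table);  // two leaf indices
  // Both selected leaves live under group1, so the result has one struct column:
  //   table->num_columns() == 1
  //   table->schema()->field(0)->type()->num_children() == 2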

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/b5e8cc43/src/parquet/arrow/schema.cc
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/schema.cc b/src/parquet/arrow/schema.cc
index 76b7f77..e589581 100644
--- a/src/parquet/arrow/schema.cc
+++ b/src/parquet/arrow/schema.cc
@@ -19,6 +19,7 @@
 
 #include <string>
 #include <vector>
+#include <unordered_set>
 
 #include "parquet/api/schema.h"
 
@@ -183,12 +184,39 @@ Status FromPrimitive(const PrimitiveNode* primitive, TypePtr* out) {
   return Status::OK();
 }
 
-Status StructFromGroup(const GroupNode* group, TypePtr* out) {
-  std::vector<std::shared_ptr<Field>> fields(group->field_count());
+// Forward declaration
+Status NodeToFieldInternal(const NodePtr& node,
+    const std::unordered_set<NodePtr>* included_leaf_nodes, std::shared_ptr<Field>* out);
+
+/*
+ * Auxiliary function to test if a parquet schema node is a leaf node
+ * that should be included in a resulting arrow schema
+ */
+inline bool IsIncludedLeaf(const NodePtr& node,
+    const std::unordered_set<NodePtr>* included_leaf_nodes) {
+  if (included_leaf_nodes == nullptr) {
+    return true;
+  }
+  auto search = included_leaf_nodes->find(node);
+  return (search != included_leaf_nodes->end());
+}
+
+Status StructFromGroup(const GroupNode* group,
+    const std::unordered_set<NodePtr>* included_leaf_nodes, TypePtr* out) {
+  std::vector<std::shared_ptr<Field>> fields;
+  std::shared_ptr<Field> field;
+
+  *out = nullptr;
+
   for (int i = 0; i < group->field_count(); i++) {
-    RETURN_NOT_OK(NodeToField(group->field(i), &fields[i]));
+    RETURN_NOT_OK(NodeToFieldInternal(group->field(i), included_leaf_nodes, &field));
+    if (field != nullptr) {
+      fields.push_back(field);
+    }
+  }
+  if (fields.size() > 0) {
+    *out = std::make_shared<::arrow::StructType>(fields);
   }
-  *out = std::make_shared<::arrow::StructType>(fields);
   return Status::OK();
 }
 
@@ -197,7 +225,9 @@ bool str_endswith_tuple(const std::string& str) {
   return false;
 }
 
-Status NodeToList(const GroupNode* group, TypePtr* out) {
+Status NodeToList(const GroupNode* group,
+    const std::unordered_set<NodePtr>* included_leaf_nodes, TypePtr* out) {
+  *out = nullptr;
   if (group->field_count() == 1) {
     // This attempts to resolve the preferred 3-level list encoding.
     NodePtr list_node = group->field(0);
@@ -210,22 +240,31 @@ Status NodeToList(const GroupNode* group, TypePtr* out) {
           !str_endswith_tuple(list_node->name())) {
         // List of primitive type
         std::shared_ptr<Field> item_field;
-        RETURN_NOT_OK(NodeToField(list_group->field(0), &item_field));
-        *out = ::arrow::list(item_field);
+        RETURN_NOT_OK(NodeToFieldInternal(
+          list_group->field(0), included_leaf_nodes, &item_field));
+
+        if (item_field != nullptr) {
+          *out = ::arrow::list(item_field);
+        }
       } else {
         // List of struct
         std::shared_ptr<::arrow::DataType> inner_type;
-        RETURN_NOT_OK(StructFromGroup(list_group, &inner_type));
-        auto item_field = std::make_shared<Field>(list_node->name(), inner_type, false);
-        *out = ::arrow::list(item_field);
+        RETURN_NOT_OK(StructFromGroup(list_group, included_leaf_nodes, &inner_type));
+        if (inner_type != nullptr) {
+          auto item_field = std::make_shared<Field>(list_node->name(), inner_type, false);
+          *out = ::arrow::list(item_field);
+        }
       }
     } else if (list_node->is_repeated()) {
       // repeated primitive node
       std::shared_ptr<::arrow::DataType> inner_type;
-      const PrimitiveNode* primitive = static_cast<const PrimitiveNode*>(list_node.get());
-      RETURN_NOT_OK(FromPrimitive(primitive, &inner_type));
-      auto item_field = std::make_shared<Field>(list_node->name(), inner_type, false);
-      *out = ::arrow::list(item_field);
+      if (IsIncludedLeaf(static_cast<NodePtr>(list_node), included_leaf_nodes)) {
+        const PrimitiveNode* primitive =
+          static_cast<const PrimitiveNode*>(list_node.get());
+        RETURN_NOT_OK(FromPrimitive(primitive, &inner_type));
+        auto item_field = std::make_shared<Field>(list_node->name(), inner_type, false);
+        *out = ::arrow::list(item_field);
+      }
     } else {
       return Status::NotImplemented(
           "Non-repeated groups in a LIST-annotated group are not supported.");
@@ -238,31 +277,49 @@ Status NodeToList(const GroupNode* group, TypePtr* out) {
 }
 
 Status NodeToField(const NodePtr& node, std::shared_ptr<Field>* out) {
-  std::shared_ptr<::arrow::DataType> type;
+  return NodeToFieldInternal(node, nullptr, out);
+}
+
+Status NodeToFieldInternal(const NodePtr& node,
+    const std::unordered_set<NodePtr>* included_leaf_nodes, std::shared_ptr<Field>* out) {
+
+  std::shared_ptr<::arrow::DataType> type = nullptr;
   bool nullable = !node->is_required();
 
+  *out = nullptr;
+
   if (node->is_repeated()) {
     // 1-level LIST encoding fields are required
     std::shared_ptr<::arrow::DataType> inner_type;
-    const PrimitiveNode* primitive = static_cast<const PrimitiveNode*>(node.get());
-    RETURN_NOT_OK(FromPrimitive(primitive, &inner_type));
-    auto item_field = std::make_shared<Field>(node->name(), inner_type, false);
-    type = ::arrow::list(item_field);
-    nullable = false;
+    if (node->is_group()) {
+      const GroupNode* group = static_cast<const GroupNode*>(node.get());
+      RETURN_NOT_OK(StructFromGroup(group, included_leaf_nodes, &inner_type));
+    } else if (IsIncludedLeaf(static_cast<NodePtr>(node), included_leaf_nodes)) {
+      const PrimitiveNode* primitive = static_cast<const PrimitiveNode*>(node.get());
+      RETURN_NOT_OK(FromPrimitive(primitive, &inner_type));
+    }
+    if (inner_type != nullptr) {
+      auto item_field = std::make_shared<Field>(node->name(), inner_type, false);
+      type = ::arrow::list(item_field);
+      nullable = false;
+    }
   } else if (node->is_group()) {
     const GroupNode* group = static_cast<const GroupNode*>(node.get());
     if (node->logical_type() == LogicalType::LIST) {
-      RETURN_NOT_OK(NodeToList(group, &type));
+      RETURN_NOT_OK(NodeToList(group, included_leaf_nodes, &type));
     } else {
-      RETURN_NOT_OK(StructFromGroup(group, &type));
+      RETURN_NOT_OK(StructFromGroup(group, included_leaf_nodes, &type));
     }
   } else {
     // Primitive (leaf) node
-    const PrimitiveNode* primitive = static_cast<const PrimitiveNode*>(node.get());
-    RETURN_NOT_OK(FromPrimitive(primitive, &type));
+    if (IsIncludedLeaf(static_cast<NodePtr>(node), included_leaf_nodes)) {
+      const PrimitiveNode* primitive = static_cast<const PrimitiveNode*>(node.get());
+      RETURN_NOT_OK(FromPrimitive(primitive, &type));
+    }
+  }
+  if (type != nullptr) {
+    *out = std::make_shared<Field>(node->name(), type, nullable);
   }
-
-  *out = std::make_shared<Field>(node->name(), type, nullable);
   return Status::OK();
 }
 
@@ -270,8 +327,9 @@ Status FromParquetSchema(
     const SchemaDescriptor* parquet_schema, std::shared_ptr<::arrow::Schema>* out) {
   const GroupNode* schema_node = parquet_schema->group_node();
 
-  std::vector<std::shared_ptr<Field>> fields(schema_node->field_count());
-  for (int i = 0; i < schema_node->field_count(); i++) {
+  int num_fields = static_cast<int>(schema_node->field_count());
+  std::vector<std::shared_ptr<Field>> fields(num_fields);
+  for (int i = 0; i < num_fields; i++) {
     RETURN_NOT_OK(NodeToField(schema_node->field(i), &fields[i]));
   }
 
@@ -285,11 +343,22 @@ Status FromParquetSchema(const SchemaDescriptor* parquet_schema,
   // from the root Parquet node
   const GroupNode* schema_node = parquet_schema->group_node();
 
-  int num_fields = static_cast<int>(column_indices.size());
+  // Put the right leaf nodes in an unordered set
+  int num_columns = static_cast<int>(column_indices.size());
+  std::unordered_set<NodePtr> included_leaf_nodes(num_columns);
+  for (int i = 0; i < num_columns; i++) {
+    auto column_desc = parquet_schema->Column(column_indices[i]);
+    included_leaf_nodes.insert(column_desc->schema_node());
+  }
 
-  std::vector<std::shared_ptr<Field>> fields(num_fields);
-  for (int i = 0; i < num_fields; i++) {
-    RETURN_NOT_OK(NodeToField(schema_node->field(column_indices[i]), &fields[i]));
+  std::vector<std::shared_ptr<Field>> fields;
+  std::shared_ptr<Field> field;
+  for (int i = 0; i < schema_node->field_count(); i++) {
+    RETURN_NOT_OK(NodeToFieldInternal(
+      schema_node->field(i), &included_leaf_nodes, &field));
+    if (field != nullptr) {
+      fields.push_back(field);
+    }
   }
 
   *out = std::make_shared<::arrow::Schema>(fields);
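
To summarize the new partial-conversion path in schema.cc: the requested leaf columns are first collected into an unordered set of schema nodes, and the full root group is then walked with NodeToFieldInternal, which yields a null field for any subtree containing none of the selected leaves. Below is a stand-alone sketch of the collection step only; the helper name is hypothetical, and the real logic is inlined in FromParquetSchema above.

  #include <memory>
  #include <unordered_set>
  #include <vector>
  #include "parquet/api/schema.h"

  using parquet::schema::NodePtr;

  std::unordered_set<NodePtr> CollectIncludedLeafNodes(
      const parquet::SchemaDescriptor& descr, const std::vector<int>& column_indices) {
    // NodePtr is a shared_ptr, so the set hashes and compares node identity
    // (the pointer), not node contents.
    std::unordered_set<NodePtr> leaves(column_indices.size());
    for (int idx : column_indices) {
      leaves.insert(descr.Column(idx)->schema_node());
    }
    return leaves;
  }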