You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2023/06/24 00:04:20 UTC

[arrow] branch main updated: GH-35579: [C++] Support non-named FieldRefs in Parquet scanner (#35798)

This is an automated email from the ASF dual-hosted git repository.

westonpace pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new 10eedbe63c GH-35579: [C++] Support non-named FieldRefs in Parquet scanner (#35798)
10eedbe63c is described below

commit 10eedbe63c71f4cf8f0621f3a2304ab3168a2ae5
Author: Ben Harkins <60...@users.noreply.github.com>
AuthorDate: Fri Jun 23 20:04:14 2023 -0400

    GH-35579: [C++] Support non-named FieldRefs in Parquet scanner (#35798)
    
    
    
    ### Rationale for this change
    
    When setting projections/filters for the file system scanner, the Parquet implementation requires that all materialized `FieldRef`s be position-independent (containing only names). However, it may be useful to support index-based field lookups as well - assuming the dataset schema is known.
    
    ### What changes are included in this PR?
    
    Adds a translation step for field refs prior to looking them up in the fragment schema. A known dataset schema is required to do this reliably, however (since the fragment schema may be a sub/superset of the dataset schema) - so in the absence of one, we fall back to the existing behavior.
    
    ### Are these changes tested?
    
    Yes (tests are included)
    
    ### Are there any user-facing changes?
    
    Yes
    
    * Closes: #35579
    
    Authored-by: benibus <bp...@gmx.com>
    Signed-off-by: Weston Pace <we...@gmail.com>
---
 cpp/src/arrow/dataset/file_parquet.cc      | 30 ++++++++++++++++++++++++-
 cpp/src/arrow/dataset/file_parquet_test.cc | 36 ++++++++++++++++++++++++++++++
 cpp/src/arrow/dataset/test_util_internal.h | 12 ++++++----
 cpp/src/arrow/type.h                       | 14 ++++++++++++
 4 files changed, 87 insertions(+), 5 deletions(-)

diff --git a/cpp/src/arrow/dataset/file_parquet.cc b/cpp/src/arrow/dataset/file_parquet.cc
index 3a9931cf2a..c30441d911 100644
--- a/cpp/src/arrow/dataset/file_parquet.cc
+++ b/cpp/src/arrow/dataset/file_parquet.cc
@@ -224,6 +224,28 @@ Status ResolveOneFieldRef(
   return Status::OK();
 }
 
+// Converts a field ref into a position-independent ref (containing only a sequence of
+// names) based on the dataset schema. Returns `false` if no conversion was needed.
+Result<FieldRef> MaybeConvertFieldRef(FieldRef ref, const Schema& dataset_schema) {
+  if (ARROW_PREDICT_TRUE(ref.IsNameSequence())) {
+    return std::move(ref);
+  }
+
+  ARROW_ASSIGN_OR_RAISE(auto path, ref.FindOne(dataset_schema));
+  std::vector<FieldRef> named_refs;
+  named_refs.reserve(path.indices().size());
+
+  const FieldVector* child_fields = &dataset_schema.fields();
+  for (auto index : path) {
+    const auto& child_field = *(*child_fields)[index];
+    named_refs.emplace_back(child_field.name());
+    child_fields = &child_field.type()->fields();
+  }
+
+  return named_refs.size() == 1 ? std::move(named_refs[0])
+                                : FieldRef(std::move(named_refs));
+}
+
 // Compute the column projection based on the scan options
 Result<std::vector<int>> InferColumnProjection(const parquet::arrow::FileReader& reader,
                                                const ScanOptions& options) {
@@ -248,7 +270,13 @@ Result<std::vector<int>> InferColumnProjection(const parquet::arrow::FileReader&
   }
 
   std::vector<int> columns_selection;
-  for (const auto& ref : field_refs) {
+  for (auto& ref : field_refs) {
+    // In the (unlikely) absence of a known dataset schema, we require that all
+    // materialized refs are named.
+    if (options.dataset_schema) {
+      ARROW_ASSIGN_OR_RAISE(
+          ref, MaybeConvertFieldRef(std::move(ref), *options.dataset_schema));
+    }
     RETURN_NOT_OK(ResolveOneFieldRef(manifest, ref, field_lookup, duplicate_fields,
                                      &columns_selection));
   }
diff --git a/cpp/src/arrow/dataset/file_parquet_test.cc b/cpp/src/arrow/dataset/file_parquet_test.cc
index 570e028695..42f923f0e6 100644
--- a/cpp/src/arrow/dataset/file_parquet_test.cc
+++ b/cpp/src/arrow/dataset/file_parquet_test.cc
@@ -678,6 +678,42 @@ TEST_P(TestParquetFileFormatScan, PredicatePushdownRowGroupFragmentsUsingStringC
   CountRowGroupsInFragment(fragment, {0, 3}, equal(field_ref("x"), literal("a")));
 }
 
+// Tests projection with nested/indexed FieldRefs.
+// https://github.com/apache/arrow/issues/35579
+TEST_P(TestParquetFileFormatScan, ProjectWithNonNamedFieldRefs) {
+  auto table_schema = schema(
+      {field("info", struct_({field("name", utf8()),
+                              field("data", struct_({field("amount", float64()),
+                                                     field("percent", float32())}))}))});
+  auto table = TableFromJSON(table_schema, {R"([
+    {"info": {"name": "a", "data": {"amount": 10.3, "percent": 0.1}}},
+    {"info": {"name": "b", "data": {"amount": 11.6, "percent": 0.2}}},
+    {"info": {"name": "c", "data": {"amount": 12.9, "percent": 0.3}}},
+    {"info": {"name": "d", "data": {"amount": 14.2, "percent": 0.4}}},
+    {"info": {"name": "e", "data": {"amount": 15.5, "percent": 0.5}}},
+    {"info": {"name": "f", "data": {"amount": 16.8, "percent": 0.6}}}])"});
+  ASSERT_OK_AND_ASSIGN(auto expected_batch, table->CombineChunksToBatch());
+
+  TableBatchReader reader(*table);
+  SetSchema(reader.schema()->fields());
+
+  auto source = GetFileSource(&reader);
+  ASSERT_OK_AND_ASSIGN(auto fragment, format_->MakeFragment(*source));
+
+  std::vector<FieldRef> equivalent_refs = {
+      FieldRef("info", "data", "percent"), FieldRef("info", 1, 1),
+      FieldRef(0, 1, "percent"),           FieldRef(0, 1, 1),
+      FieldRef(0, FieldRef("data", 1)),    FieldRef(FieldRef(0), FieldRef(1, 1)),
+  };
+  for (const auto& ref : equivalent_refs) {
+    ARROW_SCOPED_TRACE("ref = ", ref.ToString());
+
+    Project({field_ref(ref)}, {"value"});
+    auto batch = SingleBatch(fragment);
+    AssertBatchesEqual(*expected_batch, *batch);
+  }
+}
+
 INSTANTIATE_TEST_SUITE_P(TestScan, TestParquetFileFormatScan,
                          ::testing::ValuesIn(TestFormatParams::Values()),
                          TestFormatParams::ToTestNameString);
diff --git a/cpp/src/arrow/dataset/test_util_internal.h b/cpp/src/arrow/dataset/test_util_internal.h
index 80dd4adbf9..51d39d532c 100644
--- a/cpp/src/arrow/dataset/test_util_internal.h
+++ b/cpp/src/arrow/dataset/test_util_internal.h
@@ -516,16 +516,20 @@ class FileFormatFixtureMixin : public ::testing::Test {
     SetProjection(opts_.get(), std::move(projection));
   }
 
+  void Project(std::vector<compute::Expression> exprs, std::vector<std::string> names) {
+    ASSERT_OK_AND_ASSIGN(auto projection,
+                         ProjectionDescr::FromExpressions(
+                             std::move(exprs), std::move(names), *opts_->dataset_schema));
+    SetProjection(opts_.get(), std::move(projection));
+  }
+
   void ProjectNested(std::vector<std::string> names) {
     std::vector<compute::Expression> exprs;
     for (const auto& name : names) {
       ASSERT_OK_AND_ASSIGN(auto ref, FieldRef::FromDotPath(name));
       exprs.push_back(field_ref(ref));
     }
-    ASSERT_OK_AND_ASSIGN(
-        auto descr, ProjectionDescr::FromExpressions(std::move(exprs), std::move(names),
-                                                     *opts_->dataset_schema));
-    SetProjection(opts_.get(), std::move(descr));
+    Project(std::move(exprs), std::move(names));
   }
 
   // Shared test cases
diff --git a/cpp/src/arrow/type.h b/cpp/src/arrow/type.h
index 48228d43ef..d218789f68 100644
--- a/cpp/src/arrow/type.h
+++ b/cpp/src/arrow/type.h
@@ -1823,6 +1823,20 @@ class ARROW_EXPORT FieldRef : public util::EqualityComparable<FieldRef> {
     return true;
   }
 
+  /// \brief Return true if this ref is a name or a nested sequence of only names
+  ///
+  /// Useful for determining if iteration is possible without recursion or inner loops
+  bool IsNameSequence() const {
+    if (IsName()) return true;
+    if (const auto* nested = nested_refs()) {
+      for (const auto& ref : *nested) {
+        if (!ref.IsName()) return false;
+      }
+      return !nested->empty();
+    }
+    return false;
+  }
+
   const FieldPath* field_path() const {
     return IsFieldPath() ? &std::get<FieldPath>(impl_) : NULLPTR;
   }