You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2023/06/24 00:04:20 UTC
[arrow] branch main updated: GH-35579: [C++] Support non-named FieldRefs in Parquet scanner (#35798)
This is an automated email from the ASF dual-hosted git repository.
westonpace pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new 10eedbe63c GH-35579: [C++] Support non-named FieldRefs in Parquet scanner (#35798)
10eedbe63c is described below
commit 10eedbe63c71f4cf8f0621f3a2304ab3168a2ae5
Author: Ben Harkins <60...@users.noreply.github.com>
AuthorDate: Fri Jun 23 20:04:14 2023 -0400
GH-35579: [C++] Support non-named FieldRefs in Parquet scanner (#35798)
### Rationale for this change
When setting projections/filters for the file system scanner, the Parquet implementation requires that all materialized `FieldRef`s be position-independent (containing only names). However, it may be useful to support index-based field lookups as well - assuming the dataset schema is known.
### What changes are included in this PR?
Adds a translation step that converts field refs into name-only refs before they are looked up in the fragment schema. Doing this reliably requires a known dataset schema (the fragment schema may be a subset or superset of the dataset schema), so when no dataset schema is available we fall back to the existing behavior.
### Are these changes tested?
Yes (tests are included)
### Are there any user-facing changes?
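Yes. The Parquet scanner now accepts index-based and mixed name/index `FieldRef`s when a dataset schema is available.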
* Closes: #35579
Authored-by: benibus <bp...@gmx.com>
Signed-off-by: Weston Pace <we...@gmail.com>
---
cpp/src/arrow/dataset/file_parquet.cc | 30 ++++++++++++++++++++++++-
cpp/src/arrow/dataset/file_parquet_test.cc | 36 ++++++++++++++++++++++++++++++
cpp/src/arrow/dataset/test_util_internal.h | 12 ++++++----
cpp/src/arrow/type.h | 14 ++++++++++++
4 files changed, 87 insertions(+), 5 deletions(-)
diff --git a/cpp/src/arrow/dataset/file_parquet.cc b/cpp/src/arrow/dataset/file_parquet.cc
index 3a9931cf2a..c30441d911 100644
--- a/cpp/src/arrow/dataset/file_parquet.cc
+++ b/cpp/src/arrow/dataset/file_parquet.cc
@@ -224,6 +224,28 @@ Status ResolveOneFieldRef(
return Status::OK();
}
+// Converts a field ref into a position-independent ref (containing only a sequence of
+// names) based on the dataset schema. Refs that are already name-only are returned unchanged.
+Result<FieldRef> MaybeConvertFieldRef(FieldRef ref, const Schema& dataset_schema) {
+ if (ARROW_PREDICT_TRUE(ref.IsNameSequence())) {
+ return std::move(ref);
+ }
+
+ ARROW_ASSIGN_OR_RAISE(auto path, ref.FindOne(dataset_schema));
+ std::vector<FieldRef> named_refs;
+ named_refs.reserve(path.indices().size());
+
+ const FieldVector* child_fields = &dataset_schema.fields();
+ for (auto index : path) {
+ const auto& child_field = *(*child_fields)[index];
+ named_refs.emplace_back(child_field.name());
+ child_fields = &child_field.type()->fields();
+ }
+
+ return named_refs.size() == 1 ? std::move(named_refs[0])
+ : FieldRef(std::move(named_refs));
+}
+
// Compute the column projection based on the scan options
Result<std::vector<int>> InferColumnProjection(const parquet::arrow::FileReader& reader,
const ScanOptions& options) {
@@ -248,7 +270,13 @@ Result<std::vector<int>> InferColumnProjection(const parquet::arrow::FileReader&
}
std::vector<int> columns_selection;
- for (const auto& ref : field_refs) {
+ for (auto& ref : field_refs) {
+ // In the (unlikely) absence of a known dataset schema, we require that all
+ // materialized refs are named.
+ if (options.dataset_schema) {
+ ARROW_ASSIGN_OR_RAISE(
+ ref, MaybeConvertFieldRef(std::move(ref), *options.dataset_schema));
+ }
RETURN_NOT_OK(ResolveOneFieldRef(manifest, ref, field_lookup, duplicate_fields,
&columns_selection));
}
diff --git a/cpp/src/arrow/dataset/file_parquet_test.cc b/cpp/src/arrow/dataset/file_parquet_test.cc
index 570e028695..42f923f0e6 100644
--- a/cpp/src/arrow/dataset/file_parquet_test.cc
+++ b/cpp/src/arrow/dataset/file_parquet_test.cc
@@ -678,6 +678,42 @@ TEST_P(TestParquetFileFormatScan, PredicatePushdownRowGroupFragmentsUsingStringC
CountRowGroupsInFragment(fragment, {0, 3}, equal(field_ref("x"), literal("a")));
}
+// Tests that equivalent name-based, index-based and mixed nested FieldRefs can all be used in a projection.
+// https://github.com/apache/arrow/issues/35579
+TEST_P(TestParquetFileFormatScan, ProjectWithNonNamedFieldRefs) {
+ auto table_schema = schema(
+ {field("info", struct_({field("name", utf8()),
+ field("data", struct_({field("amount", float64()),
+ field("percent", float32())}))}))});
+ auto table = TableFromJSON(table_schema, {R"([
+ {"info": {"name": "a", "data": {"amount": 10.3, "percent": 0.1}}},
+ {"info": {"name": "b", "data": {"amount": 11.6, "percent": 0.2}}},
+ {"info": {"name": "c", "data": {"amount": 12.9, "percent": 0.3}}},
+ {"info": {"name": "d", "data": {"amount": 14.2, "percent": 0.4}}},
+ {"info": {"name": "e", "data": {"amount": 15.5, "percent": 0.5}}},
+ {"info": {"name": "f", "data": {"amount": 16.8, "percent": 0.6}}}])"});
+ ASSERT_OK_AND_ASSIGN(auto expected_batch, table->CombineChunksToBatch());
+
+ TableBatchReader reader(*table);
+ SetSchema(reader.schema()->fields());
+
+ auto source = GetFileSource(&reader);
+ ASSERT_OK_AND_ASSIGN(auto fragment, format_->MakeFragment(*source));
+
+ std::vector<FieldRef> equivalent_refs = {
+ FieldRef("info", "data", "percent"), FieldRef("info", 1, 1),
+ FieldRef(0, 1, "percent"), FieldRef(0, 1, 1),
+ FieldRef(0, FieldRef("data", 1)), FieldRef(FieldRef(0), FieldRef(1, 1)),
+ };
+ for (const auto& ref : equivalent_refs) {
+ ARROW_SCOPED_TRACE("ref = ", ref.ToString());
+
+ Project({field_ref(ref)}, {"value"});
+ auto batch = SingleBatch(fragment);
+ AssertBatchesEqual(*expected_batch, *batch);
+ }
+}
+
INSTANTIATE_TEST_SUITE_P(TestScan, TestParquetFileFormatScan,
::testing::ValuesIn(TestFormatParams::Values()),
TestFormatParams::ToTestNameString);
diff --git a/cpp/src/arrow/dataset/test_util_internal.h b/cpp/src/arrow/dataset/test_util_internal.h
index 80dd4adbf9..51d39d532c 100644
--- a/cpp/src/arrow/dataset/test_util_internal.h
+++ b/cpp/src/arrow/dataset/test_util_internal.h
@@ -516,16 +516,20 @@ class FileFormatFixtureMixin : public ::testing::Test {
SetProjection(opts_.get(), std::move(projection));
}
+ void Project(std::vector<compute::Expression> exprs, std::vector<std::string> names) {
+ ASSERT_OK_AND_ASSIGN(auto projection,
+ ProjectionDescr::FromExpressions(
+ std::move(exprs), std::move(names), *opts_->dataset_schema));
+ SetProjection(opts_.get(), std::move(projection));
+ }
+
void ProjectNested(std::vector<std::string> names) {
std::vector<compute::Expression> exprs;
for (const auto& name : names) {
ASSERT_OK_AND_ASSIGN(auto ref, FieldRef::FromDotPath(name));
exprs.push_back(field_ref(ref));
}
- ASSERT_OK_AND_ASSIGN(
- auto descr, ProjectionDescr::FromExpressions(std::move(exprs), std::move(names),
- *opts_->dataset_schema));
- SetProjection(opts_.get(), std::move(descr));
+ Project(std::move(exprs), std::move(names));
}
// Shared test cases
diff --git a/cpp/src/arrow/type.h b/cpp/src/arrow/type.h
index 48228d43ef..d218789f68 100644
--- a/cpp/src/arrow/type.h
+++ b/cpp/src/arrow/type.h
@@ -1823,6 +1823,20 @@ class ARROW_EXPORT FieldRef : public util::EqualityComparable<FieldRef> {
return true;
}
+ /// \brief Return true if this ref is a name or a non-empty nested sequence of only names
+ ///
+ /// Useful for determining if iteration is possible without recursion or inner loops
+ bool IsNameSequence() const {
+ if (IsName()) return true;
+ if (const auto* nested = nested_refs()) {
+ for (const auto& ref : *nested) {
+ if (!ref.IsName()) return false;
+ }
+ return !nested->empty();
+ }
+ return false;
+ }
+
const FieldPath* field_path() const {
return IsFieldPath() ? &std::get<FieldPath>(impl_) : NULLPTR;
}