You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ic...@apache.org on 2023/04/20 12:45:17 UTC
[arrow] branch main updated: GH-35004: [C++] Remove RelationInfo (#35005)
This is an automated email from the ASF dual-hosted git repository.
icexelloss pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new d5866ec3fa GH-35004: [C++] Remove RelationInfo (#35005)
d5866ec3fa is described below
commit d5866ec3fa3cff4dbceb5d7838039485cff101c5
Author: rtpsw <rt...@hotmail.com>
AuthorDate: Thu Apr 20 15:45:03 2023 +0300
GH-35004: [C++] Remove RelationInfo (#35005)
See #35004
* Closes: #35004
Lead-authored-by: Yaron Gvili <rt...@hotmail.com>
Co-authored-by: rtpsw <rt...@hotmail.com>
Signed-off-by: Li Jin <ic...@gmail.com>
---
cpp/src/arrow/acero/asof_join_node.cc | 22 ++-----
cpp/src/arrow/acero/asof_join_node.h | 7 +-
cpp/src/arrow/engine/substrait/options.cc | 75 +++++++++-------------
cpp/src/arrow/engine/substrait/options.h | 8 +--
cpp/src/arrow/engine/substrait/relation.h | 16 -----
.../arrow/engine/substrait/relation_internal.cc | 52 +++++----------
cpp/src/arrow/engine/substrait/serde_test.cc | 12 ++--
7 files changed, 61 insertions(+), 131 deletions(-)
diff --git a/cpp/src/arrow/acero/asof_join_node.cc b/cpp/src/arrow/acero/asof_join_node.cc
index 4d0f69fca5..e48d6c3bf9 100644
--- a/cpp/src/arrow/acero/asof_join_node.cc
+++ b/cpp/src/arrow/acero/asof_join_node.cc
@@ -1315,25 +1315,19 @@ class AsofJoinNode : public ExecNode {
/// \brief Make the output schema of an as-of-join node
///
- /// Optionally, also provides the field output indices for this node.
- /// \see arrow::engine::RelationInfo
- ///
/// \param[in] input_schema the schema of each input to the node
/// \param[in] indices_of_on_key the on-key index of each input to the node
/// \param[in] indices_of_by_key the by-key indices of each input to the node
- /// \param[out] field_output_indices the output index of each field
static arrow::Result<std::shared_ptr<Schema>> MakeOutputSchema(
const std::vector<std::shared_ptr<Schema>> input_schema,
const std::vector<col_index_t>& indices_of_on_key,
- const std::vector<std::vector<col_index_t>>& indices_of_by_key,
- std::vector<int>* field_output_indices = nullptr) {
+ const std::vector<std::vector<col_index_t>>& indices_of_by_key) {
std::vector<std::shared_ptr<arrow::Field>> fields;
size_t n_by = indices_of_by_key.size() == 0 ? 0 : indices_of_by_key[0].size();
const DataType* on_key_type = NULLPTR;
std::vector<const DataType*> by_key_type(n_by, NULLPTR);
// Take all non-key, non-time RHS fields
- int output_field_idx = 0;
for (size_t j = 0; j < input_schema.size(); ++j) {
const auto& on_field_ix = indices_of_on_key[j];
const auto& by_field_ix = indices_of_by_key[j];
@@ -1367,30 +1361,22 @@ class AsofJoinNode : public ExecNode {
for (int i = 0; i < input_schema[j]->num_fields(); ++i) {
const auto field = input_schema[j]->field(i);
- bool as_output; // true if the field appears as an output
- int final_output_idx; // the final output index for the field
+ bool as_output; // true if the field appears as an output
if (i == on_field_ix) {
ARROW_RETURN_NOT_OK(is_valid_on_field(field));
// Only add on field from the left table
as_output = (j == 0);
- final_output_idx = as_output ? output_field_idx++ : indices_of_on_key[0];
} else if (std_has(by_field_ix, i)) {
ARROW_RETURN_NOT_OK(is_valid_by_field(field));
// Only add by field from the left table
as_output = (j == 0);
- final_output_idx = as_output ? output_field_idx++
- : indices_of_by_key[0][std_index(by_field_ix, i)];
} else {
ARROW_RETURN_NOT_OK(is_valid_data_field(field));
as_output = true;
- final_output_idx = output_field_idx++;
}
if (as_output) {
fields.push_back(field);
}
- if (field_output_indices) {
- field_output_indices->push_back(final_output_idx);
- }
}
}
return std::make_shared<arrow::Schema>(fields);
@@ -1604,13 +1590,13 @@ namespace asofjoin {
Result<std::shared_ptr<Schema>> MakeOutputSchema(
const std::vector<std::shared_ptr<Schema>>& input_schema,
- const std::vector<AsofJoinKeys>& input_keys, std::vector<int>* field_output_indices) {
+ const std::vector<AsofJoinKeys>& input_keys) {
ARROW_ASSIGN_OR_RAISE(std::vector<col_index_t> indices_of_on_key,
AsofJoinNode::GetIndicesOfOnKey(input_schema, input_keys));
ARROW_ASSIGN_OR_RAISE(std::vector<std::vector<col_index_t>> indices_of_by_key,
AsofJoinNode::GetIndicesOfByKey(input_schema, input_keys));
return AsofJoinNode::MakeOutputSchema(input_schema, indices_of_on_key,
- indices_of_by_key, field_output_indices);
+ indices_of_by_key);
}
} // namespace asofjoin
diff --git a/cpp/src/arrow/acero/asof_join_node.h b/cpp/src/arrow/acero/asof_join_node.h
index b2ad2edc4a..6a0ce8fd38 100644
--- a/cpp/src/arrow/acero/asof_join_node.h
+++ b/cpp/src/arrow/acero/asof_join_node.h
@@ -30,16 +30,11 @@ using AsofJoinKeys = AsofJoinNodeOptions::Keys;
/// \brief Make the output schema of an as-of-join node
///
-/// Optionally, also provides the field output indices for this node.
-/// \see arrow::engine::RelationInfo
-///
/// \param[in] input_schema the schema of each input to the node
/// \param[in] input_keys the key of each input to the node
-/// \param[out] field_output_indices the output index of each field
ARROW_ACERO_EXPORT Result<std::shared_ptr<Schema>> MakeOutputSchema(
const std::vector<std::shared_ptr<Schema>>& input_schema,
- const std::vector<AsofJoinKeys>& input_keys,
- std::vector<int>* field_output_indices = NULLPTR);
+ const std::vector<AsofJoinKeys>& input_keys);
} // namespace asofjoin
} // namespace acero
diff --git a/cpp/src/arrow/engine/substrait/options.cc b/cpp/src/arrow/engine/substrait/options.cc
index 0a1af6fce1..67fc4f329d 100644
--- a/cpp/src/arrow/engine/substrait/options.cc
+++ b/cpp/src/arrow/engine/substrait/options.cc
@@ -46,26 +46,26 @@ std::vector<acero::Declaration::Input> MakeDeclarationInputs(
class BaseExtensionProvider : public ExtensionProvider {
public:
- Result<RelationInfo> MakeRel(const ConversionOptions& conv_opts,
- const std::vector<DeclarationInfo>& inputs,
- const ExtensionDetails& ext_details,
- const ExtensionSet& ext_set) override {
+ Result<DeclarationInfo> MakeRel(const ConversionOptions& conv_opts,
+ const std::vector<DeclarationInfo>& inputs,
+ const ExtensionDetails& ext_details,
+ const ExtensionSet& ext_set) override {
auto details = dynamic_cast<const DefaultExtensionDetails&>(ext_details);
return MakeRel(conv_opts, inputs, details.rel, ext_set);
}
- virtual Result<RelationInfo> MakeRel(const ConversionOptions& conv_opts,
- const std::vector<DeclarationInfo>& inputs,
- const google::protobuf::Any& rel,
- const ExtensionSet& ext_set) = 0;
+ virtual Result<DeclarationInfo> MakeRel(const ConversionOptions& conv_opts,
+ const std::vector<DeclarationInfo>& inputs,
+ const google::protobuf::Any& rel,
+ const ExtensionSet& ext_set) = 0;
};
class DefaultExtensionProvider : public BaseExtensionProvider {
public:
- Result<RelationInfo> MakeRel(const ConversionOptions& conv_opts,
- const std::vector<DeclarationInfo>& inputs,
- const google::protobuf::Any& rel,
- const ExtensionSet& ext_set) override {
+ Result<DeclarationInfo> MakeRel(const ConversionOptions& conv_opts,
+ const std::vector<DeclarationInfo>& inputs,
+ const google::protobuf::Any& rel,
+ const ExtensionSet& ext_set) override {
if (rel.Is<substrait_ext::AsOfJoinRel>()) {
substrait_ext::AsOfJoinRel as_of_join_rel;
rel.UnpackTo(&as_of_join_rel);
@@ -86,9 +86,9 @@ class DefaultExtensionProvider : public BaseExtensionProvider {
}
private:
- Result<RelationInfo> MakeAsOfJoinRel(const std::vector<DeclarationInfo>& inputs,
- const substrait_ext::AsOfJoinRel& as_of_join_rel,
- const ExtensionSet& ext_set) {
+ Result<DeclarationInfo> MakeAsOfJoinRel(
+ const std::vector<DeclarationInfo>& inputs,
+ const substrait_ext::AsOfJoinRel& as_of_join_rel, const ExtensionSet& ext_set) {
if (inputs.size() < 2) {
return Status::Invalid("substrait_ext::AsOfJoinNode too few input tables: ",
inputs.size());
@@ -133,24 +133,21 @@ class DefaultExtensionProvider : public BaseExtensionProvider {
for (size_t i = 0; i < inputs.size(); i++) {
input_schema[i] = inputs[i].output_schema;
}
- std::vector<int> field_output_indices;
ARROW_ASSIGN_OR_RAISE(auto schema,
- acero::asofjoin::MakeOutputSchema(input_schema, input_keys,
- &field_output_indices));
+ acero::asofjoin::MakeOutputSchema(input_schema, input_keys));
acero::AsofJoinNodeOptions asofjoin_node_opts{std::move(input_keys), tolerance};
// declaration
auto input_decls = MakeDeclarationInputs(inputs);
- return RelationInfo{
- {acero::Declaration("asofjoin", input_decls, std::move(asofjoin_node_opts)),
- std::move(schema)},
- std::move(field_output_indices)};
+ return DeclarationInfo{
+ acero::Declaration("asofjoin", input_decls, std::move(asofjoin_node_opts)),
+ std::move(schema)};
}
- Result<RelationInfo> MakeNamedTapRel(const ConversionOptions& conv_opts,
- const std::vector<DeclarationInfo>& inputs,
- const substrait_ext::NamedTapRel& named_tap_rel,
- const ExtensionSet& ext_set) {
+ Result<DeclarationInfo> MakeNamedTapRel(const ConversionOptions& conv_opts,
+ const std::vector<DeclarationInfo>& inputs,
+ const substrait_ext::NamedTapRel& named_tap_rel,
+ const ExtensionSet& ext_set) {
if (inputs.size() != 1) {
return Status::Invalid(
"substrait_ext::NamedTapRel requires a single input but got: ", inputs.size());
@@ -169,10 +166,10 @@ class DefaultExtensionProvider : public BaseExtensionProvider {
ARROW_ASSIGN_OR_RAISE(
auto decl, conv_opts.named_tap_provider(named_tap_rel.kind(), input_decls,
named_tap_rel.name(), renamed_schema));
- return RelationInfo{{std::move(decl), std::move(renamed_schema)}, std::nullopt};
+ return DeclarationInfo{std::move(decl), std::move(renamed_schema)};
}
- Result<RelationInfo> MakeSegmentedAggregateRel(
+ Result<DeclarationInfo> MakeSegmentedAggregateRel(
const ConversionOptions& conv_opts, const std::vector<DeclarationInfo>& inputs,
const substrait_ext::SegmentedAggregateRel& seg_agg_rel,
const ExtensionSet& ext_set) {
@@ -211,21 +208,13 @@ class DefaultExtensionProvider : public BaseExtensionProvider {
aggregates.push_back(std::move(aggregate));
}
- ARROW_ASSIGN_OR_RAISE(
- auto output_schema,
- acero::aggregate::MakeOutputSchema(input_schema, keys, segment_keys, aggregates));
-
- ARROW_ASSIGN_OR_RAISE(auto decl_info, internal::MakeAggregateDeclaration(
- std::move(inputs[0].declaration),
- output_schema, std::move(aggregates),
- std::move(keys), std::move(segment_keys)));
-
- size_t out_size = output_schema->num_fields();
- std::vector<int> field_output_indices(out_size);
- for (int i = 0; i < static_cast<int>(out_size); i++) {
- field_output_indices[i] = i;
- }
- return RelationInfo{decl_info, std::move(field_output_indices)};
+ ARROW_ASSIGN_OR_RAISE(auto aggregate_schema,
+ acero::aggregate::MakeOutputSchema(
+ input_schema, keys, /*segment_keys=*/{}, aggregates));
+
+ return internal::MakeAggregateDeclaration(
+ std::move(inputs[0].declaration), std::move(aggregate_schema),
+ std::move(aggregates), std::move(keys), std::move(segment_keys));
}
};
diff --git a/cpp/src/arrow/engine/substrait/options.h b/cpp/src/arrow/engine/substrait/options.h
index e501914dd3..0d66c5eea4 100644
--- a/cpp/src/arrow/engine/substrait/options.h
+++ b/cpp/src/arrow/engine/substrait/options.h
@@ -81,10 +81,10 @@ class ARROW_ENGINE_EXPORT ExtensionDetails {
class ARROW_ENGINE_EXPORT ExtensionProvider {
public:
virtual ~ExtensionProvider() = default;
- virtual Result<RelationInfo> MakeRel(const ConversionOptions& conv_opts,
- const std::vector<DeclarationInfo>& inputs,
- const ExtensionDetails& ext_details,
- const ExtensionSet& ext_set) = 0;
+ virtual Result<DeclarationInfo> MakeRel(const ConversionOptions& conv_opts,
+ const std::vector<DeclarationInfo>& inputs,
+ const ExtensionDetails& ext_details,
+ const ExtensionSet& ext_set) = 0;
};
/// \brief Get the default extension provider
diff --git a/cpp/src/arrow/engine/substrait/relation.h b/cpp/src/arrow/engine/substrait/relation.h
index a7e3605bf6..0be4e03bb3 100644
--- a/cpp/src/arrow/engine/substrait/relation.h
+++ b/cpp/src/arrow/engine/substrait/relation.h
@@ -34,22 +34,6 @@ struct ARROW_ENGINE_EXPORT DeclarationInfo {
std::shared_ptr<Schema> output_schema;
};
-/// Information resulting from converting a Substrait relation.
-///
-/// RelationInfo adds the "output indices" field for the extension to define how the
-/// fields should be mapped to get the standard indices expected by Substrait.
-struct ARROW_ENGINE_EXPORT RelationInfo {
- /// The execution information produced thus far.
- DeclarationInfo decl_info;
- /// A vector of indices, one per input field per input in order, each index referring
- /// to the corresponding field within the output schema, if it is in the output, or -1
- /// otherwise. Each location in this vector is a field input index. This vector is
- /// useful for translating selected field input indices (often from an output mapping in
- /// a Substrait plan) of a join-type relation to their locations in the output schema of
- /// the relation. This vector is undefined if the translation is unsupported.
- std::optional<std::vector<int>> field_output_indices;
-};
-
/// Information resulting from converting a Substrait plan
struct ARROW_ENGINE_EXPORT PlanInfo {
/// The root declaration.
diff --git a/cpp/src/arrow/engine/substrait/relation_internal.cc b/cpp/src/arrow/engine/substrait/relation_internal.cc
index 0336bb3dd1..c19ba18a38 100644
--- a/cpp/src/arrow/engine/substrait/relation_internal.cc
+++ b/cpp/src/arrow/engine/substrait/relation_internal.cc
@@ -162,9 +162,8 @@ Result<DeclarationInfo> ProcessEmit(const substrait::ProjectRel& rel,
no_emit_declr, schema);
}
-Result<DeclarationInfo> ProcessExtensionEmit(
- const DeclarationInfo& no_emit_declr, const std::vector<int>& emit_order,
- const std::vector<int>& field_output_indices) {
+Result<DeclarationInfo> ProcessExtensionEmit(const DeclarationInfo& no_emit_declr,
+ const std::vector<int>& emit_order) {
const std::shared_ptr<Schema>& input_schema = no_emit_declr.output_schema;
std::vector<compute::Expression> proj_field_refs;
proj_field_refs.reserve(emit_order.size());
@@ -172,15 +171,11 @@ Result<DeclarationInfo> ProcessExtensionEmit(
emit_fields.reserve(emit_order.size());
for (int emit_idx : emit_order) {
- if (emit_idx < 0 || static_cast<size_t>(emit_idx) >= field_output_indices.size()) {
+ if (emit_idx < 0 || emit_idx >= input_schema->num_fields()) {
return Status::Invalid("Out of bounds emit index ", emit_idx);
}
- int field_idx = field_output_indices[emit_idx];
- if (field_idx < 0) {
- return Status::Invalid("Non-output emit index ", emit_idx);
- }
- proj_field_refs.push_back(compute::field_ref(FieldRef(field_idx)));
- emit_fields.push_back(input_schema->field(field_idx));
+ proj_field_refs.push_back(compute::field_ref(FieldRef(emit_idx)));
+ emit_fields.push_back(input_schema->field(emit_idx));
}
std::shared_ptr<Schema> emit_schema = schema(std::move(emit_fields));
@@ -192,13 +187,13 @@ Result<DeclarationInfo> ProcessExtensionEmit(
std::move(emit_schema)};
}
-Result<RelationInfo> GetExtensionRelationInfo(const substrait::Rel& rel,
- const ExtensionSet& ext_set,
- const ConversionOptions& conv_opts,
- std::vector<DeclarationInfo>* inputs_arg) {
+Result<DeclarationInfo> GetExtensionInfo(const substrait::Rel& rel,
+ const ExtensionSet& ext_set,
+ const ConversionOptions& conv_opts,
+ std::vector<DeclarationInfo>* inputs_arg) {
if (inputs_arg == nullptr) {
std::vector<DeclarationInfo> inputs_tmp;
- return GetExtensionRelationInfo(rel, ext_set, conv_opts, &inputs_tmp);
+ return GetExtensionInfo(rel, ext_set, conv_opts, &inputs_tmp);
}
std::vector<DeclarationInfo>& inputs = *inputs_arg;
inputs.clear();
@@ -788,44 +783,27 @@ Result<DeclarationInfo> FromProto(const substrait::Rel& rel, const ExtensionSet&
case substrait::Rel::RelTypeCase::kExtensionMulti: {
std::vector<DeclarationInfo> ext_rel_inputs;
ARROW_ASSIGN_OR_RAISE(
- auto ext_rel_info,
- GetExtensionRelationInfo(rel, ext_set, conversion_options, &ext_rel_inputs));
- const auto& ext_decl_info = ext_rel_info.decl_info;
+ auto ext_decl_info,
+ GetExtensionInfo(rel, ext_set, conversion_options, &ext_rel_inputs));
auto ext_common_opt = GetExtensionRelCommon(rel);
bool has_emit = ext_common_opt && ext_common_opt->emit_kind_case() ==
substrait::RelCommon::EmitKindCase::kEmit;
- if (!ext_rel_info.field_output_indices) {
- if (!has_emit) {
- return ext_decl_info;
- }
- return Status::NotImplemented("Emit not supported by ",
- ext_decl_info.declaration.factory_name);
- }
// Set up the emit order - an ordered list of indices that specifies an output
// mapping as expected by Substrait. This is a sublist of [0..N), where N is the
// total number of input fields across all inputs of the relation, that selects
// from these input fields.
- std::vector<int> emit_order;
if (has_emit) {
+ std::vector<int> emit_order;
// the emit order is defined in the Substrait plan - pick it up
const auto& emit_info = ext_common_opt->emit();
emit_order.reserve(emit_info.output_mapping_size());
for (const auto& emit_idx : emit_info.output_mapping()) {
emit_order.push_back(emit_idx);
}
+ return ProcessExtensionEmit(std::move(ext_decl_info), emit_order);
} else {
- // the emit order is the default output mapping [0..N)
- int emit_size = 0;
- for (const auto& input : ext_rel_inputs) {
- emit_size += input.output_schema->num_fields();
- }
- emit_order.reserve(emit_size);
- for (int emit_idx = 0; emit_idx < emit_size; emit_idx++) {
- emit_order.push_back(emit_idx);
- }
+ return ext_decl_info;
}
- return ProcessExtensionEmit(ext_decl_info, emit_order,
- *ext_rel_info.field_output_indices);
}
case substrait::Rel::RelTypeCase::kSet: {
diff --git a/cpp/src/arrow/engine/substrait/serde_test.cc b/cpp/src/arrow/engine/substrait/serde_test.cc
index b84bde8080..3a96618880 100644
--- a/cpp/src/arrow/engine/substrait/serde_test.cc
+++ b/cpp/src/arrow/engine/substrait/serde_test.cc
@@ -4356,7 +4356,7 @@ TEST(Substrait, PlanWithAsOfJoinExtension) {
"extension_multi": {
"common": {
"emit": {
- "outputMapping": [0, 1, 2, 5]
+ "outputMapping": [0, 1, 2, 3]
}
},
"inputs": [
@@ -5156,7 +5156,7 @@ TEST(Substrait, PlanWithExtension) {
"extension_multi": {
"common": {
"emit": {
- "outputMapping": [0, 1, 2, 5]
+ "outputMapping": [0, 1, 2, 3]
}
},
"inputs": [
@@ -5475,7 +5475,7 @@ TEST(Substrait, AsOfJoinDefaultEmit) {
}
}
},
- "names": ["time", "key", "value1", "time2", "key2", "value2"]
+ "names": ["time", "key", "value1", "value2"]
}
}],
"expectedTypeUrls": []
@@ -5507,12 +5507,10 @@ TEST(Substrait, AsOfJoinDefaultEmit) {
ASSERT_OK_AND_ASSIGN(auto buf, internal::SubstraitFromJSON("Plan", substrait_json));
auto out_schema = schema({field("time", int32()), field("key", int32()),
- field("value1", float64()), field("time2", int32()),
- field("key2", int32()), field("value2", float64())});
+ field("value1", float64()), field("value2", float64())});
auto expected_table = TableFromJSON(
- out_schema,
- {"[[2, 1, 1.1, 2, 1, 1.2], [4, 1, 2.1, 4, 1, 1.2], [6, 2, 3.1, 6, 2, 3.2]]"});
+ out_schema, {"[[2, 1, 1.1, 1.2], [4, 1, 2.1, 1.2], [6, 2, 3.1, 3.2]]"});
CheckRoundTripResult(std::move(expected_table), buf, {}, conversion_options);
}