You are viewing a plain-text version of this content. The hyperlink to the canonical version ("here") was not preserved in this plain-text copy.
Posted to commits@arrow.apache.org by ic...@apache.org on 2023/04/20 12:45:17 UTC

[arrow] branch main updated: GH-35004: [C++] Remove RelationInfo (#35005)

This is an automated email from the ASF dual-hosted git repository.

icexelloss pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new d5866ec3fa GH-35004: [C++] Remove RelationInfo (#35005)
d5866ec3fa is described below

commit d5866ec3fa3cff4dbceb5d7838039485cff101c5
Author: rtpsw <rt...@hotmail.com>
AuthorDate: Thu Apr 20 15:45:03 2023 +0300

    GH-35004: [C++] Remove RelationInfo (#35005)
    
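    See #35004. This removes the RelationInfo struct: extension providers now
    return DeclarationInfo directly, and the emit output mapping of an extension
    relation now indexes into the relation's output schema rather than into the
    concatenated fields of all of its inputs.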
    * Closes: #35004
    
    Lead-authored-by: Yaron Gvili <rt...@hotmail.com>
    Co-authored-by: rtpsw <rt...@hotmail.com>
    Signed-off-by: Li Jin <ic...@gmail.com>
---
 cpp/src/arrow/acero/asof_join_node.cc              | 22 ++-----
 cpp/src/arrow/acero/asof_join_node.h               |  7 +-
 cpp/src/arrow/engine/substrait/options.cc          | 75 +++++++++-------------
 cpp/src/arrow/engine/substrait/options.h           |  8 +--
 cpp/src/arrow/engine/substrait/relation.h          | 16 -----
 .../arrow/engine/substrait/relation_internal.cc    | 52 +++++----------
 cpp/src/arrow/engine/substrait/serde_test.cc       | 12 ++--
 7 files changed, 61 insertions(+), 131 deletions(-)

diff --git a/cpp/src/arrow/acero/asof_join_node.cc b/cpp/src/arrow/acero/asof_join_node.cc
index 4d0f69fca5..e48d6c3bf9 100644
--- a/cpp/src/arrow/acero/asof_join_node.cc
+++ b/cpp/src/arrow/acero/asof_join_node.cc
@@ -1315,25 +1315,19 @@ class AsofJoinNode : public ExecNode {
 
   /// \brief Make the output schema of an as-of-join node
   ///
-  /// Optionally, also provides the field output indices for this node.
-  /// \see arrow::engine::RelationInfo
-  ///
   /// \param[in] input_schema the schema of each input to the node
   /// \param[in] indices_of_on_key the on-key index of each input to the node
   /// \param[in] indices_of_by_key the by-key indices of each input to the node
-  /// \param[out] field_output_indices the output index of each field
   static arrow::Result<std::shared_ptr<Schema>> MakeOutputSchema(
       const std::vector<std::shared_ptr<Schema>> input_schema,
       const std::vector<col_index_t>& indices_of_on_key,
-      const std::vector<std::vector<col_index_t>>& indices_of_by_key,
-      std::vector<int>* field_output_indices = nullptr) {
+      const std::vector<std::vector<col_index_t>>& indices_of_by_key) {
     std::vector<std::shared_ptr<arrow::Field>> fields;
 
     size_t n_by = indices_of_by_key.size() == 0 ? 0 : indices_of_by_key[0].size();
     const DataType* on_key_type = NULLPTR;
     std::vector<const DataType*> by_key_type(n_by, NULLPTR);
     // Take all non-key, non-time RHS fields
-    int output_field_idx = 0;
     for (size_t j = 0; j < input_schema.size(); ++j) {
       const auto& on_field_ix = indices_of_on_key[j];
       const auto& by_field_ix = indices_of_by_key[j];
@@ -1367,30 +1361,22 @@ class AsofJoinNode : public ExecNode {
 
       for (int i = 0; i < input_schema[j]->num_fields(); ++i) {
         const auto field = input_schema[j]->field(i);
-        bool as_output;        // true if the field appears as an output
-        int final_output_idx;  // the final output index for the field
+        bool as_output;  // true if the field appears as an output
         if (i == on_field_ix) {
           ARROW_RETURN_NOT_OK(is_valid_on_field(field));
           // Only add on field from the left table
           as_output = (j == 0);
-          final_output_idx = as_output ? output_field_idx++ : indices_of_on_key[0];
         } else if (std_has(by_field_ix, i)) {
           ARROW_RETURN_NOT_OK(is_valid_by_field(field));
           // Only add by field from the left table
           as_output = (j == 0);
-          final_output_idx = as_output ? output_field_idx++
-                                       : indices_of_by_key[0][std_index(by_field_ix, i)];
         } else {
           ARROW_RETURN_NOT_OK(is_valid_data_field(field));
           as_output = true;
-          final_output_idx = output_field_idx++;
         }
         if (as_output) {
           fields.push_back(field);
         }
-        if (field_output_indices) {
-          field_output_indices->push_back(final_output_idx);
-        }
       }
     }
     return std::make_shared<arrow::Schema>(fields);
@@ -1604,13 +1590,13 @@ namespace asofjoin {
 
 Result<std::shared_ptr<Schema>> MakeOutputSchema(
     const std::vector<std::shared_ptr<Schema>>& input_schema,
-    const std::vector<AsofJoinKeys>& input_keys, std::vector<int>* field_output_indices) {
+    const std::vector<AsofJoinKeys>& input_keys) {
   ARROW_ASSIGN_OR_RAISE(std::vector<col_index_t> indices_of_on_key,
                         AsofJoinNode::GetIndicesOfOnKey(input_schema, input_keys));
   ARROW_ASSIGN_OR_RAISE(std::vector<std::vector<col_index_t>> indices_of_by_key,
                         AsofJoinNode::GetIndicesOfByKey(input_schema, input_keys));
   return AsofJoinNode::MakeOutputSchema(input_schema, indices_of_on_key,
-                                        indices_of_by_key, field_output_indices);
+                                        indices_of_by_key);
 }
 
 }  // namespace asofjoin
diff --git a/cpp/src/arrow/acero/asof_join_node.h b/cpp/src/arrow/acero/asof_join_node.h
index b2ad2edc4a..6a0ce8fd38 100644
--- a/cpp/src/arrow/acero/asof_join_node.h
+++ b/cpp/src/arrow/acero/asof_join_node.h
@@ -30,16 +30,11 @@ using AsofJoinKeys = AsofJoinNodeOptions::Keys;
 
 /// \brief Make the output schema of an as-of-join node
 ///
-/// Optionally, also provides the field output indices for this node.
-/// \see arrow::engine::RelationInfo
-///
 /// \param[in] input_schema the schema of each input to the node
 /// \param[in] input_keys the key of each input to the node
-/// \param[out] field_output_indices the output index of each field
 ARROW_ACERO_EXPORT Result<std::shared_ptr<Schema>> MakeOutputSchema(
     const std::vector<std::shared_ptr<Schema>>& input_schema,
-    const std::vector<AsofJoinKeys>& input_keys,
-    std::vector<int>* field_output_indices = NULLPTR);
+    const std::vector<AsofJoinKeys>& input_keys);
 
 }  // namespace asofjoin
 }  // namespace acero
diff --git a/cpp/src/arrow/engine/substrait/options.cc b/cpp/src/arrow/engine/substrait/options.cc
index 0a1af6fce1..67fc4f329d 100644
--- a/cpp/src/arrow/engine/substrait/options.cc
+++ b/cpp/src/arrow/engine/substrait/options.cc
@@ -46,26 +46,26 @@ std::vector<acero::Declaration::Input> MakeDeclarationInputs(
 
 class BaseExtensionProvider : public ExtensionProvider {
  public:
-  Result<RelationInfo> MakeRel(const ConversionOptions& conv_opts,
-                               const std::vector<DeclarationInfo>& inputs,
-                               const ExtensionDetails& ext_details,
-                               const ExtensionSet& ext_set) override {
+  Result<DeclarationInfo> MakeRel(const ConversionOptions& conv_opts,
+                                  const std::vector<DeclarationInfo>& inputs,
+                                  const ExtensionDetails& ext_details,
+                                  const ExtensionSet& ext_set) override {
     auto details = dynamic_cast<const DefaultExtensionDetails&>(ext_details);
     return MakeRel(conv_opts, inputs, details.rel, ext_set);
   }
 
-  virtual Result<RelationInfo> MakeRel(const ConversionOptions& conv_opts,
-                                       const std::vector<DeclarationInfo>& inputs,
-                                       const google::protobuf::Any& rel,
-                                       const ExtensionSet& ext_set) = 0;
+  virtual Result<DeclarationInfo> MakeRel(const ConversionOptions& conv_opts,
+                                          const std::vector<DeclarationInfo>& inputs,
+                                          const google::protobuf::Any& rel,
+                                          const ExtensionSet& ext_set) = 0;
 };
 
 class DefaultExtensionProvider : public BaseExtensionProvider {
  public:
-  Result<RelationInfo> MakeRel(const ConversionOptions& conv_opts,
-                               const std::vector<DeclarationInfo>& inputs,
-                               const google::protobuf::Any& rel,
-                               const ExtensionSet& ext_set) override {
+  Result<DeclarationInfo> MakeRel(const ConversionOptions& conv_opts,
+                                  const std::vector<DeclarationInfo>& inputs,
+                                  const google::protobuf::Any& rel,
+                                  const ExtensionSet& ext_set) override {
     if (rel.Is<substrait_ext::AsOfJoinRel>()) {
       substrait_ext::AsOfJoinRel as_of_join_rel;
       rel.UnpackTo(&as_of_join_rel);
@@ -86,9 +86,9 @@ class DefaultExtensionProvider : public BaseExtensionProvider {
   }
 
  private:
-  Result<RelationInfo> MakeAsOfJoinRel(const std::vector<DeclarationInfo>& inputs,
-                                       const substrait_ext::AsOfJoinRel& as_of_join_rel,
-                                       const ExtensionSet& ext_set) {
+  Result<DeclarationInfo> MakeAsOfJoinRel(
+      const std::vector<DeclarationInfo>& inputs,
+      const substrait_ext::AsOfJoinRel& as_of_join_rel, const ExtensionSet& ext_set) {
     if (inputs.size() < 2) {
       return Status::Invalid("substrait_ext::AsOfJoinNode too few input tables: ",
                              inputs.size());
@@ -133,24 +133,21 @@ class DefaultExtensionProvider : public BaseExtensionProvider {
     for (size_t i = 0; i < inputs.size(); i++) {
       input_schema[i] = inputs[i].output_schema;
     }
-    std::vector<int> field_output_indices;
     ARROW_ASSIGN_OR_RAISE(auto schema,
-                          acero::asofjoin::MakeOutputSchema(input_schema, input_keys,
-                                                            &field_output_indices));
+                          acero::asofjoin::MakeOutputSchema(input_schema, input_keys));
     acero::AsofJoinNodeOptions asofjoin_node_opts{std::move(input_keys), tolerance};
 
     // declaration
     auto input_decls = MakeDeclarationInputs(inputs);
-    return RelationInfo{
-        {acero::Declaration("asofjoin", input_decls, std::move(asofjoin_node_opts)),
-         std::move(schema)},
-        std::move(field_output_indices)};
+    return DeclarationInfo{
+        acero::Declaration("asofjoin", input_decls, std::move(asofjoin_node_opts)),
+        std::move(schema)};
   }
 
-  Result<RelationInfo> MakeNamedTapRel(const ConversionOptions& conv_opts,
-                                       const std::vector<DeclarationInfo>& inputs,
-                                       const substrait_ext::NamedTapRel& named_tap_rel,
-                                       const ExtensionSet& ext_set) {
+  Result<DeclarationInfo> MakeNamedTapRel(const ConversionOptions& conv_opts,
+                                          const std::vector<DeclarationInfo>& inputs,
+                                          const substrait_ext::NamedTapRel& named_tap_rel,
+                                          const ExtensionSet& ext_set) {
     if (inputs.size() != 1) {
       return Status::Invalid(
           "substrait_ext::NamedTapRel requires a single input but got: ", inputs.size());
@@ -169,10 +166,10 @@ class DefaultExtensionProvider : public BaseExtensionProvider {
     ARROW_ASSIGN_OR_RAISE(
         auto decl, conv_opts.named_tap_provider(named_tap_rel.kind(), input_decls,
                                                 named_tap_rel.name(), renamed_schema));
-    return RelationInfo{{std::move(decl), std::move(renamed_schema)}, std::nullopt};
+    return DeclarationInfo{std::move(decl), std::move(renamed_schema)};
   }
 
-  Result<RelationInfo> MakeSegmentedAggregateRel(
+  Result<DeclarationInfo> MakeSegmentedAggregateRel(
       const ConversionOptions& conv_opts, const std::vector<DeclarationInfo>& inputs,
       const substrait_ext::SegmentedAggregateRel& seg_agg_rel,
       const ExtensionSet& ext_set) {
@@ -211,21 +208,13 @@ class DefaultExtensionProvider : public BaseExtensionProvider {
       aggregates.push_back(std::move(aggregate));
     }
 
-    ARROW_ASSIGN_OR_RAISE(
-        auto output_schema,
-        acero::aggregate::MakeOutputSchema(input_schema, keys, segment_keys, aggregates));
-
-    ARROW_ASSIGN_OR_RAISE(auto decl_info, internal::MakeAggregateDeclaration(
-                                              std::move(inputs[0].declaration),
-                                              output_schema, std::move(aggregates),
-                                              std::move(keys), std::move(segment_keys)));
-
-    size_t out_size = output_schema->num_fields();
-    std::vector<int> field_output_indices(out_size);
-    for (int i = 0; i < static_cast<int>(out_size); i++) {
-      field_output_indices[i] = i;
-    }
-    return RelationInfo{decl_info, std::move(field_output_indices)};
+    ARROW_ASSIGN_OR_RAISE(auto aggregate_schema,
+                          acero::aggregate::MakeOutputSchema(
+                              input_schema, keys, /*segment_keys=*/{}, aggregates));
+
+    return internal::MakeAggregateDeclaration(
+        std::move(inputs[0].declaration), std::move(aggregate_schema),
+        std::move(aggregates), std::move(keys), std::move(segment_keys));
   }
 };
 
diff --git a/cpp/src/arrow/engine/substrait/options.h b/cpp/src/arrow/engine/substrait/options.h
index e501914dd3..0d66c5eea4 100644
--- a/cpp/src/arrow/engine/substrait/options.h
+++ b/cpp/src/arrow/engine/substrait/options.h
@@ -81,10 +81,10 @@ class ARROW_ENGINE_EXPORT ExtensionDetails {
 class ARROW_ENGINE_EXPORT ExtensionProvider {
  public:
   virtual ~ExtensionProvider() = default;
-  virtual Result<RelationInfo> MakeRel(const ConversionOptions& conv_opts,
-                                       const std::vector<DeclarationInfo>& inputs,
-                                       const ExtensionDetails& ext_details,
-                                       const ExtensionSet& ext_set) = 0;
+  virtual Result<DeclarationInfo> MakeRel(const ConversionOptions& conv_opts,
+                                          const std::vector<DeclarationInfo>& inputs,
+                                          const ExtensionDetails& ext_details,
+                                          const ExtensionSet& ext_set) = 0;
 };
 
 /// \brief Get the default extension provider
diff --git a/cpp/src/arrow/engine/substrait/relation.h b/cpp/src/arrow/engine/substrait/relation.h
index a7e3605bf6..0be4e03bb3 100644
--- a/cpp/src/arrow/engine/substrait/relation.h
+++ b/cpp/src/arrow/engine/substrait/relation.h
@@ -34,22 +34,6 @@ struct ARROW_ENGINE_EXPORT DeclarationInfo {
   std::shared_ptr<Schema> output_schema;
 };
 
-/// Information resulting from converting a Substrait relation.
-///
-/// RelationInfo adds the "output indices" field for the extension to define how the
-/// fields should be mapped to get the standard indices expected by Substrait.
-struct ARROW_ENGINE_EXPORT RelationInfo {
-  /// The execution information produced thus far.
-  DeclarationInfo decl_info;
-  /// A vector of indices, one per input field per input in order, each index referring
-  /// to the corresponding field within the output schema, if it is in the output, or -1
-  /// otherwise. Each location in this vector is a field input index. This vector is
-  /// useful for translating selected field input indices (often from an output mapping in
-  /// a Substrait plan) of a join-type relation to their locations in the output schema of
-  /// the relation. This vector is undefined if the translation is unsupported.
-  std::optional<std::vector<int>> field_output_indices;
-};
-
 /// Information resulting from converting a Substrait plan
 struct ARROW_ENGINE_EXPORT PlanInfo {
   /// The root declaration.
diff --git a/cpp/src/arrow/engine/substrait/relation_internal.cc b/cpp/src/arrow/engine/substrait/relation_internal.cc
index 0336bb3dd1..c19ba18a38 100644
--- a/cpp/src/arrow/engine/substrait/relation_internal.cc
+++ b/cpp/src/arrow/engine/substrait/relation_internal.cc
@@ -162,9 +162,8 @@ Result<DeclarationInfo> ProcessEmit(const substrait::ProjectRel& rel,
                             no_emit_declr, schema);
 }
 
-Result<DeclarationInfo> ProcessExtensionEmit(
-    const DeclarationInfo& no_emit_declr, const std::vector<int>& emit_order,
-    const std::vector<int>& field_output_indices) {
+Result<DeclarationInfo> ProcessExtensionEmit(const DeclarationInfo& no_emit_declr,
+                                             const std::vector<int>& emit_order) {
   const std::shared_ptr<Schema>& input_schema = no_emit_declr.output_schema;
   std::vector<compute::Expression> proj_field_refs;
   proj_field_refs.reserve(emit_order.size());
@@ -172,15 +171,11 @@ Result<DeclarationInfo> ProcessExtensionEmit(
   emit_fields.reserve(emit_order.size());
 
   for (int emit_idx : emit_order) {
-    if (emit_idx < 0 || static_cast<size_t>(emit_idx) >= field_output_indices.size()) {
+    if (emit_idx < 0 || emit_idx >= input_schema->num_fields()) {
       return Status::Invalid("Out of bounds emit index ", emit_idx);
     }
-    int field_idx = field_output_indices[emit_idx];
-    if (field_idx < 0) {
-      return Status::Invalid("Non-output emit index ", emit_idx);
-    }
-    proj_field_refs.push_back(compute::field_ref(FieldRef(field_idx)));
-    emit_fields.push_back(input_schema->field(field_idx));
+    proj_field_refs.push_back(compute::field_ref(FieldRef(emit_idx)));
+    emit_fields.push_back(input_schema->field(emit_idx));
   }
 
   std::shared_ptr<Schema> emit_schema = schema(std::move(emit_fields));
@@ -192,13 +187,13 @@ Result<DeclarationInfo> ProcessExtensionEmit(
       std::move(emit_schema)};
 }
 
-Result<RelationInfo> GetExtensionRelationInfo(const substrait::Rel& rel,
-                                              const ExtensionSet& ext_set,
-                                              const ConversionOptions& conv_opts,
-                                              std::vector<DeclarationInfo>* inputs_arg) {
+Result<DeclarationInfo> GetExtensionInfo(const substrait::Rel& rel,
+                                         const ExtensionSet& ext_set,
+                                         const ConversionOptions& conv_opts,
+                                         std::vector<DeclarationInfo>* inputs_arg) {
   if (inputs_arg == nullptr) {
     std::vector<DeclarationInfo> inputs_tmp;
-    return GetExtensionRelationInfo(rel, ext_set, conv_opts, &inputs_tmp);
+    return GetExtensionInfo(rel, ext_set, conv_opts, &inputs_tmp);
   }
   std::vector<DeclarationInfo>& inputs = *inputs_arg;
   inputs.clear();
@@ -788,44 +783,27 @@ Result<DeclarationInfo> FromProto(const substrait::Rel& rel, const ExtensionSet&
     case substrait::Rel::RelTypeCase::kExtensionMulti: {
       std::vector<DeclarationInfo> ext_rel_inputs;
       ARROW_ASSIGN_OR_RAISE(
-          auto ext_rel_info,
-          GetExtensionRelationInfo(rel, ext_set, conversion_options, &ext_rel_inputs));
-      const auto& ext_decl_info = ext_rel_info.decl_info;
+          auto ext_decl_info,
+          GetExtensionInfo(rel, ext_set, conversion_options, &ext_rel_inputs));
       auto ext_common_opt = GetExtensionRelCommon(rel);
       bool has_emit = ext_common_opt && ext_common_opt->emit_kind_case() ==
                                             substrait::RelCommon::EmitKindCase::kEmit;
-      if (!ext_rel_info.field_output_indices) {
-        if (!has_emit) {
-          return ext_decl_info;
-        }
-        return Status::NotImplemented("Emit not supported by ",
-                                      ext_decl_info.declaration.factory_name);
-      }
       // Set up the emit order - an ordered list of indices that specifies an output
       // mapping as expected by Substrait. This is a sublist of [0..N), where N is the
       // total number of input fields across all inputs of the relation, that selects
       // from these input fields.
-      std::vector<int> emit_order;
       if (has_emit) {
+        std::vector<int> emit_order;
         // the emit order is defined in the Substrait plan - pick it up
         const auto& emit_info = ext_common_opt->emit();
         emit_order.reserve(emit_info.output_mapping_size());
         for (const auto& emit_idx : emit_info.output_mapping()) {
           emit_order.push_back(emit_idx);
         }
+        return ProcessExtensionEmit(std::move(ext_decl_info), emit_order);
       } else {
-        // the emit order is the default output mapping [0..N)
-        int emit_size = 0;
-        for (const auto& input : ext_rel_inputs) {
-          emit_size += input.output_schema->num_fields();
-        }
-        emit_order.reserve(emit_size);
-        for (int emit_idx = 0; emit_idx < emit_size; emit_idx++) {
-          emit_order.push_back(emit_idx);
-        }
+        return ext_decl_info;
       }
-      return ProcessExtensionEmit(ext_decl_info, emit_order,
-                                  *ext_rel_info.field_output_indices);
     }
 
     case substrait::Rel::RelTypeCase::kSet: {
diff --git a/cpp/src/arrow/engine/substrait/serde_test.cc b/cpp/src/arrow/engine/substrait/serde_test.cc
index b84bde8080..3a96618880 100644
--- a/cpp/src/arrow/engine/substrait/serde_test.cc
+++ b/cpp/src/arrow/engine/substrait/serde_test.cc
@@ -4356,7 +4356,7 @@ TEST(Substrait, PlanWithAsOfJoinExtension) {
           "extension_multi": {
             "common": {
               "emit": {
-                "outputMapping": [0, 1, 2, 5]
+                "outputMapping": [0, 1, 2, 3]
               }
             },
             "inputs": [
@@ -5156,7 +5156,7 @@ TEST(Substrait, PlanWithExtension) {
           "extension_multi": {
             "common": {
               "emit": {
-                "outputMapping": [0, 1, 2, 5]
+                "outputMapping": [0, 1, 2, 3]
               }
             },
             "inputs": [
@@ -5475,7 +5475,7 @@ TEST(Substrait, AsOfJoinDefaultEmit) {
             }
           }
         },
-        "names": ["time", "key", "value1", "time2", "key2", "value2"]
+        "names": ["time", "key", "value1", "value2"]
       }
     }],
     "expectedTypeUrls": []
@@ -5507,12 +5507,10 @@ TEST(Substrait, AsOfJoinDefaultEmit) {
   ASSERT_OK_AND_ASSIGN(auto buf, internal::SubstraitFromJSON("Plan", substrait_json));
 
   auto out_schema = schema({field("time", int32()), field("key", int32()),
-                            field("value1", float64()), field("time2", int32()),
-                            field("key2", int32()), field("value2", float64())});
+                            field("value1", float64()), field("value2", float64())});
 
   auto expected_table = TableFromJSON(
-      out_schema,
-      {"[[2, 1, 1.1, 2, 1, 1.2], [4, 1, 2.1, 4, 1, 1.2], [6, 2, 3.1, 6, 2, 3.2]]"});
+      out_schema, {"[[2, 1, 1.1, 1.2], [4, 1, 2.1, 1.2], [6, 2, 3.1, 3.2]]"});
   CheckRoundTripResult(std::move(expected_table), buf, {}, conversion_options);
 }