You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/07/06 02:46:08 UTC

[GitHub] [arrow] vibhatha commented on a diff in pull request #13130: ARROW-15591: [C++] Add support for aggregation to the Substrait consumer

vibhatha commented on code in PR #13130:
URL: https://github.com/apache/arrow/pull/13130#discussion_r914365089


##########
cpp/src/arrow/engine/substrait/relation_internal.cc:
##########
@@ -309,6 +309,75 @@ Result<compute::Declaration> FromProto(const substrait::Rel& rel,
       join_dec.inputs.emplace_back(std::move(right));
       return std::move(join_dec);
     }
+    case substrait::Rel::RelTypeCase::kAggregate: {
+      const auto& aggregate = rel.aggregate();
+      RETURN_NOT_OK(CheckRelCommon(aggregate));
+
+      if (!aggregate.has_input()) {
+        return Status::Invalid("substrait::AggregateRel with no input relation");
+      }
+
+      ARROW_ASSIGN_OR_RAISE(auto input, FromProto(aggregate.input(), ext_set));
+
+      if (aggregate.groupings_size() > 1) {
+        return Status::Invalid("Grouping sets not supported.");
+      }
+      std::vector<FieldRef> keys;
+      auto group = aggregate.groupings(0);
+      keys.reserve(group.grouping_expressions_size());
+      for (int exp_id = 0; exp_id < group.grouping_expressions_size(); exp_id++) {
+        const auto& expr = FromProto(group.grouping_expressions(exp_id), ext_set);
+        const auto& field_ref = expr->field_ref();
+        if (field_ref) {
+          keys.emplace_back(std::move(*field_ref));
+        } else {
+          return Status::Invalid(
+              "Only accept a direct reference as the grouping expression for aggregates");
+        }
+      }
+      // denotes how many unique aggregation functions are used
+      // measure_id refers to the corresponding function in the
+      // extensionsion
+      int measure_size = aggregate.measures_size();
+      std::vector<compute::Aggregate> aggregates;
+      aggregates.reserve(measure_size);
+      for (int measure_id = 0; measure_id < measure_size; measure_id++) {
+        const auto& agg_measure = aggregate.measures(measure_id);
+        if (agg_measure.has_measure()) {
+          if (agg_measure.has_filter()) {
+            return Status::Invalid("Aggregate filters are not supported.");
+          }
+          const auto& agg_func = agg_measure.measure();
+          if (agg_func.args_size() != 1) {
+            return Status::Invalid("Aggregate function must be a unary function.");
+          }
+          int func_reference = agg_func.function_reference();
+          ARROW_ASSIGN_OR_RAISE(auto func_record, ext_set.DecodeFunction(func_reference));
+          // aggreagte function name
+          auto func_name = std::string(func_record.id.name);
+          // aggregate output column name
+          std::string agg_col_name =
+              func_name + "(" + std::to_string(func_reference) + ")";

Review Comment:
   You have a good point there. I only added it for readability, so we can drop it as well.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org