Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/05/12 13:57:51 UTC

[GitHub] [arrow] vibhatha opened a new pull request, #13130: ARROW-15591: [C++] Add support for aggregation to the Substrait consumer [WIP]

vibhatha opened a new pull request, #13130:
URL: https://github.com/apache/arrow/pull/13130

   Draft PR: the feature is not yet fully implemented. I am still working out a few missing pieces and how to implement them cleanly.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] westonpace commented on pull request #13130: ARROW-15591: [C++] Add support for aggregation to the Substrait consumer

Posted by GitBox <gi...@apache.org>.
westonpace commented on PR #13130:
URL: https://github.com/apache/arrow/pull/13130#issuecomment-1171742125

   So I studied this a bit more, and I admit I also had a very poor understanding of the aggregate rel in the past.  Substrait's aggregate node is considerably more flexible than Acero's current implementation, so there will be some limitations:
   
    * Substrait's aggregate node supports "grouping sets", which is the ability to group by multiple sets of keys at the same time.  For example: https://www.db-fiddle.com/f/4QQnTGkyENZtxBKrWtimAT/0  Acero does not support grouping sets.
      * If `AggregateRel::groupings` has more than one element, then we should reject the plan.
      * If someone needed to polyfill this behavior, they could do so with a `UNION ALL`, I believe (although I don't think we support the union node yet).
    * Substrait's grouping keys can be arbitrary expressions.  Acero's grouping keys are `FieldRef`.
      * All instances of `AggregateRel::groupings[0]::grouping_expressions` must be direct references or else we reject the plan.
      * If someone needed to polyfill this behavior, they could do so by first running a projection so that the expressions become fields to reference.
    * Substrait's measures support filtering.  Acero does not support this.
      * If any `AggregateRel::measures::filter` is specified, then we should reject the plan.
      * I do not believe this can be fixed with a straightforward polyfill; we should add a JIRA.
    * Substrait's measures are potentially non-unary functions.
      * If any `AggregateRel::measures::measure::arguments` has size != 1, then we should reject the plan.
      * I do not believe this can be fixed with a straightforward polyfill; however, there are not yet any standard aggregate functions defined which are non-unary.
    * Substrait's measures take in arguments as expressions.
      * If any `AggregateRel::measures::measure::arguments` is not a direct reference, then we should reject the plan.
      * If someone needed to polyfill this behavior, they could do so by first running a projection so that the expressions become fields to reference.
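
The rejection rules above can be sketched as a single check. This is a minimal, self-contained illustration, not the consumer's code: `SimpleExpr`, `SimpleMeasure`, `SimpleAggregateRel` and `CheckSupported` are hypothetical stand-ins for the generated `substrait::` protobuf classes, and treating an empty `groupings` list as "no keys" is an assumption of the sketch.

```cpp
#include <cassert>
#include <optional>
#include <string>
#include <vector>

// Hypothetical, simplified stand-ins for the Substrait protobuf messages.
// They model only the fields the rejection rules look at.
struct SimpleExpr {
  bool is_direct_reference = false;  // selection -> direct_reference -> struct_field
  int field = -1;                    // struct_field index when it is a direct reference
};

struct SimpleMeasure {
  std::vector<SimpleExpr> arguments;  // AggregateRel::measures::measure::arguments
  bool has_filter = false;            // AggregateRel::measures::filter is set
};

struct SimpleAggregateRel {
  std::vector<std::vector<SimpleExpr>> groupings;  // one entry per grouping set
  std::vector<SimpleMeasure> measures;
};

// Returns a description of the first unsupported feature found, or
// std::nullopt if the relation fits Acero's aggregate node.
std::optional<std::string> CheckSupported(const SimpleAggregateRel& rel) {
  if (rel.groupings.size() > 1) {
    return "grouping sets";
  }
  if (rel.groupings.size() == 1) {
    for (const SimpleExpr& key : rel.groupings[0]) {
      if (!key.is_direct_reference) return "grouping key is not a direct reference";
    }
  }
  for (const SimpleMeasure& m : rel.measures) {
    if (m.has_filter) return "measure filter";
    if (m.arguments.size() != 1) return "non-unary aggregate function";
    if (!m.arguments[0].is_direct_reference) {
      return "aggregate argument is not a direct reference";
    }
  }
  return std::nullopt;
}
```

Checking everything up front keeps the error close to the offending plan field instead of failing later inside the exec plan.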
   
   So, the mapping between Acero's AggregateNodeOptions and Substrait's AggregateRel should be:
   
   * `AggregateNodeOptions::keys[N] == AggregateRel::groupings[0]::grouping_expressions[N]::selection::direct_reference::struct_field::field`
   * `AggregateNodeOptions::aggregates[N]::function == AggregateRel::measures[N]::measure::function_reference` (the anchor is resolved to a function name through the plan's extension declarations)
   * `AggregateNodeOptions::aggregates[N]::options == IGNORE FOR NOW`
   * `AggregateNodeOptions::aggregates[N]::target == AggregateRel::measures[N]::measure::arguments[0]::value::selection::direct_reference::struct_field::field`
   * `AggregateNodeOptions::aggregates[N]::name == ALWAYS EMPTY`
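
A sketch of this mapping, under the same kind of simplification: `MiniAggregate` and `MiniAggregateNodeOptions` are hypothetical stand-ins for `compute::Aggregate` and `AggregateNodeOptions`, keys and targets are plain field indices rather than `FieldRef`, and the `functions` map plays the role of resolving a `function_reference` anchor through the plan's extensions. Arguments are assumed to have already passed the direct-reference checks.

```cpp
#include <cassert>
#include <map>
#include <memory>
#include <string>
#include <vector>

// Hypothetical stand-ins; the real types are compute::Aggregate and
// compute::AggregateNodeOptions, which use FieldRef rather than int.
struct MiniAggregate {
  std::string function;           // resolved from measures[N].measure.function_reference
  std::shared_ptr<void> options;  // ignored for now, so always null
  int target;                     // measures[N].measure.arguments[0] struct_field
  std::string name;               // always empty
};

struct MiniAggregateNodeOptions {
  std::vector<MiniAggregate> aggregates;
  std::vector<int> keys;  // groupings[0].grouping_expressions[N] struct_field
};

struct MiniMeasure {
  unsigned function_reference;  // anchor into the plan's extension functions
  int argument_field;           // already validated as a direct reference
};

// `functions` stands in for the extension-set lookup (anchor -> name);
// std::map::at throws std::out_of_range for an undeclared anchor.
MiniAggregateNodeOptions MapAggregate(const std::vector<int>& grouping_fields,
                                      const std::vector<MiniMeasure>& measures,
                                      const std::map<unsigned, std::string>& functions) {
  MiniAggregateNodeOptions out;
  out.keys = grouping_fields;
  for (const MiniMeasure& m : measures) {
    out.aggregates.push_back(MiniAggregate{functions.at(m.function_reference),
                                           nullptr, m.argument_field, ""});
  }
  return out;
}
```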



[GitHub] [arrow] vibhatha commented on pull request #13130: ARROW-15591: [C++] Add support for aggregation to the Substrait consumer

vibhatha commented on PR #13130:
URL: https://github.com/apache/arrow/pull/13130#issuecomment-1194892969

   > Thanks for sticking with this. Apologies for the delay in reviewing.
   
   No worries! Thank you for the support, glad this is in. 



[GitHub] [arrow] vibhatha commented on pull request #13130: ARROW-15591: [C++] Add support for aggregation to the Substrait consumer

vibhatha commented on PR #13130:
URL: https://github.com/apache/arrow/pull/13130#issuecomment-1187701516

   @westonpace this looks okay now. WDYT?



[GitHub] [arrow] westonpace commented on a diff in pull request #13130: ARROW-15591: [C++] Add support for aggregation to the Substrait consumer

westonpace commented on code in PR #13130:
URL: https://github.com/apache/arrow/pull/13130#discussion_r914295005


##########
cpp/src/arrow/engine/substrait/relation_internal.cc:
##########
@@ -309,6 +309,75 @@ Result<compute::Declaration> FromProto(const substrait::Rel& rel,
       join_dec.inputs.emplace_back(std::move(right));
       return std::move(join_dec);
     }
+    case substrait::Rel::RelTypeCase::kAggregate: {
+      const auto& aggregate = rel.aggregate();
+      RETURN_NOT_OK(CheckRelCommon(aggregate));
+
+      if (!aggregate.has_input()) {
+        return Status::Invalid("substrait::AggregateRel with no input relation");
+      }
+
+      ARROW_ASSIGN_OR_RAISE(auto input, FromProto(aggregate.input(), ext_set));
+
+      if (aggregate.groupings_size() > 1) {
+        return Status::Invalid("Grouping sets not supported.");
+      }
+      std::vector<FieldRef> keys;
+      auto group = aggregate.groupings(0);
+      keys.reserve(group.grouping_expressions_size());
+      for (int exp_id = 0; exp_id < group.grouping_expressions_size(); exp_id++) {
+        const auto& expr = FromProto(group.grouping_expressions(exp_id), ext_set);

Review Comment:
   Use `ARROW_ASSIGN_OR_RAISE` here instead of using `->` later.  Otherwise, if something goes wrong converting the expression, it will result in an abort (which we don't want) instead of an invalid status (which we would want).
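
To illustrate the difference described in this comment, here is a sketch with a deliberately tiny result type. `MiniStatus`, `MiniResult`, `MINI_ASSIGN_OR_RETURN`, `ConvertKey` and `CollectKey` are hypothetical and only imitate the shape of `arrow::Status`, `arrow::Result` and `ARROW_ASSIGN_OR_RAISE`; the sketch assumes, as the comment says, that `->` on a failed result aborts, while the macro hands the error back to the caller.

```cpp
#include <cassert>
#include <cstdlib>
#include <string>
#include <utility>

// Hypothetical stand-ins that only imitate the shape of arrow::Status and
// arrow::Result; they are not Arrow's implementation.
struct MiniStatus {
  std::string message;  // empty means OK
  bool ok() const { return message.empty(); }
};

template <typename T>
class MiniResult {
 public:
  MiniResult(T value) : value_(std::move(value)) {}            // success
  MiniResult(MiniStatus error) : status_(std::move(error)) {}  // failure
  bool ok() const { return status_.ok(); }
  const MiniStatus& status() const { return status_; }
  // Like ValueOrDie(): using -> on a failed result terminates the process.
  const T* operator->() const {
    if (!ok()) std::abort();
    return &value_;
  }
  T MoveValueUnsafe() { return std::move(value_); }

 private:
  MiniStatus status_;
  T value_{};
};

// Hypothetical macro in the spirit of ARROW_ASSIGN_OR_RAISE: on failure it
// returns the status to the caller instead of aborting.
#define MINI_CONCAT_IMPL(a, b) a##b
#define MINI_CONCAT(a, b) MINI_CONCAT_IMPL(a, b)
#define MINI_ASSIGN_OR_RETURN_IMPL(tmp, lhs, rexpr) \
  auto tmp = (rexpr);                               \
  if (!tmp.ok()) return tmp.status();               \
  lhs = tmp.MoveValueUnsafe();
#define MINI_ASSIGN_OR_RETURN(lhs, rexpr) \
  MINI_ASSIGN_OR_RETURN_IMPL(MINI_CONCAT(mini_result_, __LINE__), lhs, rexpr)

struct Key {
  int field;
};

// Hypothetical conversion step; a negative index models "not a direct reference".
MiniResult<Key> ConvertKey(int raw) {
  if (raw < 0) return MiniStatus{"not a direct reference"};
  return Key{raw};
}

MiniStatus CollectKey(int raw, int* out) {
  // Aborting version: `*out = ConvertKey(raw)->field;` calls std::abort()
  // when the conversion fails.
  MINI_ASSIGN_OR_RETURN(Key key, ConvertKey(raw));
  *out = key.field;
  return MiniStatus{};
}
```

With the macro, a bad grouping expression surfaces as an error the caller can report, and `*out` is left untouched.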



##########
cpp/src/arrow/engine/substrait/relation_internal.cc:
##########
@@ -309,6 +309,75 @@ Result<compute::Declaration> FromProto(const substrait::Rel& rel,
       join_dec.inputs.emplace_back(std::move(right));
       return std::move(join_dec);
     }
+    case substrait::Rel::RelTypeCase::kAggregate: {
+      const auto& aggregate = rel.aggregate();
+      RETURN_NOT_OK(CheckRelCommon(aggregate));
+
+      if (!aggregate.has_input()) {
+        return Status::Invalid("substrait::AggregateRel with no input relation");
+      }
+
+      ARROW_ASSIGN_OR_RAISE(auto input, FromProto(aggregate.input(), ext_set));
+
+      if (aggregate.groupings_size() > 1) {
+        return Status::Invalid("Grouping sets not supported.");
+      }
+      std::vector<FieldRef> keys;
+      auto group = aggregate.groupings(0);
+      keys.reserve(group.grouping_expressions_size());
+      for (int exp_id = 0; exp_id < group.grouping_expressions_size(); exp_id++) {
+        const auto& expr = FromProto(group.grouping_expressions(exp_id), ext_set);
+        const auto& field_ref = expr->field_ref();

Review Comment:
   ```suggestion
           const auto* field_ref = expr->field_ref();
   ```
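
A sketch of why the suggestion spells out `auto*`, assuming (as the original `if (field_ref)` check and `*field_ref` dereference imply) that `field_ref()` returns a pointer that is null when the expression is not a field reference. `MiniFieldRef`, `MiniExpression` and `AppendKey` are hypothetical names. The sketch also encodes a side note: because the pointer is to `const`, `std::move(*field_ref)` produces a `const` rvalue, which still selects the copy constructor.

```cpp
#include <cassert>
#include <string>
#include <utility>
#include <vector>

// Hypothetical types; MiniExpression::field_ref() is assumed to behave like
// the accessor in the diff: a pointer that is null for non-references.
struct MiniFieldRef {
  std::string name;
};

class MiniExpression {
 public:
  MiniExpression() = default;  // models a non-reference expression, e.g. a call
  explicit MiniExpression(MiniFieldRef ref) : is_ref_(true), ref_(std::move(ref)) {}
  const MiniFieldRef* field_ref() const { return is_ref_ ? &ref_ : nullptr; }

 private:
  bool is_ref_ = false;
  MiniFieldRef ref_;
};

// Returns false (instead of dereferencing null) for non-reference expressions.
bool AppendKey(const MiniExpression& expr, std::vector<MiniFieldRef>* keys) {
  // `const auto*` makes the nullable pointer visible at the call site;
  // `const auto&` would also compile, binding to the returned pointer value.
  const auto* field_ref = expr.field_ref();
  if (field_ref == nullptr) return false;
  // The pointee is const, so std::move(*field_ref) would still copy;
  // copying explicitly says what actually happens.
  keys->push_back(*field_ref);
  return true;
}
```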



##########
cpp/src/arrow/engine/substrait/relation_internal.cc:
##########
@@ -309,6 +309,75 @@ Result<compute::Declaration> FromProto(const substrait::Rel& rel,
       join_dec.inputs.emplace_back(std::move(right));
       return std::move(join_dec);
     }
+    case substrait::Rel::RelTypeCase::kAggregate: {
+      const auto& aggregate = rel.aggregate();
+      RETURN_NOT_OK(CheckRelCommon(aggregate));
+
+      if (!aggregate.has_input()) {
+        return Status::Invalid("substrait::AggregateRel with no input relation");
+      }
+
+      ARROW_ASSIGN_OR_RAISE(auto input, FromProto(aggregate.input(), ext_set));
+
+      if (aggregate.groupings_size() > 1) {
+        return Status::Invalid("Grouping sets not supported.");

Review Comment:
   ```suggestion
           return Status::NotImplemented("Grouping sets not supported.");
   ```
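
The distinction behind this and the following `Invalid` to `NotImplemented` suggestions, sketched with hypothetical types (`Code`, `SketchStatus` and `CheckAggregate` are not Arrow names): a malformed plan is invalid, while a well-formed plan that uses a feature Acero lacks is not implemented.

```cpp
#include <cassert>
#include <string>

// Hypothetical types; they mirror the Invalid / NotImplemented distinction
// but are not Arrow's Status API.
enum class Code { kOk, kInvalid, kNotImplemented };

struct SketchStatus {
  Code code = Code::kOk;
  std::string message;
};

// kInvalid: the plan is malformed, e.g. an AggregateRel with no input.
// kNotImplemented: the plan is valid Substrait but uses a feature Acero lacks,
// e.g. more than one grouping set, so a producer could rewrite it and retry.
SketchStatus CheckAggregate(bool has_input, int groupings_size) {
  if (!has_input) {
    return {Code::kInvalid, "substrait::AggregateRel with no input relation"};
  }
  if (groupings_size > 1) {
    return {Code::kNotImplemented, "Grouping sets not supported."};
  }
  return {};
}
```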



##########
cpp/src/arrow/engine/substrait/relation_internal.cc:
##########
@@ -309,6 +309,75 @@ Result<compute::Declaration> FromProto(const substrait::Rel& rel,
       join_dec.inputs.emplace_back(std::move(right));
       return std::move(join_dec);
     }
+    case substrait::Rel::RelTypeCase::kAggregate: {
+      const auto& aggregate = rel.aggregate();
+      RETURN_NOT_OK(CheckRelCommon(aggregate));
+
+      if (!aggregate.has_input()) {
+        return Status::Invalid("substrait::AggregateRel with no input relation");
+      }
+
+      ARROW_ASSIGN_OR_RAISE(auto input, FromProto(aggregate.input(), ext_set));
+
+      if (aggregate.groupings_size() > 1) {
+        return Status::Invalid("Grouping sets not supported.");
+      }
+      std::vector<FieldRef> keys;
+      auto group = aggregate.groupings(0);
+      keys.reserve(group.grouping_expressions_size());
+      for (int exp_id = 0; exp_id < group.grouping_expressions_size(); exp_id++) {
+        const auto& expr = FromProto(group.grouping_expressions(exp_id), ext_set);
+        const auto& field_ref = expr->field_ref();
+        if (field_ref) {
+          keys.emplace_back(std::move(*field_ref));
+        } else {
+          return Status::Invalid(
+              "Only accept a direct reference as the grouping expression for aggregates");
+        }
+      }
+      // denotes how many unique aggregation functions are used
+      // measure_id refers to the corresponding function in the
+      // extensionsion
+      int measure_size = aggregate.measures_size();
+      std::vector<compute::Aggregate> aggregates;
+      aggregates.reserve(measure_size);
+      for (int measure_id = 0; measure_id < measure_size; measure_id++) {
+        const auto& agg_measure = aggregate.measures(measure_id);
+        if (agg_measure.has_measure()) {
+          if (agg_measure.has_filter()) {
+            return Status::Invalid("Aggregate filters are not supported.");

Review Comment:
   ```suggestion
               return Status::NotImplemented("Aggregate filters are not supported.");
   ```



##########
cpp/src/arrow/engine/substrait/relation_internal.cc:
##########
@@ -309,6 +309,75 @@ Result<compute::Declaration> FromProto(const substrait::Rel& rel,
       join_dec.inputs.emplace_back(std::move(right));
       return std::move(join_dec);
     }
+    case substrait::Rel::RelTypeCase::kAggregate: {
+      const auto& aggregate = rel.aggregate();
+      RETURN_NOT_OK(CheckRelCommon(aggregate));
+
+      if (!aggregate.has_input()) {
+        return Status::Invalid("substrait::AggregateRel with no input relation");
+      }
+
+      ARROW_ASSIGN_OR_RAISE(auto input, FromProto(aggregate.input(), ext_set));
+
+      if (aggregate.groupings_size() > 1) {
+        return Status::Invalid("Grouping sets not supported.");
+      }
+      std::vector<FieldRef> keys;
+      auto group = aggregate.groupings(0);
+      keys.reserve(group.grouping_expressions_size());
+      for (int exp_id = 0; exp_id < group.grouping_expressions_size(); exp_id++) {
+        const auto& expr = FromProto(group.grouping_expressions(exp_id), ext_set);
+        const auto& field_ref = expr->field_ref();
+        if (field_ref) {
+          keys.emplace_back(std::move(*field_ref));
+        } else {
+          return Status::Invalid(
+              "Only accept a direct reference as the grouping expression for aggregates");
+        }
+      }
+      // denotes how many unique aggregation functions are used
+      // measure_id refers to the corresponding function in the
+      // extensionsion
+      int measure_size = aggregate.measures_size();
+      std::vector<compute::Aggregate> aggregates;
+      aggregates.reserve(measure_size);
+      for (int measure_id = 0; measure_id < measure_size; measure_id++) {
+        const auto& agg_measure = aggregate.measures(measure_id);
+        if (agg_measure.has_measure()) {
+          if (agg_measure.has_filter()) {
+            return Status::Invalid("Aggregate filters are not supported.");
+          }
+          const auto& agg_func = agg_measure.measure();
+          if (agg_func.args_size() != 1) {
+            return Status::Invalid("Aggregate function must be a unary function.");
+          }
+          int func_reference = agg_func.function_reference();
+          ARROW_ASSIGN_OR_RAISE(auto func_record, ext_set.DecodeFunction(func_reference));
+          // aggreagte function name
+          auto func_name = std::string(func_record.id.name);
+          // aggregate output column name
+          std::string agg_col_name =
+              func_name + "(" + std::to_string(func_reference) + ")";

Review Comment:
   I'm not sure there is any advantage to giving these columns a name vs. just leaving them as empty strings.



##########
cpp/src/arrow/engine/substrait/extension_set.cc:
##########
@@ -443,8 +443,9 @@ struct DefaultExtensionIdRegistry : ExtensionIdRegistryImpl {
     // ARROW-15535.
     for (util::string_view name : {
              "add",
-             "equal",
-             "is_not_distinct_from",
+             "equal",                 // added to support join operator

Review Comment:
   The comments are probably not needed as this will change significantly in the function mapping PR but they don't hurt.



##########
cpp/src/arrow/engine/substrait/relation_internal.cc:
##########
@@ -309,6 +309,75 @@ Result<compute::Declaration> FromProto(const substrait::Rel& rel,
       join_dec.inputs.emplace_back(std::move(right));
       return std::move(join_dec);
     }
+    case substrait::Rel::RelTypeCase::kAggregate: {
+      const auto& aggregate = rel.aggregate();
+      RETURN_NOT_OK(CheckRelCommon(aggregate));
+
+      if (!aggregate.has_input()) {
+        return Status::Invalid("substrait::AggregateRel with no input relation");
+      }
+
+      ARROW_ASSIGN_OR_RAISE(auto input, FromProto(aggregate.input(), ext_set));
+
+      if (aggregate.groupings_size() > 1) {
+        return Status::Invalid("Grouping sets not supported.");
+      }
+      std::vector<FieldRef> keys;
+      auto group = aggregate.groupings(0);
+      keys.reserve(group.grouping_expressions_size());
+      for (int exp_id = 0; exp_id < group.grouping_expressions_size(); exp_id++) {
+        const auto& expr = FromProto(group.grouping_expressions(exp_id), ext_set);
+        const auto& field_ref = expr->field_ref();
+        if (field_ref) {
+          keys.emplace_back(std::move(*field_ref));
+        } else {
+          return Status::Invalid(
+              "Only accept a direct reference as the grouping expression for aggregates");
+        }
+      }
+      // denotes how many unique aggregation functions are used
+      // measure_id refers to the corresponding function in the
+      // extensionsion

Review Comment:
   ```suggestion
   ```
   I'm not sure this comment is helpful.  Maybe something simple like "A Substrait measure is equivalent to an Acero aggregate", but even that seems excessive.



##########
cpp/src/arrow/engine/substrait/relation_internal.cc:
##########
@@ -309,6 +309,75 @@ Result<compute::Declaration> FromProto(const substrait::Rel& rel,
       join_dec.inputs.emplace_back(std::move(right));
       return std::move(join_dec);
     }
+    case substrait::Rel::RelTypeCase::kAggregate: {
+      const auto& aggregate = rel.aggregate();
+      RETURN_NOT_OK(CheckRelCommon(aggregate));
+
+      if (!aggregate.has_input()) {
+        return Status::Invalid("substrait::AggregateRel with no input relation");
+      }
+
+      ARROW_ASSIGN_OR_RAISE(auto input, FromProto(aggregate.input(), ext_set));
+
+      if (aggregate.groupings_size() > 1) {
+        return Status::Invalid("Grouping sets not supported.");
+      }
+      std::vector<FieldRef> keys;
+      auto group = aggregate.groupings(0);
+      keys.reserve(group.grouping_expressions_size());
+      for (int exp_id = 0; exp_id < group.grouping_expressions_size(); exp_id++) {
+        const auto& expr = FromProto(group.grouping_expressions(exp_id), ext_set);
+        const auto& field_ref = expr->field_ref();
+        if (field_ref) {
+          keys.emplace_back(std::move(*field_ref));
+        } else {
+          return Status::Invalid(
+              "Only accept a direct reference as the grouping expression for aggregates");

Review Comment:
   ```suggestion
                 "The grouping expression for an aggregate must be a direct reference.");
   ```
   Minor nit: wording



##########
cpp/src/arrow/engine/substrait/relation_internal.cc:
##########
@@ -309,6 +309,75 @@ Result<compute::Declaration> FromProto(const substrait::Rel& rel,
       join_dec.inputs.emplace_back(std::move(right));
       return std::move(join_dec);
     }
+    case substrait::Rel::RelTypeCase::kAggregate: {
+      const auto& aggregate = rel.aggregate();
+      RETURN_NOT_OK(CheckRelCommon(aggregate));
+
+      if (!aggregate.has_input()) {
+        return Status::Invalid("substrait::AggregateRel with no input relation");
+      }
+
+      ARROW_ASSIGN_OR_RAISE(auto input, FromProto(aggregate.input(), ext_set));
+
+      if (aggregate.groupings_size() > 1) {
+        return Status::Invalid("Grouping sets not supported.");
+      }
+      std::vector<FieldRef> keys;
+      auto group = aggregate.groupings(0);
+      keys.reserve(group.grouping_expressions_size());
+      for (int exp_id = 0; exp_id < group.grouping_expressions_size(); exp_id++) {
+        const auto& expr = FromProto(group.grouping_expressions(exp_id), ext_set);
+        const auto& field_ref = expr->field_ref();
+        if (field_ref) {
+          keys.emplace_back(std::move(*field_ref));
+        } else {
+          return Status::Invalid(
+              "Only accept a direct reference as the grouping expression for aggregates");
+        }
+      }
+      // denotes how many unique aggregation functions are used
+      // measure_id refers to the corresponding function in the
+      // extensionsion
+      int measure_size = aggregate.measures_size();
+      std::vector<compute::Aggregate> aggregates;
+      aggregates.reserve(measure_size);
+      for (int measure_id = 0; measure_id < measure_size; measure_id++) {
+        const auto& agg_measure = aggregate.measures(measure_id);
+        if (agg_measure.has_measure()) {
+          if (agg_measure.has_filter()) {
+            return Status::Invalid("Aggregate filters are not supported.");
+          }
+          const auto& agg_func = agg_measure.measure();
+          if (agg_func.args_size() != 1) {
+            return Status::Invalid("Aggregate function must be a unary function.");

Review Comment:
   ```suggestion
               return Status::NotImplemented("Aggregate function must be a unary function.");
   ```



##########
cpp/src/arrow/engine/substrait/relation_internal.cc:
##########
@@ -309,6 +309,75 @@ Result<compute::Declaration> FromProto(const substrait::Rel& rel,
       join_dec.inputs.emplace_back(std::move(right));
       return std::move(join_dec);
     }
+    case substrait::Rel::RelTypeCase::kAggregate: {
+      const auto& aggregate = rel.aggregate();
+      RETURN_NOT_OK(CheckRelCommon(aggregate));
+
+      if (!aggregate.has_input()) {
+        return Status::Invalid("substrait::AggregateRel with no input relation");
+      }
+
+      ARROW_ASSIGN_OR_RAISE(auto input, FromProto(aggregate.input(), ext_set));
+
+      if (aggregate.groupings_size() > 1) {
+        return Status::Invalid("Grouping sets not supported.");
+      }
+      std::vector<FieldRef> keys;
+      auto group = aggregate.groupings(0);
+      keys.reserve(group.grouping_expressions_size());
+      for (int exp_id = 0; exp_id < group.grouping_expressions_size(); exp_id++) {
+        const auto& expr = FromProto(group.grouping_expressions(exp_id), ext_set);
+        const auto& field_ref = expr->field_ref();
+        if (field_ref) {
+          keys.emplace_back(std::move(*field_ref));
+        } else {
+          return Status::Invalid(
+              "Only accept a direct reference as the grouping expression for aggregates");
+        }
+      }
+      // denotes how many unique aggregation functions are used
+      // measure_id refers to the corresponding function in the
+      // extensionsion
+      int measure_size = aggregate.measures_size();
+      std::vector<compute::Aggregate> aggregates;
+      aggregates.reserve(measure_size);
+      for (int measure_id = 0; measure_id < measure_size; measure_id++) {
+        const auto& agg_measure = aggregate.measures(measure_id);
+        if (agg_measure.has_measure()) {
+          if (agg_measure.has_filter()) {
+            return Status::Invalid("Aggregate filters are not supported.");
+          }
+          const auto& agg_func = agg_measure.measure();
+          if (agg_func.args_size() != 1) {
+            return Status::Invalid("Aggregate function must be a unary function.");
+          }
+          int func_reference = agg_func.function_reference();
+          ARROW_ASSIGN_OR_RAISE(auto func_record, ext_set.DecodeFunction(func_reference));
+          // aggreagte function name
+          auto func_name = std::string(func_record.id.name);
+          // aggregate output column name
+          std::string agg_col_name =
+              func_name + "(" + std::to_string(func_reference) + ")";
+          // aggregate target
+          ARROW_ASSIGN_OR_RAISE(auto field_expr, FromProto(agg_func.args(0), ext_set));
+          auto target = field_expr.field_ref();
+          if (!target) {
+            return Status::Invalid(
+                "Only accept a direct reference as the aggregate expression.");
+          }
+          // TODO: Implement function options in Substrait

Review Comment:
   We shouldn't have a TODO without some kind of JIRA reference.  I'm not sure we need a TODO here though.  Maybe just get rid of this line and keep the next line about setting function options to nullptr.



##########
cpp/src/arrow/engine/substrait/relation_internal.cc:
##########
@@ -309,6 +309,75 @@ Result<compute::Declaration> FromProto(const substrait::Rel& rel,
       join_dec.inputs.emplace_back(std::move(right));
       return std::move(join_dec);
     }
+    case substrait::Rel::RelTypeCase::kAggregate: {
+      const auto& aggregate = rel.aggregate();
+      RETURN_NOT_OK(CheckRelCommon(aggregate));
+
+      if (!aggregate.has_input()) {
+        return Status::Invalid("substrait::AggregateRel with no input relation");
+      }
+
+      ARROW_ASSIGN_OR_RAISE(auto input, FromProto(aggregate.input(), ext_set));
+
+      if (aggregate.groupings_size() > 1) {
+        return Status::Invalid("Grouping sets not supported.");
+      }
+      std::vector<FieldRef> keys;
+      auto group = aggregate.groupings(0);
+      keys.reserve(group.grouping_expressions_size());
+      for (int exp_id = 0; exp_id < group.grouping_expressions_size(); exp_id++) {
+        const auto& expr = FromProto(group.grouping_expressions(exp_id), ext_set);
+        const auto& field_ref = expr->field_ref();
+        if (field_ref) {
+          keys.emplace_back(std::move(*field_ref));
+        } else {
+          return Status::Invalid(
+              "Only accept a direct reference as the grouping expression for aggregates");
+        }
+      }
+      // denotes how many unique aggregation functions are used
+      // measure_id refers to the corresponding function in the
+      // extensionsion
+      int measure_size = aggregate.measures_size();
+      std::vector<compute::Aggregate> aggregates;
+      aggregates.reserve(measure_size);
+      for (int measure_id = 0; measure_id < measure_size; measure_id++) {
+        const auto& agg_measure = aggregate.measures(measure_id);
+        if (agg_measure.has_measure()) {
+          if (agg_measure.has_filter()) {
+            return Status::Invalid("Aggregate filters are not supported.");
+          }
+          const auto& agg_func = agg_measure.measure();
+          if (agg_func.args_size() != 1) {
+            return Status::Invalid("Aggregate function must be a unary function.");
+          }
+          int func_reference = agg_func.function_reference();
+          ARROW_ASSIGN_OR_RAISE(auto func_record, ext_set.DecodeFunction(func_reference));
+          // aggreagte function name
+          auto func_name = std::string(func_record.id.name);
+          // aggregate output column name
+          std::string agg_col_name =
+              func_name + "(" + std::to_string(func_reference) + ")";
+          // aggregate target
+          ARROW_ASSIGN_OR_RAISE(auto field_expr, FromProto(agg_func.args(0), ext_set));
+          auto target = field_expr.field_ref();
+          if (!target) {
+            return Status::Invalid(
+                "Only accept a direct reference as the aggregate expression.");

Review Comment:
   ```suggestion
                   "The input expression to an aggregate function must be a direct reference.");
   ```




[GitHub] [arrow] westonpace commented on a diff in pull request #13130: ARROW-15591: [C++] Add support for aggregation to the Substrait consumer

westonpace commented on code in PR #13130:
URL: https://github.com/apache/arrow/pull/13130#discussion_r918293275


##########
cpp/src/arrow/engine/substrait/relation_internal.cc:
##########
@@ -308,6 +308,71 @@ Result<compute::Declaration> FromProto(const substrait::Rel& rel,
       join_dec.inputs.emplace_back(std::move(right));
       return std::move(join_dec);
     }
+    case substrait::Rel::RelTypeCase::kAggregate: {
+      const auto& aggregate = rel.aggregate();
+      RETURN_NOT_OK(CheckRelCommon(aggregate));
+
+      if (!aggregate.has_input()) {
+        return Status::Invalid("substrait::AggregateRel with no input relation");
+      }
+
+      ARROW_ASSIGN_OR_RAISE(auto input, FromProto(aggregate.input(), ext_set));
+
+      if (aggregate.groupings_size() > 1) {
+        return Status::NotImplemented("Grouping sets not supported.");
+      }
+      std::vector<FieldRef> keys;
+      auto group = aggregate.groupings(0);
+      keys.reserve(group.grouping_expressions_size());
+      for (int exp_id = 0; exp_id < group.grouping_expressions_size(); exp_id++) {
+        ARROW_ASSIGN_OR_RAISE(auto expr,
+                              FromProto(group.grouping_expressions(exp_id), ext_set));
+        const auto* field_ref = expr.field_ref();
+        if (field_ref) {
+          keys.emplace_back(std::move(*field_ref));
+        } else {
+          return Status::Invalid(
+              "The grouping expression for an aggregate must be a direct reference.");
+        }
+      }
+
+      int measure_size = aggregate.measures_size();
+      std::vector<compute::Aggregate> aggregates;
+      aggregates.reserve(measure_size);
+      for (int measure_id = 0; measure_id < measure_size; measure_id++) {
+        const auto& agg_measure = aggregate.measures(measure_id);
+        if (agg_measure.has_measure()) {
+          if (agg_measure.has_filter()) {
+            return Status::NotImplemented("Aggregate filters are not supported.");
+          }
+          const auto& agg_func = agg_measure.measure();
+          if (agg_func.arguments_size() != 1) {
+            return Status::NotImplemented("Aggregate function must be a unary function.");
+          }
+          int func_reference = agg_func.function_reference();
+          ARROW_ASSIGN_OR_RAISE(auto func_record, ext_set.DecodeFunction(func_reference));
+          // aggreagte function name
+          auto func_name = std::string(func_record.id.name);
+          // aggregate target
+          auto subs_func_args = agg_func.arguments(0);
+          ARROW_ASSIGN_OR_RAISE(auto field_expr,
+                                FromProto(subs_func_args.value(), ext_set));
+          auto target = field_expr.field_ref();
+          if (!target) {
+            return Status::Invalid(
+                "The input expression to an aggregate function must be a direct "
+                "reference.");
+          }
+          aggregates.emplace_back(compute::Aggregate{std::move(func_name), NULLPTR,
+                                                     std::move(*target), std::move("")});

Review Comment:
   ```suggestion
             aggregates.emplace_back(compute::Aggregate{std::move(func_name), NULLPTR,
                                                        std::move(*target), std::move("")});
   ```
   
   Minor nit.  It might be better to use:
   
   ```
          // If you are going to create the instance yourself, you can just use push_back
          aggregates.push_back(compute::Aggregate{std::move(func_name), NULLPTR,
                                                  std::move(*target), std::move("")});
   ```
   
   or...
   
   ```
          // If you are using emplace_back, you do not need to create the instance
          // yourself.
          aggregates.emplace_back(std::move(func_name), NULLPTR,
                                  std::move(*target), std::move(""));
   ```
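
The two alternatives from this nit, side by side. `MiniAggregate` and `BuildBoth` are hypothetical; `MiniAggregate` stands in for `compute::Aggregate` and is given a constructor on purpose. Before C++20, `emplace_back(args...)` forwards to a constructor and cannot aggregate-initialize a struct that has none, so whether the second form compiles against the real type depends on how `compute::Aggregate` is declared.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for compute::Aggregate, given a constructor on
// purpose so that emplace_back(args...) works before C++20.
struct MiniAggregate {
  MiniAggregate(std::string fn, std::shared_ptr<void> opts, std::string tgt,
                std::string nm)
      : function(std::move(fn)),
        options(std::move(opts)),
        target(std::move(tgt)),
        name(std::move(nm)) {}
  std::string function;
  std::shared_ptr<void> options;  // function options; null here
  std::string target;
  std::string name;
};

std::vector<MiniAggregate> BuildBoth(std::string func_name, std::string target) {
  std::vector<MiniAggregate> aggregates;
  // Option 1: create the instance yourself and push_back it (the temporary
  // is moved into the vector).
  aggregates.push_back(MiniAggregate{func_name, nullptr, target, ""});
  // Option 2: pass the constructor arguments; emplace_back builds in place.
  aggregates.emplace_back(std::move(func_name), nullptr, std::move(target), "");
  return aggregates;
}
```

The sketch passes `""` directly: `std::move("")` on a string literal has no effect, so the two spellings are equivalent.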



##########
cpp/src/arrow/engine/substrait/relation_internal.cc:
##########
@@ -308,6 +308,71 @@ Result<compute::Declaration> FromProto(const substrait::Rel& rel,
       join_dec.inputs.emplace_back(std::move(right));
       return std::move(join_dec);
     }
+    case substrait::Rel::RelTypeCase::kAggregate: {
+      const auto& aggregate = rel.aggregate();
+      RETURN_NOT_OK(CheckRelCommon(aggregate));
+
+      if (!aggregate.has_input()) {
+        return Status::Invalid("substrait::AggregateRel with no input relation");
+      }
+
+      ARROW_ASSIGN_OR_RAISE(auto input, FromProto(aggregate.input(), ext_set));
+
+      if (aggregate.groupings_size() > 1) {
+        return Status::NotImplemented("Grouping sets not supported.");

Review Comment:
   ```suggestion
           return Status::NotImplemented("Grouping sets not supported.  AggregateRel::groupings may not have more than one item");
   ```
   
   Minor nit: if someone gets this error, they might not immediately realize how to modify the Substrait plan so that Acero can run it.



##########
cpp/src/arrow/engine/substrait/serde_test.cc:
##########
@@ -1383,5 +1383,350 @@ TEST(Substrait, JoinPlanInvalidKeys) {
   }
 }
 
+TEST(Substrait, AggregateBase) {

Review Comment:
   ```suggestion
   TEST(Substrait, AggregateBasic) {
   ```
   
   Minor nit: `Base` might lead one to think that this is meant to be extended somehow.



##########
cpp/src/arrow/engine/substrait/serde_test.cc:
##########
@@ -1383,5 +1383,350 @@ TEST(Substrait, JoinPlanInvalidKeys) {
   }
 }
 
+TEST(Substrait, AggregateBase) {
+  ASSERT_OK_AND_ASSIGN(auto buf, internal::SubstraitFromJSON("Plan", R"({
+    "relations": [{
+      "rel": {
+        "aggregate": {
+          "input": {
+            "read": {
+              "base_schema": {
+                "names": ["A", "B", "C"],
+                "struct": {
+                  "types": [{
+                    "i32": {}
+                  }, {
+                    "i32": {}
+                  }, {
+                    "i32": {}
+                  }]
+                }
+              },
+              "local_files": { 
+                "items": [
+                  {
+                    "uri_file": "file:///tmp/dat.parquet",
+                    "parquet": {}
+                  }
+                ]
+              }
+            }
+          },
+          "groupings": [{
+            "groupingExpressions": [{
+              "selection": {
+                "directReference": {
+                  "structField": {
+                    "field": 0
+                  }
+                }
+              }
+            }]
+          }],
+          "measures": [{
+            "measure": {
+              "functionReference": 0,
+              "arguments": [{
+                "value": {
+                  "selection": {
+                    "directReference": {
+                      "structField": {
+                        "field": 1
+                      }
+                    }
+                  }
+                }
+            }],
+              "sorts": [],
+              "phase": "AGGREGATION_PHASE_INITIAL_TO_RESULT",
+              "outputType": {
+                "i64": {}
+              }
+            }
+          }]
+        }
+      }
+    }],
+    "extensionUris": [{
+      "extension_uri_anchor": 0,
+      "uri": "https://github.com/apache/arrow/blob/master/format/substrait/extension_types.yaml"
+    }],
+    "extensions": [{
+      "extension_function": {
+        "extension_uri_reference": 0,
+        "function_anchor": 0,
+        "name": "count"

Review Comment:
   I don't think this plan would actually work. Since there is a grouping, it will be a hash aggregate and will need to use `hash_count` instead of `count`. However, we are not running the test end-to-end, so I think we get away with it. Still, it might be nice to update it to be accurate.
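   The distinction relies on Acero's naming convention: grouped (hash) aggregation kernels carry a `hash_` prefix (`hash_count`, `hash_sum`, ...), while scalar aggregation uses the bare name (`count`, `sum`). The helper below is a hypothetical sketch of how a consumer could pick the kernel name from whether the aggregate has grouping keys; it is not code from this PR, and it assumes every scalar aggregate has a `hash_` counterpart, which may not hold for every function.

   ```cpp
   #include <cassert>
   #include <string>

   // Hypothetical helper: returns the Acero kernel name to use for an aggregate,
   // assuming grouped kernels are named "hash_" + <scalar name>.
   std::string ResolveAggregateFunctionName(const std::string& base_name,
                                            bool has_grouping_keys) {
     static const std::string kHashPrefix = "hash_";
     // rfind(prefix, 0) == 0 is a C++17-friendly "starts_with" check.
     const bool already_hashed = base_name.rfind(kHashPrefix, 0) == 0;
     if (has_grouping_keys && !already_hashed) {
       return kHashPrefix + base_name;
     }
     // No grouping keys (scalar aggregate) or name already prefixed: keep as-is.
     return base_name;
   }
   ```

   With the grouping on field 0 in this plan, `count` would resolve to `hash_count`.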





[GitHub] [arrow] vibhatha commented on a diff in pull request #13130: ARROW-15591: [C++] Add support for aggregation to the Substrait consumer

Posted by GitBox <gi...@apache.org>.
vibhatha commented on code in PR #13130:
URL: https://github.com/apache/arrow/pull/13130#discussion_r919761437


##########
cpp/src/arrow/engine/substrait/relation_internal.cc:
##########
@@ -308,6 +308,71 @@ Result<compute::Declaration> FromProto(const substrait::Rel& rel,
       join_dec.inputs.emplace_back(std::move(right));
       return std::move(join_dec);
     }
+    case substrait::Rel::RelTypeCase::kAggregate: {
+      const auto& aggregate = rel.aggregate();
+      RETURN_NOT_OK(CheckRelCommon(aggregate));
+
+      if (!aggregate.has_input()) {
+        return Status::Invalid("substrait::AggregateRel with no input relation");
+      }
+
+      ARROW_ASSIGN_OR_RAISE(auto input, FromProto(aggregate.input(), ext_set));
+
+      if (aggregate.groupings_size() > 1) {
+        return Status::NotImplemented("Grouping sets not supported.");
+      }
+      std::vector<FieldRef> keys;
+      auto group = aggregate.groupings(0);
+      keys.reserve(group.grouping_expressions_size());
+      for (int exp_id = 0; exp_id < group.grouping_expressions_size(); exp_id++) {
+        ARROW_ASSIGN_OR_RAISE(auto expr,
+                              FromProto(group.grouping_expressions(exp_id), ext_set));
+        const auto* field_ref = expr.field_ref();
+        if (field_ref) {
+          keys.emplace_back(std::move(*field_ref));
+        } else {
+          return Status::Invalid(
+              "The grouping expression for an aggregate must be a direct reference.");
+        }
+      }
+
+      int measure_size = aggregate.measures_size();
+      std::vector<compute::Aggregate> aggregates;
+      aggregates.reserve(measure_size);
+      for (int measure_id = 0; measure_id < measure_size; measure_id++) {
+        const auto& agg_measure = aggregate.measures(measure_id);
+        if (agg_measure.has_measure()) {
+          if (agg_measure.has_filter()) {
+            return Status::NotImplemented("Aggregate filters are not supported.");
+          }
+          const auto& agg_func = agg_measure.measure();
+          if (agg_func.arguments_size() != 1) {
+            return Status::NotImplemented("Aggregate function must be a unary function.");
+          }
+          int func_reference = agg_func.function_reference();
+          ARROW_ASSIGN_OR_RAISE(auto func_record, ext_set.DecodeFunction(func_reference));
+          // aggregate function name
+          auto func_name = std::string(func_record.id.name);
+          // aggregate target
+          auto subs_func_args = agg_func.arguments(0);
+          ARROW_ASSIGN_OR_RAISE(auto field_expr,
+                                FromProto(subs_func_args.value(), ext_set));
+          auto target = field_expr.field_ref();
+          if (!target) {
+            return Status::Invalid(
+                "The input expression to an aggregate function must be a direct "
+                "reference.");
+          }
+          aggregates.emplace_back(compute::Aggregate{std::move(func_name), NULLPTR,
+                                                     std::move(*target), std::move("")});

Review Comment:
   Can we do this without defining a move constructor?





[GitHub] [arrow] github-actions[bot] commented on pull request #13130: ARROW-15591: [C++] Add support for aggregation to the Substrait consumer [WIP]

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #13130:
URL: https://github.com/apache/arrow/pull/13130#issuecomment-1125030496

   https://issues.apache.org/jira/browse/ARROW-15591




[GitHub] [arrow] ursabot commented on pull request #13130: ARROW-15591: [C++] Add support for aggregation to the Substrait consumer

Posted by GitBox <gi...@apache.org>.
ursabot commented on PR #13130:
URL: https://github.com/apache/arrow/pull/13130#issuecomment-1195235288

   Benchmark runs are scheduled for baseline = 898e12e67759de3df8e6c3ca75e3701c58ab50b3 and contender = 87cefe80c7126a6bf91b809159f39c9aa2a8db61. 87cefe80c7126a6bf91b809159f39c9aa2a8db61 is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
   Conbench compare runs links:
   [Failed :arrow_down:0.0% :arrow_up:0.0%] [ec2-t3-xlarge-us-east-2](https://conbench.ursa.dev/compare/runs/2699dd1e3eda46dbb0176f300558f1e7...af69d38b1add4eb6ae046bc0528b1d34/)
   [Finished :arrow_down:0.41% :arrow_up:0.0%] [test-mac-arm](https://conbench.ursa.dev/compare/runs/586bb8fafe8f4f87b41f283594a0d1fb...c8921a35d4b740e5a97fe17cb385016a/)
   [Finished :arrow_down:0.0% :arrow_up:0.0%] [ursa-i9-9960x](https://conbench.ursa.dev/compare/runs/6ceff0b16f69450cbcad1111e8f95ee5...f2de322a9eeb4fd38fa16d76a0ccb630/)
   [Finished :arrow_down:0.64% :arrow_up:0.04%] [ursa-thinkcentre-m75q](https://conbench.ursa.dev/compare/runs/610f65c7d7ac433d81c6f4971b5de63d...caa268d4e9504335806e349636f31607/)
   Buildkite builds:
   [Failed] [`87cefe80` ec2-t3-xlarge-us-east-2](https://buildkite.com/apache-arrow/arrow-bci-benchmark-on-ec2-t3-xlarge-us-east-2/builds/1198)
   [Finished] [`87cefe80` test-mac-arm](https://buildkite.com/apache-arrow/arrow-bci-benchmark-on-test-mac-arm/builds/1210)
   [Finished] [`87cefe80` ursa-i9-9960x](https://buildkite.com/apache-arrow/arrow-bci-benchmark-on-ursa-i9-9960x/builds/1192)
   [Finished] [`87cefe80` ursa-thinkcentre-m75q](https://buildkite.com/apache-arrow/arrow-bci-benchmark-on-ursa-thinkcentre-m75q/builds/1212)
   [Failed] [`898e12e6` ec2-t3-xlarge-us-east-2](https://buildkite.com/apache-arrow/arrow-bci-benchmark-on-ec2-t3-xlarge-us-east-2/builds/1197)
   [Finished] [`898e12e6` test-mac-arm](https://buildkite.com/apache-arrow/arrow-bci-benchmark-on-test-mac-arm/builds/1209)
   [Finished] [`898e12e6` ursa-i9-9960x](https://buildkite.com/apache-arrow/arrow-bci-benchmark-on-ursa-i9-9960x/builds/1191)
   [Finished] [`898e12e6` ursa-thinkcentre-m75q](https://buildkite.com/apache-arrow/arrow-bci-benchmark-on-ursa-thinkcentre-m75q/builds/1211)
   Supported benchmarks:
   ec2-t3-xlarge-us-east-2: Supported benchmark langs: Python, R. Runs only benchmarks with cloud = True
   test-mac-arm: Supported benchmark langs: C++, Python, R
   ursa-i9-9960x: Supported benchmark langs: Python, R, JavaScript
   ursa-thinkcentre-m75q: Supported benchmark langs: C++, Java
   




[GitHub] [arrow] vibhatha commented on pull request #13130: ARROW-15591: [C++] Add support for aggregation to the Substrait consumer

Posted by GitBox <gi...@apache.org>.
vibhatha commented on PR #13130:
URL: https://github.com/apache/arrow/pull/13130#issuecomment-1174485067

   cc @westonpace 




[GitHub] [arrow] westonpace merged pull request #13130: ARROW-15591: [C++] Add support for aggregation to the Substrait consumer

Posted by GitBox <gi...@apache.org>.
westonpace merged PR #13130:
URL: https://github.com/apache/arrow/pull/13130




[GitHub] [arrow] vibhatha commented on a diff in pull request #13130: ARROW-15591: [C++] Add support for aggregation to the Substrait consumer

Posted by GitBox <gi...@apache.org>.
vibhatha commented on code in PR #13130:
URL: https://github.com/apache/arrow/pull/13130#discussion_r914415771


##########
cpp/src/arrow/engine/substrait/relation_internal.cc:
##########
@@ -309,6 +309,75 @@ Result<compute::Declaration> FromProto(const substrait::Rel& rel,
       join_dec.inputs.emplace_back(std::move(right));
       return std::move(join_dec);
     }
+    case substrait::Rel::RelTypeCase::kAggregate: {
+      const auto& aggregate = rel.aggregate();
+      RETURN_NOT_OK(CheckRelCommon(aggregate));
+
+      if (!aggregate.has_input()) {
+        return Status::Invalid("substrait::AggregateRel with no input relation");
+      }
+
+      ARROW_ASSIGN_OR_RAISE(auto input, FromProto(aggregate.input(), ext_set));
+
+      if (aggregate.groupings_size() > 1) {
+        return Status::Invalid("Grouping sets not supported.");
+      }
+      std::vector<FieldRef> keys;
+      auto group = aggregate.groupings(0);
+      keys.reserve(group.grouping_expressions_size());
+      for (int exp_id = 0; exp_id < group.grouping_expressions_size(); exp_id++) {
+        const auto& expr = FromProto(group.grouping_expressions(exp_id), ext_set);

Review Comment:
   Ah, thanks for noting this.





[GitHub] [arrow] westonpace commented on a diff in pull request #13130: ARROW-15591: [C++] Add support for aggregation to the Substrait consumer

Posted by GitBox <gi...@apache.org>.
westonpace commented on code in PR #13130:
URL: https://github.com/apache/arrow/pull/13130#discussion_r923297309


##########
cpp/src/arrow/engine/substrait/serde_test.cc:
##########
@@ -1383,5 +1383,350 @@ TEST(Substrait, JoinPlanInvalidKeys) {
   }
 }
 
+TEST(Substrait, AggregateBasic) {
+  ASSERT_OK_AND_ASSIGN(auto buf, internal::SubstraitFromJSON("Plan", R"({
+    "relations": [{
+      "rel": {
+        "aggregate": {
+          "input": {
+            "read": {
+              "base_schema": {
+                "names": ["A", "B", "C"],
+                "struct": {
+                  "types": [{
+                    "i32": {}
+                  }, {
+                    "i32": {}
+                  }, {
+                    "i32": {}
+                  }]
+                }
+              },
+              "local_files": { 
+                "items": [
+                  {
+                    "uri_file": "file:///tmp/dat.parquet",
+                    "parquet": {}
+                  }
+                ]
+              }
+            }
+          },
+          "groupings": [{
+            "groupingExpressions": [{
+              "selection": {
+                "directReference": {
+                  "structField": {
+                    "field": 0
+                  }
+                }
+              }
+            }]
+          }],
+          "measures": [{
+            "measure": {
+              "functionReference": 0,
+              "arguments": [{
+                "value": {
+                  "selection": {
+                    "directReference": {
+                      "structField": {
+                        "field": 1
+                      }
+                    }
+                  }
+                }
+            }],
+              "sorts": [],
+              "phase": "AGGREGATION_PHASE_INITIAL_TO_RESULT",
+              "outputType": {
+                "i64": {}
+              }
+            }
+          }]
+        }
+      }
+    }],
+    "extensionUris": [{
+      "extension_uri_anchor": 0,
+      "uri": "https://github.com/apache/arrow/blob/master/format/substrait/extension_types.yaml"
+    }],
+    "extensions": [{
+      "extension_function": {
+        "extension_uri_reference": 0,
+        "function_anchor": 0,
+        "name": "hash_count"
+      }
+    }],
+  })"));
+
+  auto sp_ext_id_reg = substrait::MakeExtensionIdRegistry();
+  ExtensionIdRegistry* ext_id_reg = sp_ext_id_reg.get();
+  // invalid before registration
+  ExtensionSet ext_set_invalid(ext_id_reg);
+  ASSERT_OK_AND_ASSIGN(auto sink_decls, DeserializePlans(
+                                            *buf, [] { return kNullConsumer; },
+                                            ext_id_reg, &ext_set_invalid));

Review Comment:
   ```suggestion
     ASSERT_OK_AND_ASSIGN(auto sink_decls, DeserializePlans(
                                               *buf, [] { return kNullConsumer; });
   ```
   
   The third argument (`ext_id_reg`) is optional if you just want to use the default extension registry (which should be fine for this test). The fourth argument (`&ext_set_invalid`) is an out-parameter and is also optional, so it is not needed for this test.
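   The pattern described here (a defaulted registry pointer that falls back to a process-wide default, plus an optional out-parameter that is filled only when the caller asks for it) can be sketched generically. All names below (`Registry`, `DefaultRegistry`, `DecodedExtensions`, `DecodePlan`) are hypothetical and do not reproduce the real `DeserializePlans` signature beyond the idea of two optional trailing parameters.

   ```cpp
   #include <cassert>
   #include <string>
   #include <vector>

   // Hypothetical registry type standing in for an extension-id registry.
   struct Registry {
     std::string name;
   };

   // Process-wide default used when the caller passes no registry.
   const Registry* DefaultRegistry() {
     static const Registry kDefault{"default"};
     return &kDefault;
   }

   // Hypothetical record of what was resolved during decoding.
   struct DecodedExtensions {
     std::string registry_name;
   };

   // Both trailing parameters are optional:
   //  - registry == nullptr: fall back to the default registry
   //  - ext_out == nullptr: the caller does not need the decoded extensions
   std::vector<std::string> DecodePlan(const std::string& plan,
                                       const Registry* registry = nullptr,
                                       DecodedExtensions* ext_out = nullptr) {
     if (registry == nullptr) {
       registry = DefaultRegistry();
     }
     if (ext_out != nullptr) {
       ext_out->registry_name = registry->name;
     }
     // Stand-in for real decoding: return the plan as a single declaration.
     return {plan};
   }
   ```

   A test that only cares about the returned declarations can call `DecodePlan(plan)` and skip both optional arguments, which mirrors the simplification suggested above.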



##########
cpp/src/arrow/engine/substrait/serde_test.cc:
##########
@@ -1383,5 +1383,350 @@ TEST(Substrait, JoinPlanInvalidKeys) {
   }
 }
 
+TEST(Substrait, AggregateBasic) {
+  ASSERT_OK_AND_ASSIGN(auto buf, internal::SubstraitFromJSON("Plan", R"({
+    "relations": [{
+      "rel": {
+        "aggregate": {
+          "input": {
+            "read": {
+              "base_schema": {
+                "names": ["A", "B", "C"],
+                "struct": {
+                  "types": [{
+                    "i32": {}
+                  }, {
+                    "i32": {}
+                  }, {
+                    "i32": {}
+                  }]
+                }
+              },
+              "local_files": { 
+                "items": [
+                  {
+                    "uri_file": "file:///tmp/dat.parquet",
+                    "parquet": {}
+                  }
+                ]
+              }
+            }
+          },
+          "groupings": [{
+            "groupingExpressions": [{
+              "selection": {
+                "directReference": {
+                  "structField": {
+                    "field": 0
+                  }
+                }
+              }
+            }]
+          }],
+          "measures": [{
+            "measure": {
+              "functionReference": 0,
+              "arguments": [{
+                "value": {
+                  "selection": {
+                    "directReference": {
+                      "structField": {
+                        "field": 1
+                      }
+                    }
+                  }
+                }
+            }],
+              "sorts": [],
+              "phase": "AGGREGATION_PHASE_INITIAL_TO_RESULT",
+              "outputType": {
+                "i64": {}
+              }
+            }
+          }]
+        }
+      }
+    }],
+    "extensionUris": [{
+      "extension_uri_anchor": 0,
+      "uri": "https://github.com/apache/arrow/blob/master/format/substrait/extension_types.yaml"
+    }],
+    "extensions": [{
+      "extension_function": {
+        "extension_uri_reference": 0,
+        "function_anchor": 0,
+        "name": "hash_count"
+      }
+    }],
+  })"));
+
+  auto sp_ext_id_reg = substrait::MakeExtensionIdRegistry();
+  ExtensionIdRegistry* ext_id_reg = sp_ext_id_reg.get();
+  // invalid before registration
+  ExtensionSet ext_set_invalid(ext_id_reg);
+  ASSERT_OK_AND_ASSIGN(auto sink_decls, DeserializePlans(
+                                            *buf, [] { return kNullConsumer; },
+                                            ext_id_reg, &ext_set_invalid));
+  auto agg_decl = sink_decls[0].inputs[0];
+
+  const auto& agg_rel = agg_decl.get<compute::Declaration>();
+
+  const auto& agg_options =
+      checked_cast<const compute::AggregateNodeOptions&>(*agg_rel->options);
+
+  EXPECT_EQ(agg_rel->factory_name, "aggregate");
+  EXPECT_EQ(agg_options.aggregates[0].name, "");
+  EXPECT_EQ(agg_options.aggregates[0].function, "hash_count");
+}
+
+TEST(Substrait, AggregateInvalidRel) {
+  ASSERT_OK_AND_ASSIGN(auto buf, internal::SubstraitFromJSON("Plan", R"({
+    "relations": [{
+      "rel": {
+        "aggregate": {
+        }
+      }
+    }],
+    "extensionUris": [{
+      "extension_uri_anchor": 0,
+      "uri": "https://github.com/apache/arrow/blob/master/format/substrait/extension_types.yaml"
+    }],
+    "extensions": [{
+      "extension_function": {
+        "extension_uri_reference": 0,
+        "function_anchor": 0,
+        "name": "hash_count"
+      }
+    }],
+  })"));
+
+  auto sp_ext_id_reg = substrait::MakeExtensionIdRegistry();
+  ExtensionIdRegistry* ext_id_reg = sp_ext_id_reg.get();
+  // invalid before registration
+  ExtensionSet ext_set_invalid(ext_id_reg);
+  ASSERT_RAISES(Invalid,
+                DeserializePlans(
+                    *buf, [] { return kNullConsumer; }, ext_id_reg, &ext_set_invalid));

Review Comment:
   ```suggestion
     ASSERT_RAISES(Invalid,
                   DeserializePlans(
                       *buf, [] { return kNullConsumer; }));
   ```



##########
cpp/src/arrow/engine/substrait/serde_test.cc:
##########
@@ -1383,5 +1383,350 @@ TEST(Substrait, JoinPlanInvalidKeys) {
   }
 }
 
+TEST(Substrait, AggregateBasic) {
+  ASSERT_OK_AND_ASSIGN(auto buf, internal::SubstraitFromJSON("Plan", R"({
+    "relations": [{
+      "rel": {
+        "aggregate": {
+          "input": {
+            "read": {
+              "base_schema": {
+                "names": ["A", "B", "C"],
+                "struct": {
+                  "types": [{
+                    "i32": {}
+                  }, {
+                    "i32": {}
+                  }, {
+                    "i32": {}
+                  }]
+                }
+              },
+              "local_files": { 
+                "items": [
+                  {
+                    "uri_file": "file:///tmp/dat.parquet",
+                    "parquet": {}
+                  }
+                ]
+              }
+            }
+          },
+          "groupings": [{
+            "groupingExpressions": [{
+              "selection": {
+                "directReference": {
+                  "structField": {
+                    "field": 0
+                  }
+                }
+              }
+            }]
+          }],
+          "measures": [{
+            "measure": {
+              "functionReference": 0,
+              "arguments": [{
+                "value": {
+                  "selection": {
+                    "directReference": {
+                      "structField": {
+                        "field": 1
+                      }
+                    }
+                  }
+                }
+            }],
+              "sorts": [],
+              "phase": "AGGREGATION_PHASE_INITIAL_TO_RESULT",
+              "outputType": {
+                "i64": {}
+              }
+            }
+          }]
+        }
+      }
+    }],
+    "extensionUris": [{
+      "extension_uri_anchor": 0,
+      "uri": "https://github.com/apache/arrow/blob/master/format/substrait/extension_types.yaml"
+    }],
+    "extensions": [{
+      "extension_function": {
+        "extension_uri_reference": 0,
+        "function_anchor": 0,
+        "name": "hash_count"
+      }
+    }],
+  })"));
+
+  auto sp_ext_id_reg = substrait::MakeExtensionIdRegistry();
+  ExtensionIdRegistry* ext_id_reg = sp_ext_id_reg.get();
+  // invalid before registration
+  ExtensionSet ext_set_invalid(ext_id_reg);
+  ASSERT_OK_AND_ASSIGN(auto sink_decls, DeserializePlans(
+                                            *buf, [] { return kNullConsumer; },
+                                            ext_id_reg, &ext_set_invalid));
+  auto agg_decl = sink_decls[0].inputs[0];
+
+  const auto& agg_rel = agg_decl.get<compute::Declaration>();
+
+  const auto& agg_options =
+      checked_cast<const compute::AggregateNodeOptions&>(*agg_rel->options);
+
+  EXPECT_EQ(agg_rel->factory_name, "aggregate");
+  EXPECT_EQ(agg_options.aggregates[0].name, "");
+  EXPECT_EQ(agg_options.aggregates[0].function, "hash_count");
+}
+
+TEST(Substrait, AggregateInvalidRel) {
+  ASSERT_OK_AND_ASSIGN(auto buf, internal::SubstraitFromJSON("Plan", R"({
+    "relations": [{
+      "rel": {
+        "aggregate": {
+        }
+      }
+    }],
+    "extensionUris": [{
+      "extension_uri_anchor": 0,
+      "uri": "https://github.com/apache/arrow/blob/master/format/substrait/extension_types.yaml"
+    }],
+    "extensions": [{
+      "extension_function": {
+        "extension_uri_reference": 0,
+        "function_anchor": 0,
+        "name": "hash_count"
+      }
+    }],
+  })"));
+
+  auto sp_ext_id_reg = substrait::MakeExtensionIdRegistry();
+  ExtensionIdRegistry* ext_id_reg = sp_ext_id_reg.get();
+  // invalid before registration
+  ExtensionSet ext_set_invalid(ext_id_reg);
+  ASSERT_RAISES(Invalid,
+                DeserializePlans(
+                    *buf, [] { return kNullConsumer; }, ext_id_reg, &ext_set_invalid));
+}
+
+TEST(Substrait, AggregateInvalidFunction) {
+  ASSERT_OK_AND_ASSIGN(auto buf, internal::SubstraitFromJSON("Plan", R"({
+    "relations": [{
+      "rel": {
+        "aggregate": {
+          "input": {
+            "read": {
+              "base_schema": {
+                "names": ["A", "B", "C"],
+                "struct": {
+                  "types": [{
+                    "i32": {}
+                  }, {
+                    "i32": {}
+                  }, {
+                    "i32": {}
+                  }]
+                }
+              },
+              "local_files": { 
+                "items": [
+                  {
+                    "uri_file": "file:///tmp/dat.parquet",
+                    "parquet": {}
+                  }
+                ]
+              }
+            }
+          },
+          "groupings": [{
+            "groupingExpressions": [{
+              "selection": {
+                "directReference": {
+                  "structField": {
+                    "field": 0
+                  }
+                }
+              }
+            }]
+          }],
+          "measures": [{
+          }]
+        }
+      }
+    }],
+    "extensionUris": [{
+      "extension_uri_anchor": 0,
+      "uri": "https://github.com/apache/arrow/blob/master/format/substrait/extension_types.yaml"
+    }],
+    "extensions": [{
+      "extension_function": {
+        "extension_uri_reference": 0,
+        "function_anchor": 0,
+        "name": "hash_count"
+      }
+    }],
+  })"));
+
+  auto sp_ext_id_reg = substrait::MakeExtensionIdRegistry();
+  ExtensionIdRegistry* ext_id_reg = sp_ext_id_reg.get();
+  // invalid before registration
+  ExtensionSet ext_set_invalid(ext_id_reg);
+  ASSERT_RAISES(Invalid,
+                DeserializePlans(
+                    *buf, [] { return kNullConsumer; }, ext_id_reg, &ext_set_invalid));

Review Comment:
   ```suggestion
     ASSERT_RAISES(Invalid,
                   DeserializePlans(
                       *buf, [] { return kNullConsumer; }));
   ```



##########
cpp/src/arrow/engine/substrait/serde_test.cc:
##########
@@ -1383,5 +1383,350 @@ TEST(Substrait, JoinPlanInvalidKeys) {
   }
 }
 
+TEST(Substrait, AggregateBasic) {
+  ASSERT_OK_AND_ASSIGN(auto buf, internal::SubstraitFromJSON("Plan", R"({
+    "relations": [{
+      "rel": {
+        "aggregate": {
+          "input": {
+            "read": {
+              "base_schema": {
+                "names": ["A", "B", "C"],
+                "struct": {
+                  "types": [{
+                    "i32": {}
+                  }, {
+                    "i32": {}
+                  }, {
+                    "i32": {}
+                  }]
+                }
+              },
+              "local_files": { 
+                "items": [
+                  {
+                    "uri_file": "file:///tmp/dat.parquet",
+                    "parquet": {}
+                  }
+                ]
+              }
+            }
+          },
+          "groupings": [{
+            "groupingExpressions": [{
+              "selection": {
+                "directReference": {
+                  "structField": {
+                    "field": 0
+                  }
+                }
+              }
+            }]
+          }],
+          "measures": [{
+            "measure": {
+              "functionReference": 0,
+              "arguments": [{
+                "value": {
+                  "selection": {
+                    "directReference": {
+                      "structField": {
+                        "field": 1
+                      }
+                    }
+                  }
+                }
+            }],
+              "sorts": [],
+              "phase": "AGGREGATION_PHASE_INITIAL_TO_RESULT",
+              "outputType": {
+                "i64": {}
+              }
+            }
+          }]
+        }
+      }
+    }],
+    "extensionUris": [{
+      "extension_uri_anchor": 0,
+      "uri": "https://github.com/apache/arrow/blob/master/format/substrait/extension_types.yaml"
+    }],
+    "extensions": [{
+      "extension_function": {
+        "extension_uri_reference": 0,
+        "function_anchor": 0,
+        "name": "hash_count"
+      }
+    }],
+  })"));
+
+  auto sp_ext_id_reg = substrait::MakeExtensionIdRegistry();
+  ExtensionIdRegistry* ext_id_reg = sp_ext_id_reg.get();
+  // invalid before registration
+  ExtensionSet ext_set_invalid(ext_id_reg);
+  ASSERT_OK_AND_ASSIGN(auto sink_decls, DeserializePlans(
+                                            *buf, [] { return kNullConsumer; },
+                                            ext_id_reg, &ext_set_invalid));
+  auto agg_decl = sink_decls[0].inputs[0];
+
+  const auto& agg_rel = agg_decl.get<compute::Declaration>();
+
+  const auto& agg_options =
+      checked_cast<const compute::AggregateNodeOptions&>(*agg_rel->options);
+
+  EXPECT_EQ(agg_rel->factory_name, "aggregate");
+  EXPECT_EQ(agg_options.aggregates[0].name, "");
+  EXPECT_EQ(agg_options.aggregates[0].function, "hash_count");
+}
+
+TEST(Substrait, AggregateInvalidRel) {
+  ASSERT_OK_AND_ASSIGN(auto buf, internal::SubstraitFromJSON("Plan", R"({
+    "relations": [{
+      "rel": {
+        "aggregate": {
+        }
+      }
+    }],
+    "extensionUris": [{
+      "extension_uri_anchor": 0,
+      "uri": "https://github.com/apache/arrow/blob/master/format/substrait/extension_types.yaml"
+    }],
+    "extensions": [{
+      "extension_function": {
+        "extension_uri_reference": 0,
+        "function_anchor": 0,
+        "name": "hash_count"
+      }
+    }],
+  })"));
+
+  auto sp_ext_id_reg = substrait::MakeExtensionIdRegistry();
+  ExtensionIdRegistry* ext_id_reg = sp_ext_id_reg.get();
+  // invalid before registration
+  ExtensionSet ext_set_invalid(ext_id_reg);
+  ASSERT_RAISES(Invalid,
+                DeserializePlans(
+                    *buf, [] { return kNullConsumer; }, ext_id_reg, &ext_set_invalid));
+}
+
+TEST(Substrait, AggregateInvalidFunction) {
+  ASSERT_OK_AND_ASSIGN(auto buf, internal::SubstraitFromJSON("Plan", R"({
+    "relations": [{
+      "rel": {
+        "aggregate": {
+          "input": {
+            "read": {
+              "base_schema": {
+                "names": ["A", "B", "C"],
+                "struct": {
+                  "types": [{
+                    "i32": {}
+                  }, {
+                    "i32": {}
+                  }, {
+                    "i32": {}
+                  }]
+                }
+              },
+              "local_files": { 
+                "items": [
+                  {
+                    "uri_file": "file:///tmp/dat.parquet",
+                    "parquet": {}
+                  }
+                ]
+              }
+            }
+          },
+          "groupings": [{
+            "groupingExpressions": [{
+              "selection": {
+                "directReference": {
+                  "structField": {
+                    "field": 0
+                  }
+                }
+              }
+            }]
+          }],
+          "measures": [{
+          }]
+        }
+      }
+    }],
+    "extensionUris": [{
+      "extension_uri_anchor": 0,
+      "uri": "https://github.com/apache/arrow/blob/master/format/substrait/extension_types.yaml"
+    }],
+    "extensions": [{
+      "extension_function": {
+        "extension_uri_reference": 0,
+        "function_anchor": 0,
+        "name": "hash_count"
+      }
+    }],
+  })"));
+
+  auto sp_ext_id_reg = substrait::MakeExtensionIdRegistry();
+  ExtensionIdRegistry* ext_id_reg = sp_ext_id_reg.get();
+  // invalid before registration
+  ExtensionSet ext_set_invalid(ext_id_reg);
+  ASSERT_RAISES(Invalid,
+                DeserializePlans(
+                    *buf, [] { return kNullConsumer; }, ext_id_reg, &ext_set_invalid));
+}
+
+TEST(Substrait, AggregateInvalidAggFuncArgs) {
+  ASSERT_OK_AND_ASSIGN(auto buf, internal::SubstraitFromJSON("Plan", R"({
+    "relations": [{
+      "rel": {
+        "aggregate": {
+          "input": {
+            "read": {
+              "base_schema": {
+                "names": ["A", "B", "C"],
+                "struct": {
+                  "types": [{
+                    "i32": {}
+                  }, {
+                    "i32": {}
+                  }, {
+                    "i32": {}
+                  }]
+                }
+              },
+              "local_files": { 
+                "items": [
+                  {
+                    "uri_file": "file:///tmp/dat.parquet",
+                    "parquet": {}
+                  }
+                ]
+              }
+            }
+          },
+          "groupings": [{
+            "groupingExpressions": [{
+              "selection": {
+                "directReference": {
+                  "structField": {
+                    "field": 0
+                  }
+                }
+              }
+            }]
+          }],
+          "measures": [{
+            "measure": {
+              "functionReference": 0,
+              "args": [],
+              "sorts": [],
+              "phase": "AGGREGATION_PHASE_INITIAL_TO_RESULT",
+              "outputType": {
+                "i64": {}
+              }
+            }
+          }]
+        }
+      }
+    }],
+    "extensionUris": [{
+      "extension_uri_anchor": 0,
+      "uri": "https://github.com/apache/arrow/blob/master/format/substrait/extension_types.yaml"
+    }],
+    "extensions": [{
+      "extension_function": {
+        "extension_uri_reference": 0,
+        "function_anchor": 0,
+        "name": "hash_count"
+      }
+    }],
+  })"));
+
+  auto sp_ext_id_reg = substrait::MakeExtensionIdRegistry();
+  ExtensionIdRegistry* ext_id_reg = sp_ext_id_reg.get();
+  // invalid before registration
+  ExtensionSet ext_set_invalid(ext_id_reg);
+  ASSERT_RAISES(NotImplemented,
+                DeserializePlans(
+                    *buf, [] { return kNullConsumer; }, ext_id_reg, &ext_set_invalid));
+}
+
+TEST(Substrait, AggregateWithFilter) {
+  ASSERT_OK_AND_ASSIGN(auto buf, internal::SubstraitFromJSON("Plan", R"({
+    "relations": [{
+      "rel": {
+        "aggregate": {
+          "input": {
+            "read": {
+              "base_schema": {
+                "names": ["A", "B", "C"],
+                "struct": {
+                  "types": [{
+                    "i32": {}
+                  }, {
+                    "i32": {}
+                  }, {
+                    "i32": {}
+                  }]
+                }
+              },
+              "local_files": { 
+                "items": [
+                  {
+                    "uri_file": "file:///tmp/dat.parquet",
+                    "parquet": {}
+                  }
+                ]
+              }
+            }
+          },
+          "groupings": [{
+            "groupingExpressions": [{
+              "selection": {
+                "directReference": {
+                  "structField": {
+                    "field": 0
+                  }
+                }
+              }
+            }]
+          }],
+          "measures": [{
+            "measure": {
+              "functionReference": 0,
+              "args": [],
+              "sorts": [],
+              "phase": "AGGREGATION_PHASE_INITIAL_TO_RESULT",
+              "outputType": {
+                "i64": {}
+              }
+            }
+          }]
+        }
+      }
+    }],
+    "extensionUris": [{
+      "extension_uri_anchor": 0,
+      "uri": "https://github.com/apache/arrow/blob/master/format/substrait/extension_types.yaml"
+    }],
+    "extensions": [{
+      "extension_function": {
+        "extension_uri_reference": 0,
+        "function_anchor": 0,
+        "name": "equal"
+      }
+    }],
+  })"));
+
+  auto sp_ext_id_reg = substrait::MakeExtensionIdRegistry();
+  ExtensionIdRegistry* ext_id_reg = sp_ext_id_reg.get();
+  // invalid before registration
+  ExtensionSet ext_set_invalid(ext_id_reg);
+  ASSERT_RAISES(NotImplemented,
+                DeserializePlans(
+                    *buf, [] { return kNullConsumer; }, ext_id_reg, &ext_set_invalid));

Review Comment:
   ```suggestion
     ASSERT_RAISES(NotImplemented,
                   DeserializePlans(
                       *buf, [] { return kNullConsumer; }));
   ```



##########
cpp/src/arrow/engine/substrait/serde_test.cc:
##########
@@ -1383,5 +1383,350 @@ TEST(Substrait, JoinPlanInvalidKeys) {
   }
 }
 
+TEST(Substrait, AggregateBasic) {
+  ASSERT_OK_AND_ASSIGN(auto buf, internal::SubstraitFromJSON("Plan", R"({
+    "relations": [{
+      "rel": {
+        "aggregate": {
+          "input": {
+            "read": {
+              "base_schema": {
+                "names": ["A", "B", "C"],
+                "struct": {
+                  "types": [{
+                    "i32": {}
+                  }, {
+                    "i32": {}
+                  }, {
+                    "i32": {}
+                  }]
+                }
+              },
+              "local_files": { 
+                "items": [
+                  {
+                    "uri_file": "file:///tmp/dat.parquet",
+                    "parquet": {}
+                  }
+                ]
+              }
+            }
+          },
+          "groupings": [{
+            "groupingExpressions": [{
+              "selection": {
+                "directReference": {
+                  "structField": {
+                    "field": 0
+                  }
+                }
+              }
+            }]
+          }],
+          "measures": [{
+            "measure": {
+              "functionReference": 0,
+              "arguments": [{
+                "value": {
+                  "selection": {
+                    "directReference": {
+                      "structField": {
+                        "field": 1
+                      }
+                    }
+                  }
+                }
+            }],
+              "sorts": [],
+              "phase": "AGGREGATION_PHASE_INITIAL_TO_RESULT",
+              "outputType": {
+                "i64": {}
+              }
+            }
+          }]
+        }
+      }
+    }],
+    "extensionUris": [{
+      "extension_uri_anchor": 0,
+      "uri": "https://github.com/apache/arrow/blob/master/format/substrait/extension_types.yaml"
+    }],
+    "extensions": [{
+      "extension_function": {
+        "extension_uri_reference": 0,
+        "function_anchor": 0,
+        "name": "hash_count"
+      }
+    }],
+  })"));
+
+  auto sp_ext_id_reg = substrait::MakeExtensionIdRegistry();
+  ExtensionIdRegistry* ext_id_reg = sp_ext_id_reg.get();
+  // invalid before registration
+  ExtensionSet ext_set_invalid(ext_id_reg);
+  ASSERT_OK_AND_ASSIGN(auto sink_decls, DeserializePlans(
+                                            *buf, [] { return kNullConsumer; },
+                                            ext_id_reg, &ext_set_invalid));
+  auto agg_decl = sink_decls[0].inputs[0];
+
+  const auto& agg_rel = agg_decl.get<compute::Declaration>();
+
+  const auto& agg_options =
+      checked_cast<const compute::AggregateNodeOptions&>(*agg_rel->options);
+
+  EXPECT_EQ(agg_rel->factory_name, "aggregate");
+  EXPECT_EQ(agg_options.aggregates[0].name, "");
+  EXPECT_EQ(agg_options.aggregates[0].function, "hash_count");
+}
+
+TEST(Substrait, AggregateInvalidRel) {
+  ASSERT_OK_AND_ASSIGN(auto buf, internal::SubstraitFromJSON("Plan", R"({
+    "relations": [{
+      "rel": {
+        "aggregate": {
+        }
+      }
+    }],
+    "extensionUris": [{
+      "extension_uri_anchor": 0,
+      "uri": "https://github.com/apache/arrow/blob/master/format/substrait/extension_types.yaml"
+    }],
+    "extensions": [{
+      "extension_function": {
+        "extension_uri_reference": 0,
+        "function_anchor": 0,
+        "name": "hash_count"
+      }
+    }],
+  })"));
+
+  auto sp_ext_id_reg = substrait::MakeExtensionIdRegistry();
+  ExtensionIdRegistry* ext_id_reg = sp_ext_id_reg.get();
+  // invalid before registration
+  ExtensionSet ext_set_invalid(ext_id_reg);
+  ASSERT_RAISES(Invalid,
+                DeserializePlans(
+                    *buf, [] { return kNullConsumer; }, ext_id_reg, &ext_set_invalid));
+}
+
+TEST(Substrait, AggregateInvalidFunction) {
+  ASSERT_OK_AND_ASSIGN(auto buf, internal::SubstraitFromJSON("Plan", R"({
+    "relations": [{
+      "rel": {
+        "aggregate": {
+          "input": {
+            "read": {
+              "base_schema": {
+                "names": ["A", "B", "C"],
+                "struct": {
+                  "types": [{
+                    "i32": {}
+                  }, {
+                    "i32": {}
+                  }, {
+                    "i32": {}
+                  }]
+                }
+              },
+              "local_files": { 
+                "items": [
+                  {
+                    "uri_file": "file:///tmp/dat.parquet",
+                    "parquet": {}
+                  }
+                ]
+              }
+            }
+          },
+          "groupings": [{
+            "groupingExpressions": [{
+              "selection": {
+                "directReference": {
+                  "structField": {
+                    "field": 0
+                  }
+                }
+              }
+            }]
+          }],
+          "measures": [{
+          }]
+        }
+      }
+    }],
+    "extensionUris": [{
+      "extension_uri_anchor": 0,
+      "uri": "https://github.com/apache/arrow/blob/master/format/substrait/extension_types.yaml"
+    }],
+    "extensions": [{
+      "extension_function": {
+        "extension_uri_reference": 0,
+        "function_anchor": 0,
+        "name": "hash_count"
+      }
+    }],
+  })"));
+
+  auto sp_ext_id_reg = substrait::MakeExtensionIdRegistry();
+  ExtensionIdRegistry* ext_id_reg = sp_ext_id_reg.get();
+  // invalid before registration
+  ExtensionSet ext_set_invalid(ext_id_reg);
+  ASSERT_RAISES(Invalid,
+                DeserializePlans(
+                    *buf, [] { return kNullConsumer; }, ext_id_reg, &ext_set_invalid));
+}
+
+TEST(Substrait, AggregateInvalidAggFuncArgs) {
+  ASSERT_OK_AND_ASSIGN(auto buf, internal::SubstraitFromJSON("Plan", R"({
+    "relations": [{
+      "rel": {
+        "aggregate": {
+          "input": {
+            "read": {
+              "base_schema": {
+                "names": ["A", "B", "C"],
+                "struct": {
+                  "types": [{
+                    "i32": {}
+                  }, {
+                    "i32": {}
+                  }, {
+                    "i32": {}
+                  }]
+                }
+              },
+              "local_files": { 
+                "items": [
+                  {
+                    "uri_file": "file:///tmp/dat.parquet",
+                    "parquet": {}
+                  }
+                ]
+              }
+            }
+          },
+          "groupings": [{
+            "groupingExpressions": [{
+              "selection": {
+                "directReference": {
+                  "structField": {
+                    "field": 0
+                  }
+                }
+              }
+            }]
+          }],
+          "measures": [{
+            "measure": {
+              "functionReference": 0,
+              "args": [],
+              "sorts": [],
+              "phase": "AGGREGATION_PHASE_INITIAL_TO_RESULT",
+              "outputType": {
+                "i64": {}
+              }
+            }
+          }]
+        }
+      }
+    }],
+    "extensionUris": [{
+      "extension_uri_anchor": 0,
+      "uri": "https://github.com/apache/arrow/blob/master/format/substrait/extension_types.yaml"
+    }],
+    "extensions": [{
+      "extension_function": {
+        "extension_uri_reference": 0,
+        "function_anchor": 0,
+        "name": "hash_count"
+      }
+    }],
+  })"));
+
+  auto sp_ext_id_reg = substrait::MakeExtensionIdRegistry();
+  ExtensionIdRegistry* ext_id_reg = sp_ext_id_reg.get();
+  // invalid before registration
+  ExtensionSet ext_set_invalid(ext_id_reg);
+  ASSERT_RAISES(NotImplemented,
+                DeserializePlans(
+                    *buf, [] { return kNullConsumer; }, ext_id_reg, &ext_set_invalid));

Review Comment:
   ```suggestion
     ASSERT_RAISES(NotImplemented,
                   DeserializePlans(
                       *buf, [] { return kNullConsumer; }));
   ```





[GitHub] [arrow] vibhatha commented on pull request #13130: ARROW-15591: [C++] Add support for aggregation to the Substrait consumer

Posted by GitBox <gi...@apache.org>.
vibhatha commented on PR #13130:
URL: https://github.com/apache/arrow/pull/13130#issuecomment-1187254843

   > I noticed we can probably simplify the unit tests just a bit. Otherwise this is good to go.
   
   I will update the test cases.




[GitHub] [arrow] vibhatha commented on a diff in pull request #13130: ARROW-15591: [C++] Add support for aggregation to the Substrait consumer

Posted by GitBox <gi...@apache.org>.
vibhatha commented on code in PR #13130:
URL: https://github.com/apache/arrow/pull/13130#discussion_r914365089


##########
cpp/src/arrow/engine/substrait/relation_internal.cc:
##########
@@ -309,6 +309,75 @@ Result<compute::Declaration> FromProto(const substrait::Rel& rel,
       join_dec.inputs.emplace_back(std::move(right));
       return std::move(join_dec);
     }
+    case substrait::Rel::RelTypeCase::kAggregate: {
+      const auto& aggregate = rel.aggregate();
+      RETURN_NOT_OK(CheckRelCommon(aggregate));
+
+      if (!aggregate.has_input()) {
+        return Status::Invalid("substrait::AggregateRel with no input relation");
+      }
+
+      ARROW_ASSIGN_OR_RAISE(auto input, FromProto(aggregate.input(), ext_set));
+
+      if (aggregate.groupings_size() > 1) {
+        return Status::Invalid("Grouping sets not supported.");
+      }
+      std::vector<FieldRef> keys;
+      auto group = aggregate.groupings(0);
+      keys.reserve(group.grouping_expressions_size());
+      for (int exp_id = 0; exp_id < group.grouping_expressions_size(); exp_id++) {
+        const auto& expr = FromProto(group.grouping_expressions(exp_id), ext_set);
+        const auto& field_ref = expr->field_ref();
+        if (field_ref) {
+          keys.emplace_back(std::move(*field_ref));
+        } else {
+          return Status::Invalid(
+              "Only accept a direct reference as the grouping expression for aggregates");
+        }
+      }
+      // denotes how many unique aggregation functions are used
+      // measure_id refers to the corresponding function in the
+      // extension
+      int measure_size = aggregate.measures_size();
+      std::vector<compute::Aggregate> aggregates;
+      aggregates.reserve(measure_size);
+      for (int measure_id = 0; measure_id < measure_size; measure_id++) {
+        const auto& agg_measure = aggregate.measures(measure_id);
+        if (agg_measure.has_measure()) {
+          if (agg_measure.has_filter()) {
+            return Status::Invalid("Aggregate filters are not supported.");
+          }
+          const auto& agg_func = agg_measure.measure();
+          if (agg_func.args_size() != 1) {
+            return Status::Invalid("Aggregate function must be a unary function.");
+          }
+          int func_reference = agg_func.function_reference();
+          ARROW_ASSIGN_OR_RAISE(auto func_record, ext_set.DecodeFunction(func_reference));
+          // aggregate function name
+          auto func_name = std::string(func_record.id.name);
+          // aggregate output column name
+          std::string agg_col_name =
+              func_name + "(" + std::to_string(func_reference) + ")";

Review Comment:
   You have a good point there; I only added it for readability. We can leave it out too.
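The checks in the `kAggregate` branch of the hunk above can be modelled outside Arrow to make the accepted plan shape explicit. This is a minimal sketch, assuming simplified hypothetical structs in place of the Substrait protobuf messages: `Expr`, `Measure`, `AggregateRel`, `AggregateSpec` and `ValidateAggregate` are illustrative names, not Arrow or Substrait APIs. It mirrors the hunk's rules (at most one grouping set, direct-reference keys only, no measure filters, unary functions, and the `func_name(func_reference)` output column name) and adds one guard the hunk lacks: the hunk calls `aggregate.groupings(0)` after checking only `groupings_size() > 1`, so this sketch chooses to treat an empty `groupings` list as an aggregate with no keys instead of indexing it.

```cpp
#include <cassert>
#include <optional>
#include <string>
#include <vector>

// Hypothetical, simplified stand-ins for the Substrait messages the
// kAggregate branch reads. They are not Arrow or Substrait types.
struct Expr {
  // Engaged when the expression is a direct struct-field reference.
  std::optional<int> direct_field;
};

struct Measure {
  bool has_filter = false;
  std::string function_name;   // name decoded from the extension set
  int function_reference = 0;  // anchor of the extension function
  std::vector<Expr> args;
};

struct AggregateRel {
  std::vector<std::vector<Expr>> groupings;  // one entry per grouping set
  std::vector<Measure> measures;
};

struct AggregateSpec {
  std::string error;                      // empty on success
  std::vector<int> key_fields;            // grouping keys as field indices
  std::vector<std::string> output_names;  // one output column per measure
};

AggregateSpec ValidateAggregate(const AggregateRel& rel) {
  AggregateSpec spec;
  if (rel.groupings.size() > 1) {
    spec.error = "Grouping sets not supported.";
    return spec;
  }
  // Only read groupings[0] when it exists; an empty list yields no keys.
  if (!rel.groupings.empty()) {
    for (const Expr& expr : rel.groupings[0]) {
      if (!expr.direct_field) {
        spec.error =
            "Only accept a direct reference as the grouping expression for aggregates";
        return spec;
      }
      spec.key_fields.push_back(*expr.direct_field);
    }
  }
  for (const Measure& measure : rel.measures) {
    if (measure.has_filter) {
      spec.error = "Aggregate filters are not supported.";
      return spec;
    }
    if (measure.args.size() != 1) {
      spec.error = "Aggregate function must be a unary function.";
      return spec;
    }
    // Same naming scheme as the hunk: "<function name>(<function reference>)".
    spec.output_names.push_back(measure.function_name + "(" +
                                std::to_string(measure.function_reference) + ")");
  }
  return spec;
}
```

Returning on the first error mirrors the early `return Status::Invalid(...)` pattern in the hunk; in Arrow itself these would be `Status` values rather than strings.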





[GitHub] [arrow] vibhatha commented on pull request #13130: ARROW-15591: [C++] Add support for aggregation to the Substrait consumer

Posted by GitBox <gi...@apache.org>.
vibhatha commented on PR #13130:
URL: https://github.com/apache/arrow/pull/13130#issuecomment-1175716618

   > This looks correct. I have a few questions. Maybe it would be good to also add an end-to-end test now that we have Substrait working in python?
   
   @westonpace I was thinking about this too. I had roughly sketched it out like this:
   
   Create a test suite for Substrait in Python that uses the `testing` dataset (or generated data) to run the dataflow operations, and add unit tests for each relation over a set of PRs. Since we now support more relations, a few small PRs should help here. I will go ahead and create a JIRA to track this. By the way, we were thinking about ways to save space on the lengthy strings needed to create Substrait plans. Is that in place now? If not, we can integrate it later on.
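One way to shrink those lengthy plan strings in the C++ tests is a small builder that keeps the shared read relation, grouping and extension boilerplate in one place, so each test spells out only what differs. This is a minimal sketch: `MakeAggregatePlanJson` is a hypothetical helper, not an existing Arrow function, and it assumes every aggregate test reads the same three-column `i32` schema from `file:///tmp/dat.parquet` and groups by field 0, as the tests quoted earlier in this thread do.

```cpp
#include <cassert>
#include <string>

// Hypothetical helper (not part of Arrow): builds the Substrait plan JSON that
// the aggregate tests repeat verbatim. Only the measures and the extension
// function name vary between tests.
std::string MakeAggregatePlanJson(const std::string& measures_json,
                                  const std::string& function_name) {
  // Read relation shared by every test: columns A, B, C of type i32.
  const std::string read_rel = R"({
            "read": {
              "base_schema": {
                "names": ["A", "B", "C"],
                "struct": {"types": [{"i32": {}}, {"i32": {}}, {"i32": {}}]}
              },
              "local_files": {
                "items": [{"uri_file": "file:///tmp/dat.parquet", "parquet": {}}]
              }
            }
          })";
  // Single grouping set keyed on a direct reference to field 0.
  const std::string groupings = R"([{
            "groupingExpressions": [{
              "selection": {"directReference": {"structField": {"field": 0}}}
            }]
          }])";
  return R"({
    "relations": [{
      "rel": {
        "aggregate": {
          "input": )" +
         read_rel + R"(,
          "groupings": )" +
         groupings + R"(,
          "measures": )" +
         measures_json + R"(
        }
      }
    }],
    "extensionUris": [{
      "extension_uri_anchor": 0,
      "uri": "https://github.com/apache/arrow/blob/master/format/substrait/extension_types.yaml"
    }],
    "extensions": [{
      "extension_function": {
        "extension_uri_reference": 0,
        "function_anchor": 0,
        "name": ")" +
         function_name + R"("
      }
    }]
  })";
}
```

A test such as `AggregateInvalidFunction` could then pass `"[{}]"` as the measures and `"hash_count"` as the function name, and hand the result to `internal::SubstraitFromJSON("Plan", ...)` as before.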

