You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/07/06 00:16:32 UTC

[GitHub] [arrow] westonpace commented on a diff in pull request #13130: ARROW-15591: [C++] Add support for aggregation to the Substrait consumer

westonpace commented on code in PR #13130:
URL: https://github.com/apache/arrow/pull/13130#discussion_r914295005


##########
cpp/src/arrow/engine/substrait/relation_internal.cc:
##########
@@ -309,6 +309,75 @@ Result<compute::Declaration> FromProto(const substrait::Rel& rel,
       join_dec.inputs.emplace_back(std::move(right));
       return std::move(join_dec);
     }
+    case substrait::Rel::RelTypeCase::kAggregate: {
+      const auto& aggregate = rel.aggregate();
+      RETURN_NOT_OK(CheckRelCommon(aggregate));
+
+      if (!aggregate.has_input()) {
+        return Status::Invalid("substrait::AggregateRel with no input relation");
+      }
+
+      ARROW_ASSIGN_OR_RAISE(auto input, FromProto(aggregate.input(), ext_set));
+
+      if (aggregate.groupings_size() > 1) {
+        return Status::Invalid("Grouping sets not supported.");
+      }
+      std::vector<FieldRef> keys;
+      auto group = aggregate.groupings(0);
+      keys.reserve(group.grouping_expressions_size());
+      for (int exp_id = 0; exp_id < group.grouping_expressions_size(); exp_id++) {
+        const auto& expr = FromProto(group.grouping_expressions(exp_id), ext_set);

Review Comment:
   Use `ARROW_ASSIGN_OR_RAISE` here instead of dereferencing the `Result` with `->` later.  Otherwise, if something goes wrong converting the expression, dereferencing the failed `Result` will result in an abort (which we don't want) instead of an invalid status being returned (which we would want).



##########
cpp/src/arrow/engine/substrait/relation_internal.cc:
##########
@@ -309,6 +309,75 @@ Result<compute::Declaration> FromProto(const substrait::Rel& rel,
       join_dec.inputs.emplace_back(std::move(right));
       return std::move(join_dec);
     }
+    case substrait::Rel::RelTypeCase::kAggregate: {
+      const auto& aggregate = rel.aggregate();
+      RETURN_NOT_OK(CheckRelCommon(aggregate));
+
+      if (!aggregate.has_input()) {
+        return Status::Invalid("substrait::AggregateRel with no input relation");
+      }
+
+      ARROW_ASSIGN_OR_RAISE(auto input, FromProto(aggregate.input(), ext_set));
+
+      if (aggregate.groupings_size() > 1) {
+        return Status::Invalid("Grouping sets not supported.");
+      }
+      std::vector<FieldRef> keys;
+      auto group = aggregate.groupings(0);
+      keys.reserve(group.grouping_expressions_size());
+      for (int exp_id = 0; exp_id < group.grouping_expressions_size(); exp_id++) {
+        const auto& expr = FromProto(group.grouping_expressions(exp_id), ext_set);
+        const auto& field_ref = expr->field_ref();

Review Comment:
   ```suggestion
           const auto* field_ref = expr->field_ref();
   ```



##########
cpp/src/arrow/engine/substrait/relation_internal.cc:
##########
@@ -309,6 +309,75 @@ Result<compute::Declaration> FromProto(const substrait::Rel& rel,
       join_dec.inputs.emplace_back(std::move(right));
       return std::move(join_dec);
     }
+    case substrait::Rel::RelTypeCase::kAggregate: {
+      const auto& aggregate = rel.aggregate();
+      RETURN_NOT_OK(CheckRelCommon(aggregate));
+
+      if (!aggregate.has_input()) {
+        return Status::Invalid("substrait::AggregateRel with no input relation");
+      }
+
+      ARROW_ASSIGN_OR_RAISE(auto input, FromProto(aggregate.input(), ext_set));
+
+      if (aggregate.groupings_size() > 1) {
+        return Status::Invalid("Grouping sets not supported.");

Review Comment:
   ```suggestion
           return Status::NotImplemented("Grouping sets not supported.");
   ```



##########
cpp/src/arrow/engine/substrait/relation_internal.cc:
##########
@@ -309,6 +309,75 @@ Result<compute::Declaration> FromProto(const substrait::Rel& rel,
       join_dec.inputs.emplace_back(std::move(right));
       return std::move(join_dec);
     }
+    case substrait::Rel::RelTypeCase::kAggregate: {
+      const auto& aggregate = rel.aggregate();
+      RETURN_NOT_OK(CheckRelCommon(aggregate));
+
+      if (!aggregate.has_input()) {
+        return Status::Invalid("substrait::AggregateRel with no input relation");
+      }
+
+      ARROW_ASSIGN_OR_RAISE(auto input, FromProto(aggregate.input(), ext_set));
+
+      if (aggregate.groupings_size() > 1) {
+        return Status::Invalid("Grouping sets not supported.");
+      }
+      std::vector<FieldRef> keys;
+      auto group = aggregate.groupings(0);
+      keys.reserve(group.grouping_expressions_size());
+      for (int exp_id = 0; exp_id < group.grouping_expressions_size(); exp_id++) {
+        const auto& expr = FromProto(group.grouping_expressions(exp_id), ext_set);
+        const auto& field_ref = expr->field_ref();
+        if (field_ref) {
+          keys.emplace_back(std::move(*field_ref));
+        } else {
+          return Status::Invalid(
+              "Only accept a direct reference as the grouping expression for aggregates");
+        }
+      }
+      // denotes how many unique aggregation functions are used
+      // measure_id refers to the corresponding function in the
+      // extensionsion
+      int measure_size = aggregate.measures_size();
+      std::vector<compute::Aggregate> aggregates;
+      aggregates.reserve(measure_size);
+      for (int measure_id = 0; measure_id < measure_size; measure_id++) {
+        const auto& agg_measure = aggregate.measures(measure_id);
+        if (agg_measure.has_measure()) {
+          if (agg_measure.has_filter()) {
+            return Status::Invalid("Aggregate filters are not supported.");

Review Comment:
   ```suggestion
               return Status::NotImplemented("Aggregate filters are not supported.");
   ```



##########
cpp/src/arrow/engine/substrait/relation_internal.cc:
##########
@@ -309,6 +309,75 @@ Result<compute::Declaration> FromProto(const substrait::Rel& rel,
       join_dec.inputs.emplace_back(std::move(right));
       return std::move(join_dec);
     }
+    case substrait::Rel::RelTypeCase::kAggregate: {
+      const auto& aggregate = rel.aggregate();
+      RETURN_NOT_OK(CheckRelCommon(aggregate));
+
+      if (!aggregate.has_input()) {
+        return Status::Invalid("substrait::AggregateRel with no input relation");
+      }
+
+      ARROW_ASSIGN_OR_RAISE(auto input, FromProto(aggregate.input(), ext_set));
+
+      if (aggregate.groupings_size() > 1) {
+        return Status::Invalid("Grouping sets not supported.");
+      }
+      std::vector<FieldRef> keys;
+      auto group = aggregate.groupings(0);
+      keys.reserve(group.grouping_expressions_size());
+      for (int exp_id = 0; exp_id < group.grouping_expressions_size(); exp_id++) {
+        const auto& expr = FromProto(group.grouping_expressions(exp_id), ext_set);
+        const auto& field_ref = expr->field_ref();
+        if (field_ref) {
+          keys.emplace_back(std::move(*field_ref));
+        } else {
+          return Status::Invalid(
+              "Only accept a direct reference as the grouping expression for aggregates");
+        }
+      }
+      // denotes how many unique aggregation functions are used
+      // measure_id refers to the corresponding function in the
+      // extensionsion
+      int measure_size = aggregate.measures_size();
+      std::vector<compute::Aggregate> aggregates;
+      aggregates.reserve(measure_size);
+      for (int measure_id = 0; measure_id < measure_size; measure_id++) {
+        const auto& agg_measure = aggregate.measures(measure_id);
+        if (agg_measure.has_measure()) {
+          if (agg_measure.has_filter()) {
+            return Status::Invalid("Aggregate filters are not supported.");
+          }
+          const auto& agg_func = agg_measure.measure();
+          if (agg_func.args_size() != 1) {
+            return Status::Invalid("Aggregate function must be a unary function.");
+          }
+          int func_reference = agg_func.function_reference();
+          ARROW_ASSIGN_OR_RAISE(auto func_record, ext_set.DecodeFunction(func_reference));
+          // aggreagte function name
+          auto func_name = std::string(func_record.id.name);
+          // aggregate output column name
+          std::string agg_col_name =
+              func_name + "(" + std::to_string(func_reference) + ")";

Review Comment:
   I'm not sure there is any advantage to giving these columns a name vs. just leaving them as empty strings.



##########
cpp/src/arrow/engine/substrait/extension_set.cc:
##########
@@ -443,8 +443,9 @@ struct DefaultExtensionIdRegistry : ExtensionIdRegistryImpl {
     // ARROW-15535.
     for (util::string_view name : {
              "add",
-             "equal",
-             "is_not_distinct_from",
+             "equal",                 // added to support join operator

Review Comment:
   The comments are probably not needed as this will change significantly in the function mapping PR but they don't hurt.



##########
cpp/src/arrow/engine/substrait/relation_internal.cc:
##########
@@ -309,6 +309,75 @@ Result<compute::Declaration> FromProto(const substrait::Rel& rel,
       join_dec.inputs.emplace_back(std::move(right));
       return std::move(join_dec);
     }
+    case substrait::Rel::RelTypeCase::kAggregate: {
+      const auto& aggregate = rel.aggregate();
+      RETURN_NOT_OK(CheckRelCommon(aggregate));
+
+      if (!aggregate.has_input()) {
+        return Status::Invalid("substrait::AggregateRel with no input relation");
+      }
+
+      ARROW_ASSIGN_OR_RAISE(auto input, FromProto(aggregate.input(), ext_set));
+
+      if (aggregate.groupings_size() > 1) {
+        return Status::Invalid("Grouping sets not supported.");
+      }
+      std::vector<FieldRef> keys;
+      auto group = aggregate.groupings(0);
+      keys.reserve(group.grouping_expressions_size());
+      for (int exp_id = 0; exp_id < group.grouping_expressions_size(); exp_id++) {
+        const auto& expr = FromProto(group.grouping_expressions(exp_id), ext_set);
+        const auto& field_ref = expr->field_ref();
+        if (field_ref) {
+          keys.emplace_back(std::move(*field_ref));
+        } else {
+          return Status::Invalid(
+              "Only accept a direct reference as the grouping expression for aggregates");
+        }
+      }
+      // denotes how many unique aggregation functions are used
+      // measure_id refers to the corresponding function in the
+      // extensionsion

Review Comment:
   ```suggestion
   ```
   I'm not sure this comment is helpful.  Maybe something simple like "A Substrait measure is equivalent to an Acero aggregate", but even that seems excessive.



##########
cpp/src/arrow/engine/substrait/relation_internal.cc:
##########
@@ -309,6 +309,75 @@ Result<compute::Declaration> FromProto(const substrait::Rel& rel,
       join_dec.inputs.emplace_back(std::move(right));
       return std::move(join_dec);
     }
+    case substrait::Rel::RelTypeCase::kAggregate: {
+      const auto& aggregate = rel.aggregate();
+      RETURN_NOT_OK(CheckRelCommon(aggregate));
+
+      if (!aggregate.has_input()) {
+        return Status::Invalid("substrait::AggregateRel with no input relation");
+      }
+
+      ARROW_ASSIGN_OR_RAISE(auto input, FromProto(aggregate.input(), ext_set));
+
+      if (aggregate.groupings_size() > 1) {
+        return Status::Invalid("Grouping sets not supported.");
+      }
+      std::vector<FieldRef> keys;
+      auto group = aggregate.groupings(0);
+      keys.reserve(group.grouping_expressions_size());
+      for (int exp_id = 0; exp_id < group.grouping_expressions_size(); exp_id++) {
+        const auto& expr = FromProto(group.grouping_expressions(exp_id), ext_set);
+        const auto& field_ref = expr->field_ref();
+        if (field_ref) {
+          keys.emplace_back(std::move(*field_ref));
+        } else {
+          return Status::Invalid(
+              "Only accept a direct reference as the grouping expression for aggregates");

Review Comment:
   ```suggestion
                 "The grouping expression for an aggregate must be a direct reference.");
   ```
   Minor nit: wording



##########
cpp/src/arrow/engine/substrait/relation_internal.cc:
##########
@@ -309,6 +309,75 @@ Result<compute::Declaration> FromProto(const substrait::Rel& rel,
       join_dec.inputs.emplace_back(std::move(right));
       return std::move(join_dec);
     }
+    case substrait::Rel::RelTypeCase::kAggregate: {
+      const auto& aggregate = rel.aggregate();
+      RETURN_NOT_OK(CheckRelCommon(aggregate));
+
+      if (!aggregate.has_input()) {
+        return Status::Invalid("substrait::AggregateRel with no input relation");
+      }
+
+      ARROW_ASSIGN_OR_RAISE(auto input, FromProto(aggregate.input(), ext_set));
+
+      if (aggregate.groupings_size() > 1) {
+        return Status::Invalid("Grouping sets not supported.");
+      }
+      std::vector<FieldRef> keys;
+      auto group = aggregate.groupings(0);
+      keys.reserve(group.grouping_expressions_size());
+      for (int exp_id = 0; exp_id < group.grouping_expressions_size(); exp_id++) {
+        const auto& expr = FromProto(group.grouping_expressions(exp_id), ext_set);
+        const auto& field_ref = expr->field_ref();
+        if (field_ref) {
+          keys.emplace_back(std::move(*field_ref));
+        } else {
+          return Status::Invalid(
+              "Only accept a direct reference as the grouping expression for aggregates");
+        }
+      }
+      // denotes how many unique aggregation functions are used
+      // measure_id refers to the corresponding function in the
+      // extensionsion
+      int measure_size = aggregate.measures_size();
+      std::vector<compute::Aggregate> aggregates;
+      aggregates.reserve(measure_size);
+      for (int measure_id = 0; measure_id < measure_size; measure_id++) {
+        const auto& agg_measure = aggregate.measures(measure_id);
+        if (agg_measure.has_measure()) {
+          if (agg_measure.has_filter()) {
+            return Status::Invalid("Aggregate filters are not supported.");
+          }
+          const auto& agg_func = agg_measure.measure();
+          if (agg_func.args_size() != 1) {
+            return Status::Invalid("Aggregate function must be a unary function.");

Review Comment:
   ```suggestion
               return Status::NotImplemented("Aggregate function must be a unary function.");
   ```



##########
cpp/src/arrow/engine/substrait/relation_internal.cc:
##########
@@ -309,6 +309,75 @@ Result<compute::Declaration> FromProto(const substrait::Rel& rel,
       join_dec.inputs.emplace_back(std::move(right));
       return std::move(join_dec);
     }
+    case substrait::Rel::RelTypeCase::kAggregate: {
+      const auto& aggregate = rel.aggregate();
+      RETURN_NOT_OK(CheckRelCommon(aggregate));
+
+      if (!aggregate.has_input()) {
+        return Status::Invalid("substrait::AggregateRel with no input relation");
+      }
+
+      ARROW_ASSIGN_OR_RAISE(auto input, FromProto(aggregate.input(), ext_set));
+
+      if (aggregate.groupings_size() > 1) {
+        return Status::Invalid("Grouping sets not supported.");
+      }
+      std::vector<FieldRef> keys;
+      auto group = aggregate.groupings(0);
+      keys.reserve(group.grouping_expressions_size());
+      for (int exp_id = 0; exp_id < group.grouping_expressions_size(); exp_id++) {
+        const auto& expr = FromProto(group.grouping_expressions(exp_id), ext_set);
+        const auto& field_ref = expr->field_ref();
+        if (field_ref) {
+          keys.emplace_back(std::move(*field_ref));
+        } else {
+          return Status::Invalid(
+              "Only accept a direct reference as the grouping expression for aggregates");
+        }
+      }
+      // denotes how many unique aggregation functions are used
+      // measure_id refers to the corresponding function in the
+      // extensionsion
+      int measure_size = aggregate.measures_size();
+      std::vector<compute::Aggregate> aggregates;
+      aggregates.reserve(measure_size);
+      for (int measure_id = 0; measure_id < measure_size; measure_id++) {
+        const auto& agg_measure = aggregate.measures(measure_id);
+        if (agg_measure.has_measure()) {
+          if (agg_measure.has_filter()) {
+            return Status::Invalid("Aggregate filters are not supported.");
+          }
+          const auto& agg_func = agg_measure.measure();
+          if (agg_func.args_size() != 1) {
+            return Status::Invalid("Aggregate function must be a unary function.");
+          }
+          int func_reference = agg_func.function_reference();
+          ARROW_ASSIGN_OR_RAISE(auto func_record, ext_set.DecodeFunction(func_reference));
+          // aggreagte function name
+          auto func_name = std::string(func_record.id.name);
+          // aggregate output column name
+          std::string agg_col_name =
+              func_name + "(" + std::to_string(func_reference) + ")";
+          // aggregate target
+          ARROW_ASSIGN_OR_RAISE(auto field_expr, FromProto(agg_func.args(0), ext_set));
+          auto target = field_expr.field_ref();
+          if (!target) {
+            return Status::Invalid(
+                "Only accept a direct reference as the aggregate expression.");
+          }
+          // TODO: Implement function options in Substrait

Review Comment:
   We shouldn't have a TODO without some kind of JIRA reference.  I'm not sure we need a TODO here though.  Maybe just get rid of this line and keep the next line about setting function options to nullptr.



##########
cpp/src/arrow/engine/substrait/relation_internal.cc:
##########
@@ -309,6 +309,75 @@ Result<compute::Declaration> FromProto(const substrait::Rel& rel,
       join_dec.inputs.emplace_back(std::move(right));
       return std::move(join_dec);
     }
+    case substrait::Rel::RelTypeCase::kAggregate: {
+      const auto& aggregate = rel.aggregate();
+      RETURN_NOT_OK(CheckRelCommon(aggregate));
+
+      if (!aggregate.has_input()) {
+        return Status::Invalid("substrait::AggregateRel with no input relation");
+      }
+
+      ARROW_ASSIGN_OR_RAISE(auto input, FromProto(aggregate.input(), ext_set));
+
+      if (aggregate.groupings_size() > 1) {
+        return Status::Invalid("Grouping sets not supported.");
+      }
+      std::vector<FieldRef> keys;
+      auto group = aggregate.groupings(0);
+      keys.reserve(group.grouping_expressions_size());
+      for (int exp_id = 0; exp_id < group.grouping_expressions_size(); exp_id++) {
+        const auto& expr = FromProto(group.grouping_expressions(exp_id), ext_set);
+        const auto& field_ref = expr->field_ref();
+        if (field_ref) {
+          keys.emplace_back(std::move(*field_ref));
+        } else {
+          return Status::Invalid(
+              "Only accept a direct reference as the grouping expression for aggregates");
+        }
+      }
+      // denotes how many unique aggregation functions are used
+      // measure_id refers to the corresponding function in the
+      // extensionsion
+      int measure_size = aggregate.measures_size();
+      std::vector<compute::Aggregate> aggregates;
+      aggregates.reserve(measure_size);
+      for (int measure_id = 0; measure_id < measure_size; measure_id++) {
+        const auto& agg_measure = aggregate.measures(measure_id);
+        if (agg_measure.has_measure()) {
+          if (agg_measure.has_filter()) {
+            return Status::Invalid("Aggregate filters are not supported.");
+          }
+          const auto& agg_func = agg_measure.measure();
+          if (agg_func.args_size() != 1) {
+            return Status::Invalid("Aggregate function must be a unary function.");
+          }
+          int func_reference = agg_func.function_reference();
+          ARROW_ASSIGN_OR_RAISE(auto func_record, ext_set.DecodeFunction(func_reference));
+          // aggreagte function name
+          auto func_name = std::string(func_record.id.name);
+          // aggregate output column name
+          std::string agg_col_name =
+              func_name + "(" + std::to_string(func_reference) + ")";
+          // aggregate target
+          ARROW_ASSIGN_OR_RAISE(auto field_expr, FromProto(agg_func.args(0), ext_set));
+          auto target = field_expr.field_ref();
+          if (!target) {
+            return Status::Invalid(
+                "Only accept a direct reference as the aggregate expression.");

Review Comment:
   ```suggestion
                   "The input expression to an aggregate function must be a direct reference.");
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org