You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by "rtpsw (via GitHub)" <gi...@apache.org> on 2023/04/04 13:49:33 UTC

[GitHub] [arrow] rtpsw opened a new pull request, #34885: GH-34786: [C++] Fix output schema calculated by Substrait consumer for AggregateRel

rtpsw opened a new pull request, #34885:
URL: https://github.com/apache/arrow/pull/34885

   See https://github.com/apache/arrow/issues/34786


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] rtpsw commented on pull request #34885: GH-34786: [C++] Fix output schema calculated by Substrait consumer for AggregateRel

Posted by "rtpsw (via GitHub)" <gi...@apache.org>.
rtpsw commented on PR #34885:
URL: https://github.com/apache/arrow/pull/34885#issuecomment-1501802570

   Replaced by #34904


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] icexelloss commented on a diff in pull request #34885: GH-34786: [C++] Fix output schema calculated by Substrait consumer for AggregateRel

Posted by "icexelloss (via GitHub)" <gi...@apache.org>.
icexelloss commented on code in PR #34885:
URL: https://github.com/apache/arrow/pull/34885#discussion_r1157361535


##########
cpp/src/arrow/acero/aggregate_node.cc:
##########
@@ -560,114 +566,40 @@ class GroupByNode : public ExecNode, public TracedNode {
                                 const ExecNodeOptions& options) {
     RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 1, "GroupByNode"));
 
-    auto input = inputs[0];
     const auto& aggregate_options = checked_cast<const AggregateNodeOptions&>(options);
+    auto aggregates = aggregate_options.aggregates;
     const auto& keys = aggregate_options.keys;
     const auto& segment_keys = aggregate_options.segment_keys;
-    // Copy (need to modify options pointer below)
-    auto aggs = aggregate_options.aggregates;
 
     if (plan->query_context()->exec_context()->executor()->GetCapacity() > 1 &&
         segment_keys.size() > 0) {
       return Status::NotImplemented("Segmented aggregation in a multi-threaded plan");
     }
 
-    // Get input schema
-    auto input_schema = input->output_schema();
-
-    // Find input field indices for key fields
-    std::vector<int> key_field_ids(keys.size());
-    for (size_t i = 0; i < keys.size(); ++i) {
-      ARROW_ASSIGN_OR_RAISE(auto match, keys[i].FindOne(*input_schema));
-      key_field_ids[i] = match[0];
-    }
-
-    // Find input field indices for segment key fields
-    std::vector<int> segment_key_field_ids(segment_keys.size());
-    for (size_t i = 0; i < segment_keys.size(); ++i) {
-      ARROW_ASSIGN_OR_RAISE(auto match, segment_keys[i].FindOne(*input_schema));
-      segment_key_field_ids[i] = match[0];
-    }
-
-    // Check key fields and segment key fields are disjoint
-    std::unordered_set<int> key_field_id_set(key_field_ids.begin(), key_field_ids.end());
-    for (const auto& segment_key_field_id : segment_key_field_ids) {
-      if (key_field_id_set.find(segment_key_field_id) != key_field_id_set.end()) {
-        return Status::Invalid("Group-by aggregation with field '",
-                               input_schema->field(segment_key_field_id)->name(),
-                               "' as both key and segment key");
-      }
-    }
-
-    // Find input field indices for aggregates
-    std::vector<std::vector<int>> agg_src_fieldsets(aggs.size());
-    for (size_t i = 0; i < aggs.size(); ++i) {
-      const auto& target_fieldset = aggs[i].target;
-      for (const auto& target : target_fieldset) {
-        ARROW_ASSIGN_OR_RAISE(auto match, target.FindOne(*input_schema));
-        agg_src_fieldsets[i].push_back(match[0]);
-      }
-    }
-
-    // Build vector of aggregate source field data types
-    std::vector<std::vector<TypeHolder>> agg_src_types(aggs.size());
-    for (size_t i = 0; i < aggs.size(); ++i) {
-      for (const auto& agg_src_field_id : agg_src_fieldsets[i]) {
-        agg_src_types[i].push_back(input_schema->field(agg_src_field_id)->type().get());
-      }
-    }
-
-    // Build vector of segment key field data types
-    std::vector<TypeHolder> segment_key_types(segment_keys.size());
-    for (size_t i = 0; i < segment_keys.size(); ++i) {
-      auto segment_key_field_id = segment_key_field_ids[i];
-      segment_key_types[i] = input_schema->field(segment_key_field_id)->type().get();
-    }
-
-    auto ctx = plan->query_context()->exec_context();
-
-    ARROW_ASSIGN_OR_RAISE(auto segmenter,
-                          RowSegmenter::Make(std::move(segment_key_types),
-                                             /*nullable_keys=*/false, ctx));
-
-    // Construct aggregates
-    ARROW_ASSIGN_OR_RAISE(auto agg_kernels, GetKernels(ctx, aggs, agg_src_types));
-
-    ARROW_ASSIGN_OR_RAISE(auto agg_states,
-                          InitKernels(agg_kernels, ctx, aggs, agg_src_types));
-
-    ARROW_ASSIGN_OR_RAISE(
-        FieldVector agg_result_fields,
-        ResolveKernels(aggs, agg_kernels, agg_states, ctx, agg_src_types));
+    const auto& input_schema = *inputs[0]->output_schema();
+    auto exec_ctx = plan->query_context()->exec_context();
 
-    // Build field vector for output schema
-    FieldVector output_fields{keys.size() + segment_keys.size() + aggs.size()};
+    ARROW_ASSIGN_OR_RAISE(auto args,
+                          aggregate::MakeAggregateNodeArgs(
+                              input_schema, keys, segment_keys, aggregates,
+                              plan->query_context()->max_concurrency(), exec_ctx));
 
-    // Aggregate fields come before key fields to match the behavior of GroupBy function
-    for (size_t i = 0; i < aggs.size(); ++i) {
-      output_fields[i] =
-          agg_result_fields[i]->WithName(aggregate_options.aggregates[i].name);
+    std::vector<const HashAggregateKernel*> kernels;
+    kernels.reserve(args.kernels.size());
+    for (auto kernel : args.kernels) {
+      kernels.push_back(static_cast<const HashAggregateKernel*>(kernel));
     }
-    size_t base = aggs.size();
-    for (size_t i = 0; i < keys.size(); ++i) {
-      int key_field_id = key_field_ids[i];
-      output_fields[base + i] = input_schema->field(key_field_id);
-    }
-    base += keys.size();
-    for (size_t i = 0; i < segment_keys.size(); ++i) {
-      int segment_key_field_id = segment_key_field_ids[i];
-      output_fields[base + i] = input_schema->field(segment_key_field_id);
-    }
-
-    return input->plan()->EmplaceNode<GroupByNode>(
-        input, schema(std::move(output_fields)), std::move(key_field_ids),
-        std::move(segment_key_field_ids), std::move(segmenter), std::move(agg_src_types),
-        std::move(agg_src_fieldsets), std::move(aggs), std::move(agg_kernels));
+    return inputs[0]->plan()->EmplaceNode<GroupByNode>(
+        inputs[0], std::move(args.output_schema), std::move(args.grouping_key_field_ids),
+        std::move(args.segment_key_field_ids), std::move(args.segmenter),
+        std::move(args.kernel_intypes), std::move(args.target_fieldsets),
+        std::move(args.aggregates), std::move(kernels));
   }
 
   Status ResetKernelStates() {
     auto ctx = plan()->query_context()->exec_context();
-    ARROW_RETURN_NOT_OK(InitKernels(agg_kernels_, ctx, aggs_, agg_src_types_));
+    ARROW_RETURN_NOT_OK(InitKernels(InitHashAggregateKernel, agg_kernels_, ctx,

Review Comment:
   Why do we need to pass `/*num_states_per_kernel=*/1`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] icexelloss commented on a diff in pull request #34885: GH-34786: [C++] Fix output schema calculated by Substrait consumer for AggregateRel

Posted by "icexelloss (via GitHub)" <gi...@apache.org>.
icexelloss commented on code in PR #34885:
URL: https://github.com/apache/arrow/pull/34885#discussion_r1157355901


##########
cpp/src/arrow/acero/aggregate_node.cc:
##########
@@ -85,8 +86,43 @@ std::vector<TypeHolder> ExtendWithGroupIdType(const std::vector<TypeHolder>& in_
   return aggr_in_types;
 }
 
-Result<const HashAggregateKernel*> GetKernel(ExecContext* ctx, const Aggregate& aggregate,
+void DefaultAggregateOptions(Aggregate* aggregate_ptr,

Review Comment:
   Why do we need to add this?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] github-actions[bot] commented on pull request #34885: GH-34786: [C++] Fix output schema calculated by Substrait consumer for AggregateRel

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #34885:
URL: https://github.com/apache/arrow/pull/34885#issuecomment-1496010862

   * Closes: #34786


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] rtpsw closed pull request #34885: GH-34786: [C++] Fix output schema calculated by Substrait consumer for AggregateRel

Posted by "rtpsw (via GitHub)" <gi...@apache.org>.
rtpsw closed pull request #34885: GH-34786: [C++] Fix output schema calculated by Substrait consumer for AggregateRel
URL: https://github.com/apache/arrow/pull/34885


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] icexelloss commented on a diff in pull request #34885: GH-34786: [C++] Fix output schema calculated by Substrait consumer for AggregateRel

Posted by "icexelloss (via GitHub)" <gi...@apache.org>.
icexelloss commented on code in PR #34885:
URL: https://github.com/apache/arrow/pull/34885#discussion_r1157371504


##########
cpp/src/arrow/acero/aggregate_node.h:
##########
@@ -0,0 +1,76 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <memory>
+#include <vector>
+
+#include "arrow/acero/visibility.h"
+#include "arrow/compute/api_aggregate.h"
+#include "arrow/compute/row/grouper.h"
+#include "arrow/compute/type_fwd.h"
+#include "arrow/type_fwd.h"
+
+namespace arrow {
+namespace acero {
+namespace aggregate {
+
+using compute::Aggregate;
+using compute::default_exec_context;
+using compute::ExecContext;
+using compute::Kernel;
+using compute::KernelState;
+using compute::RowSegmenter;
+
+struct ARROW_ACERO_EXPORT AggregateNodeArgs {

Review Comment:
   Why do we need this?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] rtpsw commented on pull request #34885: GH-34786: [C++] Fix output schema calculated by Substrait consumer for AggregateRel

Posted by "rtpsw (via GitHub)" <gi...@apache.org>.
rtpsw commented on PR #34885:
URL: https://github.com/apache/arrow/pull/34885#issuecomment-1496464554

   > @rtpsw Not sure I follow what you are doing - seems like a lot of refactor is done. Can you explain your approach?
   
   In this PR, the main goal is a single method, `MakeOutputSchema`, that provides the output schema for an aggregation. The problem is that the original code has two aggregation classes, `ScalarAggregateNode` and `GroupByNode`, which share little code for constructing the output schema. To set the stage, I started by refactoring the original code so that both classes share the code used for this purpose. To do this, I needed to encapsulate certain differences between them:
   - Get kernel: This is [encapsulated by `GetKernel`](https://github.com/apache/arrow/pull/34885/files#diff-806b2b905d7908f2ce860f38eca2090b9504753ebd72d5fef5f97bd2611359b1R101-R102). The two implementations are `GetScalarAggregateKernel` and `GetHashAggregateKernel`. The latter has the function dispatch on the [types extended with the group-id](https://github.com/apache/arrow/pull/34885/files#diff-806b2b905d7908f2ce860f38eca2090b9504753ebd72d5fef5f97bd2611359b1R81-R87).
   - Init kernel: This is [encapsulated by `InitKernel`](https://github.com/apache/arrow/pull/34885/files#diff-806b2b905d7908f2ce860f38eca2090b9504753ebd72d5fef5f97bd2611359b1R143-R144). The two implementations are `InitScalarAggregateKernel` and `InitHashAggregateKernel`. The latter has the kernel-args configured using the types extended with the group-id.
   - Resolve kernels: This is [encapsulated by `ResolveKernels`](https://github.com/apache/arrow/pull/34885/files#diff-806b2b905d7908f2ce860f38eca2090b9504753ebd72d5fef5f97bd2611359b1R218-R221). The two implementations are `ResolveScalarAggregateKernels` and `ResolveHashAggregateKernels`. The latter resolves each kernel using the types extended with the group-id.
   
   Additional parts of the refactoring are:
   - Adding `MakeAggregateNodeArgs` as a common method for setting up the arguments needed for constructing an aggregation node, whether it is a `ScalarAggregateNode` or a `GroupByNode`.
   - Cleaning up `ScalarAggregateNode::Make` and `GroupByNode::Make` to use the above consistently.
   - Adding `MakeOutputSchema` that uses `MakeAggregateNodeArgs` to return the output schema that the aggregation node is constructed with.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] github-actions[bot] commented on pull request #34885: GH-34786: [C++] Fix output schema calculated by Substrait consumer for AggregateRel

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #34885:
URL: https://github.com/apache/arrow/pull/34885#issuecomment-1496010928

   :warning: GitHub issue #34786 **has been automatically assigned in GitHub** to PR creator.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] icexelloss commented on a diff in pull request #34885: GH-34786: [C++] Fix output schema calculated by Substrait consumer for AggregateRel

Posted by "icexelloss (via GitHub)" <gi...@apache.org>.
icexelloss commented on code in PR #34885:
URL: https://github.com/apache/arrow/pull/34885#discussion_r1157358949


##########
cpp/src/arrow/acero/aggregate_node.cc:
##########
@@ -122,53 +158,91 @@ Result<std::unique_ptr<KernelState>> InitKernel(const HashAggregateKernel* kerne
   }
 
   ARROW_ASSIGN_OR_RAISE(
-      auto state,
-      kernel->init(&kernel_ctx, KernelInitArgs{kernel, aggr_in_types, options}));
+      auto state, kernel->init(&kernel_ctx, KernelInitArgs{kernel, in_types, options}));
   return std::move(state);
 }
 
-Result<std::vector<const HashAggregateKernel*>> GetKernels(
-    ExecContext* ctx, const std::vector<Aggregate>& aggregates,
+Result<std::unique_ptr<KernelState>> InitHashAggregateKernel(
+    const Kernel* kernel, ExecContext* ctx, const Aggregate& aggregate,
+    const std::vector<TypeHolder>& in_types) {
+  const auto aggr_in_types = ExtendWithGroupIdType(in_types);
+  return InitScalarAggregateKernel(kernel, ctx, aggregate, std::move(aggr_in_types));
+}
+
+Result<std::vector<const Kernel*>> GetKernels(
+    GetKernel get_kernel, ExecContext* ctx, std::vector<Aggregate>* aggregates_ptr,
     const std::vector<std::vector<TypeHolder>>& in_types) {
+  std::vector<Aggregate>& aggregates = *aggregates_ptr;
   if (aggregates.size() != in_types.size()) {
     return Status::Invalid(aggregates.size(), " aggregate functions were specified but ",
                            in_types.size(), " arguments were provided.");
   }
 
-  std::vector<const HashAggregateKernel*> kernels(in_types.size());
+  std::vector<const Kernel*> kernels(in_types.size());
   for (size_t i = 0; i < aggregates.size(); ++i) {
-    ARROW_ASSIGN_OR_RAISE(kernels[i], GetKernel(ctx, aggregates[i], in_types[i]));
+    ARROW_ASSIGN_OR_RAISE(kernels[i], get_kernel(ctx, &aggregates[i], in_types[i]));

Review Comment:
   Why this change?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] icexelloss commented on a diff in pull request #34885: GH-34786: [C++] Fix output schema calculated by Substrait consumer for AggregateRel

Posted by "icexelloss (via GitHub)" <gi...@apache.org>.
icexelloss commented on code in PR #34885:
URL: https://github.com/apache/arrow/pull/34885#discussion_r1157357557


##########
cpp/src/arrow/acero/aggregate_node.cc:
##########
@@ -85,8 +86,43 @@ std::vector<TypeHolder> ExtendWithGroupIdType(const std::vector<TypeHolder>& in_
   return aggr_in_types;
 }
 
-Result<const HashAggregateKernel*> GetKernel(ExecContext* ctx, const Aggregate& aggregate,
+void DefaultAggregateOptions(Aggregate* aggregate_ptr,
+                             const std::shared_ptr<Function> function) {
+  Aggregate& aggregate = *aggregate_ptr;
+  if (aggregate.options == nullptr) {
+    DCHECK(!function->doc().options_required);
+    const auto* default_options = function->default_options();
+    if (default_options) {
+      aggregate.options = default_options->Copy();
+    }
+  }
+}
+
+using GetKernel = std::function<Result<const Kernel*>(ExecContext*, Aggregate*,
+                                                      const std::vector<TypeHolder>&)>;
+
+Result<const Kernel*> GetScalarAggregateKernel(ExecContext* ctx, Aggregate* aggregate_ptr,

Review Comment:
   What is this used for?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] rtpsw commented on pull request #34885: GH-34786: [C++] Fix output schema calculated by Substrait consumer for AggregateRel

Posted by "rtpsw (via GitHub)" <gi...@apache.org>.
rtpsw commented on PR #34885:
URL: https://github.com/apache/arrow/pull/34885#issuecomment-1496011216

   cc @westonpace @icexelloss


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] icexelloss commented on a diff in pull request #34885: GH-34786: [C++] Fix output schema calculated by Substrait consumer for AggregateRel

Posted by "icexelloss (via GitHub)" <gi...@apache.org>.
icexelloss commented on code in PR #34885:
URL: https://github.com/apache/arrow/pull/34885#discussion_r1157360377


##########
cpp/src/arrow/acero/aggregate_node.cc:
##########
@@ -703,7 +635,7 @@ class GroupByNode : public ExecNode, public TracedNode {
                           {"function.kind", std::string(kind_name()) + "::Consume"}});
       auto ctx = plan_->query_context()->exec_context();
       KernelContext kernel_ctx{ctx};
-      kernel_ctx.SetState(state->agg_states[i].get());
+      kernel_ctx.SetState(state->agg_states[i][0].get());

Review Comment:
   Why this change?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] icexelloss commented on pull request #34885: GH-34786: [C++] Fix output schema calculated by Substrait consumer for AggregateRel

Posted by "icexelloss (via GitHub)" <gi...@apache.org>.
icexelloss commented on PR #34885:
URL: https://github.com/apache/arrow/pull/34885#issuecomment-1496097986

   @rtpsw Not sure I follow what you are doing - seems like a lot of refactor is done. Can you explain your approach?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org