You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ic...@apache.org on 2023/06/13 21:48:18 UTC

[arrow] branch main updated: GH-35979: [C++] Refactor Acero scalar and hash aggregation into separate files (#35980)

This is an automated email from the ASF dual-hosted git repository.

icexelloss pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new dadf6659b5 GH-35979: [C++] Refactor Acero scalar and hash aggregation into separate files (#35980)
dadf6659b5 is described below

commit dadf6659b5f69eb46eb569c1f04822a10178a8a6
Author: Davide Pasetto <dp...@gmail.com>
AuthorDate: Tue Jun 13 17:48:09 2023 -0400

    GH-35979: [C++] Refactor Acero scalar and hash aggregation into separate files (#35980)
    
    ### Rationale for this change
    
    Some refactoring to simplify relations development and pave the way for implementing window aggregation.
    
    ### What changes are included in this PR?
    
    Existing Acero aggregation (scalar and group-by) sources have been refactored into separate files with no changes.
    
    * Closes: #35979
    
    Authored-by: Davide Pasetto <dp...@gmail.com>
    Signed-off-by: Li Jin <ic...@gmail.com>
---
 cpp/src/arrow/acero/CMakeLists.txt            |    4 +-
 cpp/src/arrow/acero/aggregate_internal.cc     |  268 ++++++
 cpp/src/arrow/acero/aggregate_internal.h      |  358 ++++++++
 cpp/src/arrow/acero/aggregate_node.cc         | 1102 -------------------------
 cpp/src/arrow/acero/groupby_aggregate_node.cc |  447 ++++++++++
 cpp/src/arrow/acero/options.h                 |    2 +-
 cpp/src/arrow/acero/scalar_aggregate_node.cc  |  322 ++++++++
 cpp/src/arrow/acero/test_util_internal.cc     |    6 +-
 8 files changed, 1402 insertions(+), 1107 deletions(-)

diff --git a/cpp/src/arrow/acero/CMakeLists.txt b/cpp/src/arrow/acero/CMakeLists.txt
index b86b353f86..38f8d3fe20 100644
--- a/cpp/src/arrow/acero/CMakeLists.txt
+++ b/cpp/src/arrow/acero/CMakeLists.txt
@@ -29,7 +29,9 @@ endmacro()
 
 set(ARROW_ACERO_SRCS
     accumulation_queue.cc
-    aggregate_node.cc
+    scalar_aggregate_node.cc
+    groupby_aggregate_node.cc
+    aggregate_internal.cc
     asof_join_node.cc
     bloom_filter.cc
     exec_plan.cc
diff --git a/cpp/src/arrow/acero/aggregate_internal.cc b/cpp/src/arrow/acero/aggregate_internal.cc
new file mode 100644
index 0000000000..3cd5491720
--- /dev/null
+++ b/cpp/src/arrow/acero/aggregate_internal.cc
@@ -0,0 +1,268 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <mutex>
+#include <sstream>
+#include <thread>
+#include <unordered_set>
+
+#include "arrow/acero/aggregate_internal.h"
+#include "arrow/acero/aggregate_node.h"
+#include "arrow/acero/exec_plan.h"
+#include "arrow/acero/options.h"
+#include "arrow/compute/exec.h"
+#include "arrow/compute/registry.h"
+#include "arrow/compute/row/grouper.h"
+#include "arrow/datum.h"
+#include "arrow/result.h"
+#include "arrow/util/checked_cast.h"
+#include "arrow/util/logging.h"
+
+namespace arrow {
+
+using internal::checked_cast;
+
+using compute::ExecSpan;
+using compute::ExecValue;
+using compute::Function;
+using compute::FunctionOptions;
+using compute::Grouper;
+using compute::HashAggregateKernel;
+using compute::Kernel;
+using compute::KernelContext;
+using compute::KernelInitArgs;
+using compute::KernelState;
+using compute::RowSegmenter;
+using compute::ScalarAggregateKernel;
+using compute::Segment;
+
+namespace acero {
+
+namespace aggregate {
+
+std::vector<TypeHolder> ExtendWithGroupIdType(const std::vector<TypeHolder>& in_types) {
+  std::vector<TypeHolder> aggr_in_types;
+  aggr_in_types.reserve(in_types.size() + 1);
+  aggr_in_types = in_types;
+  aggr_in_types.emplace_back(uint32());
+  return aggr_in_types;
+}
+
+Result<const HashAggregateKernel*> GetKernel(ExecContext* ctx, const Aggregate& aggregate,
+                                             const std::vector<TypeHolder>& in_types) {
+  const auto aggr_in_types = ExtendWithGroupIdType(in_types);
+  ARROW_ASSIGN_OR_RAISE(auto function,
+                        ctx->func_registry()->GetFunction(aggregate.function));
+  if (function->kind() != Function::HASH_AGGREGATE) {
+    if (function->kind() == Function::SCALAR_AGGREGATE) {
+      return Status::Invalid("The provided function (", aggregate.function,
+                             ") is a scalar aggregate function.  Since there are "
+                             "keys to group by, a hash aggregate function was "
+                             "expected (normally these start with hash_)");
+    }
+    return Status::Invalid("The provided function(", aggregate.function,
+                           ") is not an aggregate function");
+  }
+  ARROW_ASSIGN_OR_RAISE(const Kernel* kernel, function->DispatchExact(aggr_in_types));
+  return static_cast<const HashAggregateKernel*>(kernel);
+}
+
+Result<std::unique_ptr<KernelState>> InitKernel(const HashAggregateKernel* kernel,
+                                                ExecContext* ctx,
+                                                const Aggregate& aggregate,
+                                                const std::vector<TypeHolder>& in_types) {
+  const auto aggr_in_types = ExtendWithGroupIdType(in_types);
+
+  KernelContext kernel_ctx{ctx};
+  const auto* options =
+      arrow::internal::checked_cast<const FunctionOptions*>(aggregate.options.get());
+  if (options == nullptr) {
+    // use known default options for the named function if possible
+    auto maybe_function = ctx->func_registry()->GetFunction(aggregate.function);
+    if (maybe_function.ok()) {
+      options = maybe_function.ValueOrDie()->default_options();
+    }
+  }
+
+  ARROW_ASSIGN_OR_RAISE(
+      auto state,
+      kernel->init(&kernel_ctx, KernelInitArgs{kernel, aggr_in_types, options}));
+  return std::move(state);
+}
+
+Result<std::vector<const HashAggregateKernel*>> GetKernels(
+    ExecContext* ctx, const std::vector<Aggregate>& aggregates,
+    const std::vector<std::vector<TypeHolder>>& in_types) {
+  if (aggregates.size() != in_types.size()) {
+    return Status::Invalid(aggregates.size(), " aggregate functions were specified but ",
+                           in_types.size(), " arguments were provided.");
+  }
+
+  std::vector<const HashAggregateKernel*> kernels(in_types.size());
+  for (size_t i = 0; i < aggregates.size(); ++i) {
+    ARROW_ASSIGN_OR_RAISE(kernels[i], GetKernel(ctx, aggregates[i], in_types[i]));
+  }
+  return kernels;
+}
+
+Result<std::vector<std::unique_ptr<KernelState>>> InitKernels(
+    const std::vector<const HashAggregateKernel*>& kernels, ExecContext* ctx,
+    const std::vector<Aggregate>& aggregates,
+    const std::vector<std::vector<TypeHolder>>& in_types) {
+  std::vector<std::unique_ptr<KernelState>> states(kernels.size());
+  for (size_t i = 0; i < aggregates.size(); ++i) {
+    ARROW_ASSIGN_OR_RAISE(states[i],
+                          InitKernel(kernels[i], ctx, aggregates[i], in_types[i]));
+  }
+  return std::move(states);
+}
+
+Result<FieldVector> ResolveKernels(
+    const std::vector<Aggregate>& aggregates,
+    const std::vector<const HashAggregateKernel*>& kernels,
+    const std::vector<std::unique_ptr<KernelState>>& states, ExecContext* ctx,
+    const std::vector<std::vector<TypeHolder>>& types) {
+  FieldVector fields(types.size());
+
+  for (size_t i = 0; i < kernels.size(); ++i) {
+    KernelContext kernel_ctx{ctx};
+    kernel_ctx.SetState(states[i].get());
+
+    const auto aggr_in_types = ExtendWithGroupIdType(types[i]);
+    ARROW_ASSIGN_OR_RAISE(
+        auto type, kernels[i]->signature->out_type().Resolve(&kernel_ctx, aggr_in_types));
+    fields[i] = field(aggregates[i].function, type.GetSharedPtr());
+  }
+  return fields;
+}
+
+void AggregatesToString(std::stringstream* ss, const Schema& input_schema,
+                        const std::vector<Aggregate>& aggs,
+                        const std::vector<std::vector<int>>& target_fieldsets,
+                        int indent) {
+  *ss << "aggregates=[" << std::endl;
+  for (size_t i = 0; i < aggs.size(); i++) {
+    for (int j = 0; j < indent; ++j) *ss << "  ";
+    *ss << '\t' << aggs[i].function << '(';
+    const auto& target = target_fieldsets[i];
+    if (target.size() == 0) {
+      *ss << "*";
+    } else {
+      *ss << input_schema.field(target[0])->name();
+      for (size_t k = 1; k < target.size(); k++) {
+        *ss << ", " << input_schema.field(target[k])->name();
+      }
+    }
+    if (aggs[i].options) {
+      *ss << ", " << aggs[i].options->ToString();
+    }
+    *ss << ")," << std::endl;
+  }
+  for (int j = 0; j < indent; ++j) *ss << "  ";
+  *ss << ']';
+}
+
+Status ExtractSegmenterValues(std::vector<Datum>* values_ptr,
+                              const ExecBatch& input_batch,
+                              const std::vector<int>& field_ids) {
+  DCHECK_GT(input_batch.length, 0);
+  std::vector<Datum>& values = *values_ptr;
+  int64_t row = input_batch.length - 1;
+  values.clear();
+  values.resize(field_ids.size());
+  for (size_t i = 0; i < field_ids.size(); i++) {
+    const Datum& value = input_batch.values[field_ids[i]];
+    if (value.is_scalar()) {
+      values[i] = value;
+    } else if (value.is_array()) {
+      ARROW_ASSIGN_OR_RAISE(auto scalar, value.make_array()->GetScalar(row));
+      values[i] = scalar;
+    } else {
+      DCHECK(false);
+    }
+  }
+  return Status::OK();
+}
+
+void PlaceFields(ExecBatch& batch, size_t base, std::vector<Datum>& values) {
+  DCHECK_LE(base + values.size(), batch.values.size());
+  for (size_t i = 0; i < values.size(); i++) {
+    batch.values[base + i] = values[i];
+  }
+}
+
+Result<std::shared_ptr<Schema>> MakeOutputSchema(
+    const std::shared_ptr<Schema>& input_schema, const std::vector<FieldRef>& keys,
+    const std::vector<FieldRef>& segment_keys, const std::vector<Aggregate>& aggregates,
+    ExecContext* exec_ctx) {
+  if (keys.empty()) {
+    ARROW_ASSIGN_OR_RAISE(auto args,
+                          ScalarAggregateNode::MakeAggregateNodeArgs(
+                              input_schema, keys, segment_keys, aggregates, exec_ctx,
+                              /*concurrency=*/1, /*is_cpu_parallel=*/false));
+    return std::move(args.output_schema);
+  } else {
+    ARROW_ASSIGN_OR_RAISE(auto args, GroupByNode::MakeAggregateNodeArgs(
+                                         input_schema, keys, segment_keys, aggregates,
+                                         exec_ctx, /*is_cpu_parallel=*/false));
+    return std::move(args.output_schema);
+  }
+}
+
+Result<std::vector<Datum>> ExtractValues(const ExecBatch& input_batch,
+                                         const std::vector<int>& field_ids) {
+  DCHECK_GT(input_batch.length, 0);
+  std::vector<Datum> values;
+  int64_t row = input_batch.length - 1;
+  values.clear();
+  values.resize(field_ids.size());
+  for (size_t i = 0; i < field_ids.size(); i++) {
+    const Datum& value = input_batch.values[field_ids[i]];
+    if (value.is_scalar()) {
+      values[i] = value;
+    } else if (value.is_array()) {
+      ARROW_ASSIGN_OR_RAISE(auto scalar, value.make_array()->GetScalar(row));
+      values[i] = scalar;
+    } else {
+      DCHECK(false);
+    }
+  }
+  return std::move(values);
+}
+
+}  // namespace aggregate
+
+namespace internal {
+
+void RegisterAggregateNode(ExecFactoryRegistry* registry) {
+  DCHECK_OK(registry->AddFactory(
+      "aggregate",
+      [](ExecPlan* plan, std::vector<ExecNode*> inputs,
+         const ExecNodeOptions& options) -> Result<ExecNode*> {
+        const auto& aggregate_options =
+            checked_cast<const AggregateNodeOptions&>(options);
+
+        if (aggregate_options.keys.empty()) {
+          return aggregate::ScalarAggregateNode::Make(plan, std::move(inputs), options);
+        }
+        return aggregate::GroupByNode::Make(plan, std::move(inputs), options);
+      }));
+}
+
+}  // namespace internal
+}  // namespace acero
+}  // namespace arrow
diff --git a/cpp/src/arrow/acero/aggregate_internal.h b/cpp/src/arrow/acero/aggregate_internal.h
new file mode 100644
index 0000000000..01861f0242
--- /dev/null
+++ b/cpp/src/arrow/acero/aggregate_internal.h
@@ -0,0 +1,358 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// This API is EXPERIMENTAL.
+
+#pragma once
+
+#include <forward_list>
+#include <mutex>
+#include <sstream>
+#include <thread>
+#include <unordered_map>
+#include <unordered_set>
+
+#include "arrow/acero/aggregate_node.h"
+#include "arrow/acero/exec_plan.h"
+#include "arrow/acero/options.h"
+#include "arrow/acero/query_context.h"
+#include "arrow/acero/util.h"
+#include "arrow/compute/exec.h"
+#include "arrow/compute/exec_internal.h"
+#include "arrow/compute/registry.h"
+#include "arrow/compute/row/grouper.h"
+#include "arrow/datum.h"
+#include "arrow/result.h"
+#include "arrow/util/checked_cast.h"
+#include "arrow/util/logging.h"
+#include "arrow/util/thread_pool.h"
+#include "arrow/util/tracing_internal.h"
+
+// This file implements both regular and segmented group-by aggregation, which is a
+// generalization of ordered aggregation in which the key columns are not required to be
+// ordered.
+//
+// In (regular) group-by aggregation, the input rows are partitioned into groups using a
+// set of columns called keys, where in a given group each row has the same values for
+// these columns. In segmented group-by aggregation, a second set of columns called
+// segment-keys is used to refine the partitioning. However, segment-keys are different in
+// that they partition only consecutive rows into a single group. Such a partition of
+// consecutive rows is called a segment group. For example, consider a column X with
+// values [A, A, B, A] at row-indices [0, 1, 2]. A regular group-by aggregation with keys
+// [X] yields a row-index partitioning [[0, 1, 3], [2]] whereas a segmented-group-by
+// aggregation with segment-keys [X] yields [[0, 1], [2], [3]].
+//
+// The implementation first segments the input using the segment-keys, then groups by the
+// keys. When a segment group end is reached while scanning the input, output is pushed
+// and the accumulating state is cleared. If no segment-keys are given, then the entire
+// input is taken as one segment group. One batch per segment group is sent to output.
+
+namespace arrow {
+
+using internal::checked_cast;
+
+using compute::ExecSpan;
+using compute::ExecValue;
+using compute::Function;
+using compute::FunctionOptions;
+using compute::Grouper;
+using compute::HashAggregateKernel;
+using compute::Kernel;
+using compute::KernelContext;
+using compute::KernelInitArgs;
+using compute::KernelState;
+using compute::RowSegmenter;
+using compute::ScalarAggregateKernel;
+using compute::Segment;
+
+namespace acero {
+namespace aggregate {
+
+template <typename KernelType>
+struct AggregateNodeArgs {
+  std::shared_ptr<Schema> output_schema;
+  std::vector<int> grouping_key_field_ids;
+  std::vector<int> segment_key_field_ids;
+  std::unique_ptr<RowSegmenter> segmenter;
+  std::vector<std::vector<int>> target_fieldsets;
+  std::vector<Aggregate> aggregates;
+  std::vector<const KernelType*> kernels;
+  std::vector<std::vector<TypeHolder>> kernel_intypes;
+  std::vector<std::vector<std::unique_ptr<KernelState>>> states;
+};
+
+std::vector<TypeHolder> ExtendWithGroupIdType(const std::vector<TypeHolder>& in_types);
+
+Result<const HashAggregateKernel*> GetKernel(ExecContext* ctx, const Aggregate& aggregate,
+                                             const std::vector<TypeHolder>& in_types);
+
+Result<std::unique_ptr<KernelState>> InitKernel(const HashAggregateKernel* kernel,
+                                                ExecContext* ctx,
+                                                const Aggregate& aggregate,
+                                                const std::vector<TypeHolder>& in_types);
+
+Result<std::vector<const HashAggregateKernel*>> GetKernels(
+    ExecContext* ctx, const std::vector<Aggregate>& aggregates,
+    const std::vector<std::vector<TypeHolder>>& in_types);
+
+Result<std::vector<std::unique_ptr<KernelState>>> InitKernels(
+    const std::vector<const HashAggregateKernel*>& kernels, ExecContext* ctx,
+    const std::vector<Aggregate>& aggregates,
+    const std::vector<std::vector<TypeHolder>>& in_types);
+
+Result<FieldVector> ResolveKernels(
+    const std::vector<Aggregate>& aggregates,
+    const std::vector<const HashAggregateKernel*>& kernels,
+    const std::vector<std::unique_ptr<KernelState>>& states, ExecContext* ctx,
+    const std::vector<std::vector<TypeHolder>>& types);
+
+void AggregatesToString(std::stringstream* ss, const Schema& input_schema,
+                        const std::vector<Aggregate>& aggs,
+                        const std::vector<std::vector<int>>& target_fieldsets,
+                        int indent = 0);
+
+// Extract segments from a batch and run the given handler on them.  Note that the
+// handle may be called on open segments which are not yet finished.  Typically a
+// handler should accumulate those open segments until a closed segment is reached.
+template <typename BatchHandler>
+Status HandleSegments(RowSegmenter* segmenter, const ExecBatch& batch,
+                      const std::vector<int>& ids, const BatchHandler& handle_batch) {
+  int64_t offset = 0;
+  ARROW_ASSIGN_OR_RAISE(auto segment_exec_batch, batch.SelectValues(ids));
+  ExecSpan segment_batch(segment_exec_batch);
+
+  while (true) {
+    ARROW_ASSIGN_OR_RAISE(compute::Segment segment,
+                          segmenter->GetNextSegment(segment_batch, offset));
+    if (segment.offset >= segment_batch.length) break;  // condition of no-next-segment
+    ARROW_RETURN_NOT_OK(handle_batch(batch, segment));
+    offset = segment.offset + segment.length;
+  }
+  return Status::OK();
+}
+
+/// @brief Extract values of segment keys from a segment batch
+/// @param[out] values_ptr Vector to store the extracted segment key values
+/// @param[in] input_batch Segment batch. Must have the a constant value for segment key
+/// @param[in] field_ids Segment key field ids
+Status ExtractSegmenterValues(std::vector<Datum>* values_ptr,
+                              const ExecBatch& input_batch,
+                              const std::vector<int>& field_ids);
+
+Result<std::vector<Datum>> ExtractValues(const ExecBatch& input_batch,
+                                         const std::vector<int>& field_ids);
+
+void PlaceFields(ExecBatch& batch, size_t base, std::vector<Datum>& values);
+
+class ScalarAggregateNode : public ExecNode, public TracedNode {
+ public:
+  ScalarAggregateNode(ExecPlan* plan, std::vector<ExecNode*> inputs,
+                      std::shared_ptr<Schema> output_schema,
+                      std::unique_ptr<RowSegmenter> segmenter,
+                      std::vector<int> segment_field_ids,
+                      std::vector<std::vector<int>> target_fieldsets,
+                      std::vector<Aggregate> aggs,
+                      std::vector<const ScalarAggregateKernel*> kernels,
+                      std::vector<std::vector<TypeHolder>> kernel_intypes,
+                      std::vector<std::vector<std::unique_ptr<KernelState>>> states)
+      : ExecNode(plan, std::move(inputs), {"target"},
+                 /*output_schema=*/std::move(output_schema)),
+        TracedNode(this),
+        segmenter_(std::move(segmenter)),
+        segment_field_ids_(std::move(segment_field_ids)),
+        target_fieldsets_(std::move(target_fieldsets)),
+        aggs_(std::move(aggs)),
+        kernels_(std::move(kernels)),
+        kernel_intypes_(std::move(kernel_intypes)),
+        states_(std::move(states)) {}
+
+  static Result<AggregateNodeArgs<ScalarAggregateKernel>> MakeAggregateNodeArgs(
+      const std::shared_ptr<Schema>& input_schema, const std::vector<FieldRef>& keys,
+      const std::vector<FieldRef>& segment_keys, const std::vector<Aggregate>& aggs,
+      ExecContext* exec_ctx, size_t concurrency, bool is_cpu_parallel);
+
+  static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
+                                const ExecNodeOptions& options);
+
+  const char* kind_name() const override { return "ScalarAggregateNode"; }
+
+  Status DoConsume(const ExecSpan& batch, size_t thread_index);
+
+  Status InputReceived(ExecNode* input, ExecBatch batch) override;
+
+  Status InputFinished(ExecNode* input, int total_batches) override;
+
+  Status StartProducing() override {
+    NoteStartProducing(ToStringExtra(0));
+    return Status::OK();
+  }
+
+  void PauseProducing(ExecNode* output, int32_t counter) override {
+    inputs_[0]->PauseProducing(this, counter);
+  }
+
+  void ResumeProducing(ExecNode* output, int32_t counter) override {
+    inputs_[0]->ResumeProducing(this, counter);
+  }
+
+  Status StopProducingImpl() override { return Status::OK(); }
+
+ protected:
+  std::string ToStringExtra(int indent) const override;
+
+ private:
+  Status ResetKernelStates();
+
+  Status OutputResult(bool is_last);
+
+  // A segmenter for the segment-keys
+  std::unique_ptr<RowSegmenter> segmenter_;
+  // Field indices corresponding to the segment-keys
+  const std::vector<int> segment_field_ids_;
+  // Holds the value of segment keys of the most recent input batch
+  // The values are updated everytime an input batch is processed
+  std::vector<Datum> segmenter_values_;
+
+  const std::vector<std::vector<int>> target_fieldsets_;
+  const std::vector<Aggregate> aggs_;
+  const std::vector<const ScalarAggregateKernel*> kernels_;
+
+  // Input type holders for each kernel, used for state initialization
+  std::vector<std::vector<TypeHolder>> kernel_intypes_;
+  std::vector<std::vector<std::unique_ptr<KernelState>>> states_;
+
+  AtomicCounter input_counter_;
+  /// \brief Total number of output batches produced
+  int total_output_batches_ = 0;
+};
+
+class GroupByNode : public ExecNode, public TracedNode {
+ public:
+  GroupByNode(ExecNode* input, std::shared_ptr<Schema> output_schema,
+              std::vector<int> key_field_ids, std::vector<int> segment_key_field_ids,
+              std::unique_ptr<RowSegmenter> segmenter,
+              std::vector<std::vector<TypeHolder>> agg_src_types,
+              std::vector<std::vector<int>> agg_src_fieldsets,
+              std::vector<Aggregate> aggs,
+              std::vector<const HashAggregateKernel*> agg_kernels)
+      : ExecNode(input->plan(), {input}, {"groupby"}, std::move(output_schema)),
+        TracedNode(this),
+        segmenter_(std::move(segmenter)),
+        key_field_ids_(std::move(key_field_ids)),
+        segment_key_field_ids_(std::move(segment_key_field_ids)),
+        agg_src_types_(std::move(agg_src_types)),
+        agg_src_fieldsets_(std::move(agg_src_fieldsets)),
+        aggs_(std::move(aggs)),
+        agg_kernels_(std::move(agg_kernels)) {}
+
+  Status Init() override;
+
+  static Result<AggregateNodeArgs<HashAggregateKernel>> MakeAggregateNodeArgs(
+      const std::shared_ptr<Schema>& input_schema, const std::vector<FieldRef>& keys,
+      const std::vector<FieldRef>& segment_keys, const std::vector<Aggregate>& aggs,
+      ExecContext* ctx, const bool is_cpu_parallel);
+
+  static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
+                                const ExecNodeOptions& options);
+
+  Status ResetKernelStates();
+
+  const char* kind_name() const override { return "GroupByNode"; }
+
+  Status Consume(ExecSpan batch);
+
+  Status Merge();
+
+  Result<ExecBatch> Finalize();
+
+  Status OutputNthBatch(int64_t n);
+
+  Status OutputResult(bool is_last);
+
+  Status InputReceived(ExecNode* input, ExecBatch batch) override;
+
+  Status InputFinished(ExecNode* input, int total_batches) override;
+
+  Status StartProducing() override {
+    NoteStartProducing(ToStringExtra(0));
+    local_states_.resize(plan_->query_context()->max_concurrency());
+    return Status::OK();
+  }
+
+  void PauseProducing(ExecNode* output, int32_t counter) override {
+    // TODO(ARROW-16260)
+    // Without spillover there is no way to handle backpressure in this node
+  }
+
+  void ResumeProducing(ExecNode* output, int32_t counter) override {
+    // TODO(ARROW-16260)
+    // Without spillover there is no way to handle backpressure in this node
+  }
+
+  Status StopProducingImpl() override { return Status::OK(); }
+
+ protected:
+  std::string ToStringExtra(int indent) const override;
+
+ private:
+  struct ThreadLocalState {
+    std::unique_ptr<Grouper> grouper;
+    std::vector<std::unique_ptr<KernelState>> agg_states;
+  };
+
+  ThreadLocalState* GetLocalState() {
+    size_t thread_index = plan_->query_context()->GetThreadIndex();
+    return &local_states_[thread_index];
+  }
+
+  Status InitLocalStateIfNeeded(ThreadLocalState* state);
+
+  int output_batch_size() const {
+    int result =
+        static_cast<int>(plan_->query_context()->exec_context()->exec_chunksize());
+    if (result < 0) {
+      result = 32 * 1024;
+    }
+    return result;
+  }
+
+  int output_task_group_id_;
+  /// \brief A segmenter for the segment-keys
+  std::unique_ptr<RowSegmenter> segmenter_;
+  /// \brief Holds values of the current batch that were selected for the segment-keys
+  std::vector<Datum> segmenter_values_;
+
+  const std::vector<int> key_field_ids_;
+  /// \brief Field indices corresponding to the segment-keys
+  const std::vector<int> segment_key_field_ids_;
+  /// \brief Types of input fields per aggregate
+  const std::vector<std::vector<TypeHolder>> agg_src_types_;
+  const std::vector<std::vector<int>> agg_src_fieldsets_;
+  const std::vector<Aggregate> aggs_;
+  const std::vector<const HashAggregateKernel*> agg_kernels_;
+
+  AtomicCounter input_counter_;
+  /// \brief Total number of output batches produced
+  int total_output_batches_ = 0;
+
+  std::vector<ThreadLocalState> local_states_;
+  ExecBatch out_data_;
+};
+
+}  // namespace aggregate
+}  // namespace acero
+}  // namespace arrow
diff --git a/cpp/src/arrow/acero/aggregate_node.cc b/cpp/src/arrow/acero/aggregate_node.cc
deleted file mode 100644
index d9120ffa0d..0000000000
--- a/cpp/src/arrow/acero/aggregate_node.cc
+++ /dev/null
@@ -1,1102 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include <mutex>
-#include <sstream>
-#include <thread>
-#include <unordered_map>
-#include <unordered_set>
-
-#include "arrow/acero/aggregate_node.h"
-#include "arrow/acero/exec_plan.h"
-#include "arrow/acero/options.h"
-#include "arrow/acero/query_context.h"
-#include "arrow/acero/util.h"
-#include "arrow/compute/exec.h"
-#include "arrow/compute/exec_internal.h"
-#include "arrow/compute/registry.h"
-#include "arrow/compute/row/grouper.h"
-#include "arrow/datum.h"
-#include "arrow/result.h"
-#include "arrow/util/checked_cast.h"
-#include "arrow/util/logging.h"
-#include "arrow/util/thread_pool.h"
-#include "arrow/util/tracing_internal.h"
-
-// This file implements both regular and segmented group-by aggregation, which is a
-// generalization of ordered aggregation in which the key columns are not required to be
-// ordered.
-//
-// In (regular) group-by aggregation, the input rows are partitioned into groups using a
-// set of columns called keys, where in a given group each row has the same values for
-// these columns. In segmented group-by aggregation, a second set of columns called
-// segment-keys is used to refine the partitioning. However, segment-keys are different in
-// that they partition only consecutive rows into a single group. Such a partition of
-// consecutive rows is called a segment group. For example, consider a column X with
-// values [A, A, B, A] at row-indices [0, 1, 2]. A regular group-by aggregation with keys
-// [X] yields a row-index partitioning [[0, 1, 3], [2]] whereas a segmented-group-by
-// aggregation with segment-keys [X] yields [[0, 1], [1], [3]].
-//
-// The implementation first segments the input using the segment-keys, then groups by the
-// keys. When a segment group end is reached while scanning the input, output is pushed
-// and the accumulating state is cleared. If no segment-keys are given, then the entire
-// input is taken as one segment group. One batch per segment group is sent to output.
-
-namespace arrow {
-
-using internal::checked_cast;
-
-using compute::ExecSpan;
-using compute::ExecValue;
-using compute::Function;
-using compute::FunctionOptions;
-using compute::Grouper;
-using compute::HashAggregateKernel;
-using compute::Kernel;
-using compute::KernelContext;
-using compute::KernelInitArgs;
-using compute::KernelState;
-using compute::RowSegmenter;
-using compute::ScalarAggregateKernel;
-using compute::Segment;
-
-namespace acero {
-
-namespace {
-
-template <typename KernelType>
-struct AggregateNodeArgs {
-  std::shared_ptr<Schema> output_schema;
-  std::vector<int> grouping_key_field_ids;
-  std::vector<int> segment_key_field_ids;
-  std::unique_ptr<RowSegmenter> segmenter;
-  std::vector<std::vector<int>> target_fieldsets;
-  std::vector<Aggregate> aggregates;
-  std::vector<const KernelType*> kernels;
-  std::vector<std::vector<TypeHolder>> kernel_intypes;
-  std::vector<std::vector<std::unique_ptr<KernelState>>> states;
-};
-
-std::vector<TypeHolder> ExtendWithGroupIdType(const std::vector<TypeHolder>& in_types) {
-  std::vector<TypeHolder> aggr_in_types;
-  aggr_in_types.reserve(in_types.size() + 1);
-  aggr_in_types = in_types;
-  aggr_in_types.emplace_back(uint32());
-  return aggr_in_types;
-}
-
-Result<const HashAggregateKernel*> GetKernel(ExecContext* ctx, const Aggregate& aggregate,
-                                             const std::vector<TypeHolder>& in_types) {
-  const auto aggr_in_types = ExtendWithGroupIdType(in_types);
-  ARROW_ASSIGN_OR_RAISE(auto function,
-                        ctx->func_registry()->GetFunction(aggregate.function));
-  if (function->kind() != Function::HASH_AGGREGATE) {
-    if (function->kind() == Function::SCALAR_AGGREGATE) {
-      return Status::Invalid("The provided function (", aggregate.function,
-                             ") is a scalar aggregate function.  Since there are "
-                             "keys to group by, a hash aggregate function was "
-                             "expected (normally these start with hash_)");
-    }
-    return Status::Invalid("The provided function(", aggregate.function,
-                           ") is not an aggregate function");
-  }
-  ARROW_ASSIGN_OR_RAISE(const Kernel* kernel, function->DispatchExact(aggr_in_types));
-  return static_cast<const HashAggregateKernel*>(kernel);
-}
-
-Result<std::unique_ptr<KernelState>> InitKernel(const HashAggregateKernel* kernel,
-                                                ExecContext* ctx,
-                                                const Aggregate& aggregate,
-                                                const std::vector<TypeHolder>& in_types) {
-  const auto aggr_in_types = ExtendWithGroupIdType(in_types);
-
-  KernelContext kernel_ctx{ctx};
-  const auto* options =
-      arrow::internal::checked_cast<const FunctionOptions*>(aggregate.options.get());
-  if (options == nullptr) {
-    // use known default options for the named function if possible
-    auto maybe_function = ctx->func_registry()->GetFunction(aggregate.function);
-    if (maybe_function.ok()) {
-      options = maybe_function.ValueOrDie()->default_options();
-    }
-  }
-
-  ARROW_ASSIGN_OR_RAISE(
-      auto state,
-      kernel->init(&kernel_ctx, KernelInitArgs{kernel, aggr_in_types, options}));
-  return std::move(state);
-}
-
-Result<std::vector<const HashAggregateKernel*>> GetKernels(
-    ExecContext* ctx, const std::vector<Aggregate>& aggregates,
-    const std::vector<std::vector<TypeHolder>>& in_types) {
-  if (aggregates.size() != in_types.size()) {
-    return Status::Invalid(aggregates.size(), " aggregate functions were specified but ",
-                           in_types.size(), " arguments were provided.");
-  }
-
-  std::vector<const HashAggregateKernel*> kernels(in_types.size());
-  for (size_t i = 0; i < aggregates.size(); ++i) {
-    ARROW_ASSIGN_OR_RAISE(kernels[i], GetKernel(ctx, aggregates[i], in_types[i]));
-  }
-  return kernels;
-}
-
-Result<std::vector<std::unique_ptr<KernelState>>> InitKernels(
-    const std::vector<const HashAggregateKernel*>& kernels, ExecContext* ctx,
-    const std::vector<Aggregate>& aggregates,
-    const std::vector<std::vector<TypeHolder>>& in_types) {
-  std::vector<std::unique_ptr<KernelState>> states(kernels.size());
-  for (size_t i = 0; i < aggregates.size(); ++i) {
-    ARROW_ASSIGN_OR_RAISE(states[i],
-                          InitKernel(kernels[i], ctx, aggregates[i], in_types[i]));
-  }
-  return std::move(states);
-}
-
-Result<FieldVector> ResolveKernels(
-    const std::vector<Aggregate>& aggregates,
-    const std::vector<const HashAggregateKernel*>& kernels,
-    const std::vector<std::unique_ptr<KernelState>>& states, ExecContext* ctx,
-    const std::vector<std::vector<TypeHolder>>& types) {
-  FieldVector fields(types.size());
-
-  for (size_t i = 0; i < kernels.size(); ++i) {
-    KernelContext kernel_ctx{ctx};
-    kernel_ctx.SetState(states[i].get());
-
-    const auto aggr_in_types = ExtendWithGroupIdType(types[i]);
-    ARROW_ASSIGN_OR_RAISE(
-        auto type, kernels[i]->signature->out_type().Resolve(&kernel_ctx, aggr_in_types));
-    fields[i] = field(aggregates[i].function, type.GetSharedPtr());
-  }
-  return fields;
-}
-
-void AggregatesToString(std::stringstream* ss, const Schema& input_schema,
-                        const std::vector<Aggregate>& aggs,
-                        const std::vector<std::vector<int>>& target_fieldsets,
-                        int indent = 0) {
-  *ss << "aggregates=[" << std::endl;
-  for (size_t i = 0; i < aggs.size(); i++) {
-    for (int j = 0; j < indent; ++j) *ss << "  ";
-    *ss << '\t' << aggs[i].function << '(';
-    const auto& target = target_fieldsets[i];
-    if (target.size() == 0) {
-      *ss << "*";
-    } else {
-      *ss << input_schema.field(target[0])->name();
-      for (size_t k = 1; k < target.size(); k++) {
-        *ss << ", " << input_schema.field(target[k])->name();
-      }
-    }
-    if (aggs[i].options) {
-      *ss << ", " << aggs[i].options->ToString();
-    }
-    *ss << ")," << std::endl;
-  }
-  for (int j = 0; j < indent; ++j) *ss << "  ";
-  *ss << ']';
-}
-
-// Extract segments from a batch and run the given handler on them.  Note that the
-// handle may be called on open segments which are not yet finished.  Typically a
-// handler should accumulate those open segments until a closed segment is reached.
-template <typename BatchHandler>
-Status HandleSegments(RowSegmenter* segmenter, const ExecBatch& batch,
-                      const std::vector<int>& ids, const BatchHandler& handle_batch) {
-  int64_t offset = 0;
-  ARROW_ASSIGN_OR_RAISE(auto segment_exec_batch, batch.SelectValues(ids));
-  ExecSpan segment_batch(segment_exec_batch);
-
-  while (true) {
-    ARROW_ASSIGN_OR_RAISE(compute::Segment segment,
-                          segmenter->GetNextSegment(segment_batch, offset));
-    if (segment.offset >= segment_batch.length) break;  // condition of no-next-segment
-    ARROW_RETURN_NOT_OK(handle_batch(batch, segment));
-    offset = segment.offset + segment.length;
-  }
-  return Status::OK();
-}
-
-/// @brief Extract values of segment keys from a segment batch
-/// @param[out] values_ptr Vector to store the extracted segment key values
-/// @param[in] input_batch Segment batch. Must have the a constant value for segment key
-/// @param[in] field_ids Segment key field ids
-Status ExtractSegmenterValues(std::vector<Datum>* values_ptr,
-                              const ExecBatch& input_batch,
-                              const std::vector<int>& field_ids) {
-  DCHECK_GT(input_batch.length, 0);
-  std::vector<Datum>& values = *values_ptr;
-  int64_t row = input_batch.length - 1;
-  values.clear();
-  values.resize(field_ids.size());
-  for (size_t i = 0; i < field_ids.size(); i++) {
-    const Datum& value = input_batch.values[field_ids[i]];
-    if (value.is_scalar()) {
-      values[i] = value;
-    } else if (value.is_array()) {
-      ARROW_ASSIGN_OR_RAISE(auto scalar, value.make_array()->GetScalar(row));
-      values[i] = scalar;
-    } else {
-      DCHECK(false);
-    }
-  }
-  return Status::OK();
-}
-
-void PlaceFields(ExecBatch& batch, size_t base, std::vector<Datum>& values) {
-  DCHECK_LE(base + values.size(), batch.values.size());
-  for (size_t i = 0; i < values.size(); i++) {
-    batch.values[base + i] = values[i];
-  }
-}
-
-class ScalarAggregateNode : public ExecNode, public TracedNode {
- public:
-  ScalarAggregateNode(ExecPlan* plan, std::vector<ExecNode*> inputs,
-                      std::shared_ptr<Schema> output_schema,
-                      std::unique_ptr<RowSegmenter> segmenter,
-                      std::vector<int> segment_field_ids,
-                      std::vector<std::vector<int>> target_fieldsets,
-                      std::vector<Aggregate> aggs,
-                      std::vector<const ScalarAggregateKernel*> kernels,
-                      std::vector<std::vector<TypeHolder>> kernel_intypes,
-                      std::vector<std::vector<std::unique_ptr<KernelState>>> states)
-      : ExecNode(plan, std::move(inputs), {"target"},
-                 /*output_schema=*/std::move(output_schema)),
-        TracedNode(this),
-        segmenter_(std::move(segmenter)),
-        segment_field_ids_(std::move(segment_field_ids)),
-        target_fieldsets_(std::move(target_fieldsets)),
-        aggs_(std::move(aggs)),
-        kernels_(std::move(kernels)),
-        kernel_intypes_(std::move(kernel_intypes)),
-        states_(std::move(states)) {}
-
-  static Result<AggregateNodeArgs<ScalarAggregateKernel>> MakeAggregateNodeArgs(
-      const std::shared_ptr<Schema>& input_schema, const std::vector<FieldRef>& keys,
-      const std::vector<FieldRef>& segment_keys, const std::vector<Aggregate>& aggs,
-      ExecContext* exec_ctx, size_t concurrency, bool is_cpu_parallel) {
-    // Copy (need to modify options pointer below)
-    std::vector<Aggregate> aggregates(aggs);
-    std::vector<int> segment_field_ids(segment_keys.size());
-    std::vector<TypeHolder> segment_key_types(segment_keys.size());
-    for (size_t i = 0; i < segment_keys.size(); i++) {
-      ARROW_ASSIGN_OR_RAISE(FieldPath match, segment_keys[i].FindOne(*input_schema));
-      if (match.indices().size() > 1) {
-        // ARROW-18369: Support nested references as segment ids
-        return Status::Invalid("Nested references cannot be used as segment ids");
-      }
-      segment_field_ids[i] = match[0];
-      segment_key_types[i] = input_schema->field(match[0])->type().get();
-    }
-
-    ARROW_ASSIGN_OR_RAISE(auto segmenter,
-                          RowSegmenter::Make(std::move(segment_key_types),
-                                             /*nullable_keys=*/false, exec_ctx));
-
-    std::vector<std::vector<TypeHolder>> kernel_intypes(aggregates.size());
-    std::vector<const ScalarAggregateKernel*> kernels(aggregates.size());
-    std::vector<std::vector<std::unique_ptr<KernelState>>> states(kernels.size());
-    FieldVector fields(kernels.size() + segment_keys.size());
-
-    // Output the segment keys first, followed by the aggregates
-    for (size_t i = 0; i < segment_keys.size(); ++i) {
-      ARROW_ASSIGN_OR_RAISE(fields[i], segment_keys[i].GetOne(*input_schema));
-    }
-
-    std::vector<std::vector<int>> target_fieldsets(kernels.size());
-    std::size_t base = segment_keys.size();
-    for (size_t i = 0; i < kernels.size(); ++i) {
-      const auto& target_fieldset = aggregates[i].target;
-      for (const auto& target : target_fieldset) {
-        ARROW_ASSIGN_OR_RAISE(auto match, FieldRef(target).FindOne(*input_schema));
-        target_fieldsets[i].push_back(match[0]);
-      }
-
-      ARROW_ASSIGN_OR_RAISE(
-          auto function, exec_ctx->func_registry()->GetFunction(aggregates[i].function));
-
-      if (function->kind() != Function::SCALAR_AGGREGATE) {
-        if (function->kind() == Function::HASH_AGGREGATE) {
-          return Status::Invalid("The provided function (", aggregates[i].function,
-                                 ") is a hash aggregate function.  Since there are no "
-                                 "keys to group by, a scalar aggregate function was "
-                                 "expected (normally these do not start with hash_)");
-        }
-        return Status::Invalid("The provided function(", aggregates[i].function,
-                               ") is not an aggregate function");
-      }
-
-      std::vector<TypeHolder> in_types;
-      for (const auto& target : target_fieldsets[i]) {
-        in_types.emplace_back(input_schema->field(target)->type().get());
-      }
-      kernel_intypes[i] = in_types;
-      ARROW_ASSIGN_OR_RAISE(const Kernel* kernel,
-                            function->DispatchExact(kernel_intypes[i]));
-      const ScalarAggregateKernel* agg_kernel =
-          static_cast<const ScalarAggregateKernel*>(kernel);
-      if (is_cpu_parallel && agg_kernel->ordered) {
-        return Status::NotImplemented(
-            "Using ordered aggregator in multiple threaded execution is not supported");
-      }
-
-      kernels[i] = agg_kernel;
-
-      if (aggregates[i].options == nullptr) {
-        DCHECK(!function->doc().options_required);
-        const auto* default_options = function->default_options();
-        if (default_options) {
-          aggregates[i].options = default_options->Copy();
-        }
-      }
-
-      KernelContext kernel_ctx{exec_ctx};
-      states[i].resize(concurrency);
-      RETURN_NOT_OK(Kernel::InitAll(
-          &kernel_ctx,
-          KernelInitArgs{kernels[i], kernel_intypes[i], aggregates[i].options.get()},
-          &states[i]));
-
-      // pick one to resolve the kernel signature
-      kernel_ctx.SetState(states[i][0].get());
-      ARROW_ASSIGN_OR_RAISE(auto out_type, kernels[i]->signature->out_type().Resolve(
-                                               &kernel_ctx, kernel_intypes[i]));
-
-      fields[base + i] = field(aggregates[i].name, out_type.GetSharedPtr());
-    }
-
-    return AggregateNodeArgs<ScalarAggregateKernel>{
-        schema(std::move(fields)),
-        /*grouping_key_field_ids=*/{}, std::move(segment_field_ids),
-        std::move(segmenter),          std::move(target_fieldsets),
-        std::move(aggregates),         std::move(kernels),
-        std::move(kernel_intypes),     std::move(states)};
-  }
-
-  static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
-                                const ExecNodeOptions& options) {
-    RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 1, "ScalarAggregateNode"));
-
-    const auto& aggregate_options = checked_cast<const AggregateNodeOptions&>(options);
-    auto aggregates = aggregate_options.aggregates;
-    const auto& keys = aggregate_options.keys;
-    const auto& segment_keys = aggregate_options.segment_keys;
-    const auto concurrency = plan->query_context()->max_concurrency();
-    // We can't use concurrency == 1 because that include I/O concurrency
-    const bool is_cpu_parallel = plan->query_context()->executor()->GetCapacity() > 1;
-
-    if (keys.size() > 0) {
-      return Status::Invalid("Scalar aggregation with some key");
-    }
-    if (is_cpu_parallel && segment_keys.size() > 0) {
-      return Status::NotImplemented("Segmented aggregation in a multi-threaded plan");
-    }
-
-    const auto& input_schema = inputs[0]->output_schema();
-    auto exec_ctx = plan->query_context()->exec_context();
-
-    ARROW_ASSIGN_OR_RAISE(
-        auto args, MakeAggregateNodeArgs(input_schema, keys, segment_keys, aggregates,
-                                         exec_ctx, concurrency, is_cpu_parallel));
-
-    if (is_cpu_parallel) {
-      for (auto& kernel : args.kernels) {
-        if (kernel->ordered) {
-          return Status::NotImplemented(
-              "Using ordered aggregator in multiple threaded execution is not supported");
-        }
-      }
-    }
-
-    return plan->EmplaceNode<ScalarAggregateNode>(
-        plan, std::move(inputs), std::move(args.output_schema), std::move(args.segmenter),
-        std::move(args.segment_key_field_ids), std::move(args.target_fieldsets),
-        std::move(args.aggregates), std::move(args.kernels),
-        std::move(args.kernel_intypes), std::move(args.states));
-  }
-
-  const char* kind_name() const override { return "ScalarAggregateNode"; }
-
-  Status DoConsume(const ExecSpan& batch, size_t thread_index) {
-    for (size_t i = 0; i < kernels_.size(); ++i) {
-      arrow::util::tracing::Span span;
-      START_COMPUTE_SPAN(span, aggs_[i].function,
-                         {{"function.name", aggs_[i].function},
-                          {"function.options",
-                           aggs_[i].options ? aggs_[i].options->ToString() : "<NULLPTR>"},
-                          {"function.kind", std::string(kind_name()) + "::Consume"}});
-      KernelContext batch_ctx{plan()->query_context()->exec_context()};
-      DCHECK_LT(thread_index, states_[i].size());
-      batch_ctx.SetState(states_[i][thread_index].get());
-
-      std::vector<ExecValue> column_values;
-      for (const int field : target_fieldsets_[i]) {
-        column_values.push_back(batch.values[field]);
-      }
-      ExecSpan column_batch{std::move(column_values), batch.length};
-      RETURN_NOT_OK(kernels_[i]->consume(&batch_ctx, column_batch));
-    }
-    return Status::OK();
-  }
-
-  Status InputReceived(ExecNode* input, ExecBatch batch) override {
-    auto scope = TraceInputReceived(batch);
-    DCHECK_EQ(input, inputs_[0]);
-
-    auto thread_index = plan_->query_context()->GetThreadIndex();
-    auto handler = [this, thread_index](const ExecBatch& full_batch,
-                                        const Segment& segment) {
-      // (1) The segment is starting of a new segment group and points to
-      // the beginning of the batch, then it means no data in the batch belongs
-      // to the current segment group. We can output and reset kernel states.
-      if (!segment.extends && segment.offset == 0) RETURN_NOT_OK(OutputResult(false));
-
-      // We add segment to the current segment group aggregation
-      auto exec_batch = full_batch.Slice(segment.offset, segment.length);
-      RETURN_NOT_OK(DoConsume(ExecSpan(exec_batch), thread_index));
-      RETURN_NOT_OK(
-          ExtractSegmenterValues(&segmenter_values_, exec_batch, segment_field_ids_));
-
-      // If the segment closes the current segment group, we can output segment group
-      // aggregation.
-      if (!segment.is_open) RETURN_NOT_OK(OutputResult(false));
-
-      return Status::OK();
-    };
-    RETURN_NOT_OK(HandleSegments(segmenter_.get(), batch, segment_field_ids_, handler));
-
-    if (input_counter_.Increment()) {
-      RETURN_NOT_OK(OutputResult(/*is_last=*/true));
-    }
-    return Status::OK();
-  }
-
-  Status InputFinished(ExecNode* input, int total_batches) override {
-    auto scope = TraceFinish();
-    EVENT_ON_CURRENT_SPAN("InputFinished", {{"batches.length", total_batches}});
-    DCHECK_EQ(input, inputs_[0]);
-    if (input_counter_.SetTotal(total_batches)) {
-      RETURN_NOT_OK(OutputResult(/*is_last=*/true));
-    }
-    return Status::OK();
-  }
-
-  Status StartProducing() override {
-    NoteStartProducing(ToStringExtra());
-    return Status::OK();
-  }
-
-  void PauseProducing(ExecNode* output, int32_t counter) override {
-    inputs_[0]->PauseProducing(this, counter);
-  }
-
-  void ResumeProducing(ExecNode* output, int32_t counter) override {
-    inputs_[0]->ResumeProducing(this, counter);
-  }
-
-  Status StopProducingImpl() override { return Status::OK(); }
-
- protected:
-  std::string ToStringExtra(int indent = 0) const override {
-    std::stringstream ss;
-    const auto input_schema = inputs_[0]->output_schema();
-    AggregatesToString(&ss, *input_schema, aggs_, target_fieldsets_);
-    return ss.str();
-  }
-
- private:
-  Status ResetKernelStates() {
-    auto exec_ctx = plan()->query_context()->exec_context();
-    for (size_t i = 0; i < kernels_.size(); ++i) {
-      states_[i].resize(plan()->query_context()->max_concurrency());
-      KernelContext kernel_ctx{exec_ctx};
-      RETURN_NOT_OK(Kernel::InitAll(
-          &kernel_ctx,
-          KernelInitArgs{kernels_[i], kernel_intypes_[i], aggs_[i].options.get()},
-          &states_[i]));
-    }
-    return Status::OK();
-  }
-
-  Status OutputResult(bool is_last) {
-    ExecBatch batch{{}, 1};
-    batch.values.resize(kernels_.size() + segment_field_ids_.size());
-
-    // First, insert segment keys
-    PlaceFields(batch, /*base=*/0, segmenter_values_);
-
-    // Followed by aggregate values
-    std::size_t base = segment_field_ids_.size();
-    for (size_t i = 0; i < kernels_.size(); ++i) {
-      arrow::util::tracing::Span span;
-      START_COMPUTE_SPAN(span, aggs_[i].function,
-                         {{"function.name", aggs_[i].function},
-                          {"function.options",
-                           aggs_[i].options ? aggs_[i].options->ToString() : "<NULLPTR>"},
-                          {"function.kind", std::string(kind_name()) + "::Finalize"}});
-      KernelContext ctx{plan()->query_context()->exec_context()};
-      ARROW_ASSIGN_OR_RAISE(auto merged, ScalarAggregateKernel::MergeAll(
-                                             kernels_[i], &ctx, std::move(states_[i])));
-      RETURN_NOT_OK(kernels_[i]->finalize(&ctx, &batch.values[base + i]));
-    }
-
-    ARROW_RETURN_NOT_OK(output_->InputReceived(this, std::move(batch)));
-    total_output_batches_++;
-    if (is_last) {
-      ARROW_RETURN_NOT_OK(output_->InputFinished(this, total_output_batches_));
-    } else {
-      ARROW_RETURN_NOT_OK(ResetKernelStates());
-    }
-    return Status::OK();
-  }
-
-  // A segmenter for the segment-keys
-  std::unique_ptr<RowSegmenter> segmenter_;
-  // Field indices corresponding to the segment-keys
-  const std::vector<int> segment_field_ids_;
-  // Holds the value of segment keys of the most recent input batch
-  // The values are updated everytime an input batch is processed
-  std::vector<Datum> segmenter_values_;
-
-  const std::vector<std::vector<int>> target_fieldsets_;
-  const std::vector<Aggregate> aggs_;
-  const std::vector<const ScalarAggregateKernel*> kernels_;
-
-  // Input type holders for each kernel, used for state initialization
-  std::vector<std::vector<TypeHolder>> kernel_intypes_;
-  std::vector<std::vector<std::unique_ptr<KernelState>>> states_;
-
-  AtomicCounter input_counter_;
-  /// \brief Total number of output batches produced
-  int total_output_batches_ = 0;
-};
-
-class GroupByNode : public ExecNode, public TracedNode {
- public:
-  GroupByNode(ExecNode* input, std::shared_ptr<Schema> output_schema,
-              std::vector<int> key_field_ids, std::vector<int> segment_key_field_ids,
-              std::unique_ptr<RowSegmenter> segmenter,
-              std::vector<std::vector<TypeHolder>> agg_src_types,
-              std::vector<std::vector<int>> agg_src_fieldsets,
-              std::vector<Aggregate> aggs,
-              std::vector<const HashAggregateKernel*> agg_kernels)
-      : ExecNode(input->plan(), {input}, {"groupby"}, std::move(output_schema)),
-        TracedNode(this),
-        segmenter_(std::move(segmenter)),
-        key_field_ids_(std::move(key_field_ids)),
-        segment_key_field_ids_(std::move(segment_key_field_ids)),
-        agg_src_types_(std::move(agg_src_types)),
-        agg_src_fieldsets_(std::move(agg_src_fieldsets)),
-        aggs_(std::move(aggs)),
-        agg_kernels_(std::move(agg_kernels)) {}
-
-  Status Init() override {
-    output_task_group_id_ = plan_->query_context()->RegisterTaskGroup(
-        [this](size_t, int64_t task_id) { return OutputNthBatch(task_id); },
-        [](size_t) { return Status::OK(); });
-    return Status::OK();
-  }
-
-  static Result<AggregateNodeArgs<HashAggregateKernel>> MakeAggregateNodeArgs(
-      const std::shared_ptr<Schema>& input_schema, const std::vector<FieldRef>& keys,
-      const std::vector<FieldRef>& segment_keys, const std::vector<Aggregate>& aggs,
-      ExecContext* ctx, const bool is_cpu_parallel) {
-    // Find input field indices for key fields
-    std::vector<int> key_field_ids(keys.size());
-    for (size_t i = 0; i < keys.size(); ++i) {
-      ARROW_ASSIGN_OR_RAISE(auto match, keys[i].FindOne(*input_schema));
-      key_field_ids[i] = match[0];
-    }
-
-    // Find input field indices for segment key fields
-    std::vector<int> segment_key_field_ids(segment_keys.size());
-    for (size_t i = 0; i < segment_keys.size(); ++i) {
-      ARROW_ASSIGN_OR_RAISE(auto match, segment_keys[i].FindOne(*input_schema));
-      segment_key_field_ids[i] = match[0];
-    }
-
-    // Check key fields and segment key fields are disjoint
-    std::unordered_set<int> key_field_id_set(key_field_ids.begin(), key_field_ids.end());
-    for (const auto& segment_key_field_id : segment_key_field_ids) {
-      if (key_field_id_set.find(segment_key_field_id) != key_field_id_set.end()) {
-        return Status::Invalid("Group-by aggregation with field '",
-                               input_schema->field(segment_key_field_id)->name(),
-                               "' as both key and segment key");
-      }
-    }
-
-    // Find input field indices for aggregates
-    std::vector<std::vector<int>> agg_src_fieldsets(aggs.size());
-    for (size_t i = 0; i < aggs.size(); ++i) {
-      const auto& target_fieldset = aggs[i].target;
-      for (const auto& target : target_fieldset) {
-        ARROW_ASSIGN_OR_RAISE(auto match, target.FindOne(*input_schema));
-        agg_src_fieldsets[i].push_back(match[0]);
-      }
-    }
-
-    // Build vector of aggregate source field data types
-    std::vector<std::vector<TypeHolder>> agg_src_types(aggs.size());
-    for (size_t i = 0; i < aggs.size(); ++i) {
-      for (const auto& agg_src_field_id : agg_src_fieldsets[i]) {
-        agg_src_types[i].push_back(input_schema->field(agg_src_field_id)->type().get());
-      }
-    }
-
-    // Build vector of segment key field data types
-    std::vector<TypeHolder> segment_key_types(segment_keys.size());
-    for (size_t i = 0; i < segment_keys.size(); ++i) {
-      auto segment_key_field_id = segment_key_field_ids[i];
-      segment_key_types[i] = input_schema->field(segment_key_field_id)->type().get();
-    }
-
-    ARROW_ASSIGN_OR_RAISE(auto segmenter,
-                          RowSegmenter::Make(std::move(segment_key_types),
-                                             /*nullable_keys=*/false, ctx));
-
-    // Construct aggregates
-    ARROW_ASSIGN_OR_RAISE(auto agg_kernels, GetKernels(ctx, aggs, agg_src_types));
-
-    if (is_cpu_parallel) {
-      if (segment_keys.size() > 0) {
-        return Status::NotImplemented(
-            "Segmented aggregation in a multi-threaded execution context");
-      }
-
-      for (auto kernel : agg_kernels) {
-        if (kernel->ordered) {
-          return Status::NotImplemented(
-              "Using ordered aggregator in multiple threaded execution is not supported");
-        }
-      }
-    }
-
-    ARROW_ASSIGN_OR_RAISE(auto agg_states,
-                          InitKernels(agg_kernels, ctx, aggs, agg_src_types));
-
-    ARROW_ASSIGN_OR_RAISE(
-        FieldVector agg_result_fields,
-        ResolveKernels(aggs, agg_kernels, agg_states, ctx, agg_src_types));
-
-    // Build field vector for output schema
-    FieldVector output_fields{keys.size() + segment_keys.size() + aggs.size()};
-
-    // First output is segment keys, followed by keys, followed by aggregates themselves
-    // This matches the behavior described by Substrait and also tends to be the behavior
-    // in SQL engines
-    for (size_t i = 0; i < segment_keys.size(); ++i) {
-      int segment_key_field_id = segment_key_field_ids[i];
-      output_fields[i] = input_schema->field(segment_key_field_id);
-    }
-    size_t base = segment_keys.size();
-    for (size_t i = 0; i < keys.size(); ++i) {
-      int key_field_id = key_field_ids[i];
-      output_fields[base + i] = input_schema->field(key_field_id);
-    }
-    base += keys.size();
-    for (size_t i = 0; i < aggs.size(); ++i) {
-      output_fields[base + i] = agg_result_fields[i]->WithName(aggs[i].name);
-    }
-
-    return AggregateNodeArgs<HashAggregateKernel>{schema(std::move(output_fields)),
-                                                  std::move(key_field_ids),
-                                                  std::move(segment_key_field_ids),
-                                                  std::move(segmenter),
-                                                  std::move(agg_src_fieldsets),
-                                                  std::move(aggs),
-                                                  std::move(agg_kernels),
-                                                  std::move(agg_src_types),
-                                                  /*states=*/{}};
-  }
-
-  static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
-                                const ExecNodeOptions& options) {
-    RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 1, "GroupByNode"));
-
-    auto input = inputs[0];
-    const auto& aggregate_options = checked_cast<const AggregateNodeOptions&>(options);
-    const auto& keys = aggregate_options.keys;
-    const auto& segment_keys = aggregate_options.segment_keys;
-    auto aggs = aggregate_options.aggregates;
-    bool is_cpu_parallel = plan->query_context()->executor()->GetCapacity() > 1;
-
-    const auto& input_schema = input->output_schema();
-    auto exec_ctx = plan->query_context()->exec_context();
-    ARROW_ASSIGN_OR_RAISE(
-        auto args, MakeAggregateNodeArgs(input_schema, keys, segment_keys, aggs, exec_ctx,
-                                         is_cpu_parallel));
-
-    return input->plan()->EmplaceNode<GroupByNode>(
-        input, std::move(args.output_schema), std::move(args.grouping_key_field_ids),
-        std::move(args.segment_key_field_ids), std::move(args.segmenter),
-        std::move(args.kernel_intypes), std::move(args.target_fieldsets),
-        std::move(args.aggregates), std::move(args.kernels));
-  }
-
-  Status ResetKernelStates() {
-    auto ctx = plan()->query_context()->exec_context();
-    ARROW_RETURN_NOT_OK(InitKernels(agg_kernels_, ctx, aggs_, agg_src_types_));
-    return Status::OK();
-  }
-
-  const char* kind_name() const override { return "GroupByNode"; }
-
-  Status Consume(ExecSpan batch) {
-    size_t thread_index = plan_->query_context()->GetThreadIndex();
-    if (thread_index >= local_states_.size()) {
-      return Status::IndexError("thread index ", thread_index, " is out of range [0, ",
-                                local_states_.size(), ")");
-    }
-
-    auto state = &local_states_[thread_index];
-    RETURN_NOT_OK(InitLocalStateIfNeeded(state));
-
-    // Create a batch with key columns
-    std::vector<ExecValue> keys(key_field_ids_.size());
-    for (size_t i = 0; i < key_field_ids_.size(); ++i) {
-      keys[i] = batch[key_field_ids_[i]];
-    }
-    ExecSpan key_batch(std::move(keys), batch.length);
-
-    // Create a batch with group ids
-    ARROW_ASSIGN_OR_RAISE(Datum id_batch, state->grouper->Consume(key_batch));
-
-    // Execute aggregate kernels
-    for (size_t i = 0; i < agg_kernels_.size(); ++i) {
-      arrow::util::tracing::Span span;
-      START_COMPUTE_SPAN(span, aggs_[i].function,
-                         {{"function.name", aggs_[i].function},
-                          {"function.options",
-                           aggs_[i].options ? aggs_[i].options->ToString() : "<NULLPTR>"},
-                          {"function.kind", std::string(kind_name()) + "::Consume"}});
-      auto ctx = plan_->query_context()->exec_context();
-      KernelContext kernel_ctx{ctx};
-      kernel_ctx.SetState(state->agg_states[i].get());
-
-      std::vector<ExecValue> column_values;
-      for (const int field : agg_src_fieldsets_[i]) {
-        column_values.push_back(batch[field]);
-      }
-      column_values.emplace_back(*id_batch.array());
-      ExecSpan agg_batch(std::move(column_values), batch.length);
-      RETURN_NOT_OK(agg_kernels_[i]->resize(&kernel_ctx, state->grouper->num_groups()));
-      RETURN_NOT_OK(agg_kernels_[i]->consume(&kernel_ctx, agg_batch));
-    }
-
-    return Status::OK();
-  }
-
-  Status Merge() {
-    arrow::util::tracing::Span span;
-    START_COMPUTE_SPAN(span, "Merge",
-                       {{"group_by", ToStringExtra()}, {"node.label", label()}});
-    ThreadLocalState* state0 = &local_states_[0];
-    for (size_t i = 1; i < local_states_.size(); ++i) {
-      ThreadLocalState* state = &local_states_[i];
-      if (!state->grouper) {
-        continue;
-      }
-
-      ARROW_ASSIGN_OR_RAISE(ExecBatch other_keys, state->grouper->GetUniques());
-      ARROW_ASSIGN_OR_RAISE(Datum transposition,
-                            state0->grouper->Consume(ExecSpan(other_keys)));
-      state->grouper.reset();
-
-      for (size_t i = 0; i < agg_kernels_.size(); ++i) {
-        arrow::util::tracing::Span span;
-        START_COMPUTE_SPAN(
-            span, aggs_[i].function,
-            {{"function.name", aggs_[i].function},
-             {"function.options",
-              aggs_[i].options ? aggs_[i].options->ToString() : "<NULLPTR>"},
-             {"function.kind", std::string(kind_name()) + "::Merge"}});
-
-        auto ctx = plan_->query_context()->exec_context();
-        KernelContext batch_ctx{ctx};
-        DCHECK(state0->agg_states[i]);
-        batch_ctx.SetState(state0->agg_states[i].get());
-
-        RETURN_NOT_OK(agg_kernels_[i]->resize(&batch_ctx, state0->grouper->num_groups()));
-        RETURN_NOT_OK(agg_kernels_[i]->merge(&batch_ctx, std::move(*state->agg_states[i]),
-                                             *transposition.array()));
-        state->agg_states[i].reset();
-      }
-    }
-    return Status::OK();
-  }
-
-  Result<ExecBatch> Finalize() {
-    arrow::util::tracing::Span span;
-    START_COMPUTE_SPAN(span, "Finalize",
-                       {{"group_by", ToStringExtra()}, {"node.label", label()}});
-
-    ThreadLocalState* state = &local_states_[0];
-    // If we never got any batches, then state won't have been initialized
-    RETURN_NOT_OK(InitLocalStateIfNeeded(state));
-
-    // Allocate a batch for output
-    ExecBatch out_data{{}, state->grouper->num_groups()};
-    out_data.values.resize(agg_kernels_.size() + key_field_ids_.size() +
-                           segment_key_field_ids_.size());
-
-    // Segment keys come first
-    PlaceFields(out_data, 0, segmenter_values_);
-    // Followed by keys
-    ARROW_ASSIGN_OR_RAISE(ExecBatch out_keys, state->grouper->GetUniques());
-    std::move(out_keys.values.begin(), out_keys.values.end(),
-              out_data.values.begin() + segment_key_field_ids_.size());
-    // And finally, the aggregates themselves
-    std::size_t base = segment_key_field_ids_.size() + key_field_ids_.size();
-    for (size_t i = 0; i < agg_kernels_.size(); ++i) {
-      arrow::util::tracing::Span span;
-      START_COMPUTE_SPAN(span, aggs_[i].function,
-                         {{"function.name", aggs_[i].function},
-                          {"function.options",
-                           aggs_[i].options ? aggs_[i].options->ToString() : "<NULLPTR>"},
-                          {"function.kind", std::string(kind_name()) + "::Finalize"}});
-      KernelContext batch_ctx{plan_->query_context()->exec_context()};
-      batch_ctx.SetState(state->agg_states[i].get());
-      RETURN_NOT_OK(agg_kernels_[i]->finalize(&batch_ctx, &out_data.values[i + base]));
-      state->agg_states[i].reset();
-    }
-    state->grouper.reset();
-
-    return out_data;
-  }
-
-  Status OutputNthBatch(int64_t n) {
-    int64_t batch_size = output_batch_size();
-    return output_->InputReceived(this, out_data_.Slice(batch_size * n, batch_size));
-  }
-
-  Status OutputResult(bool is_last) {
-    // To simplify merging, ensure that the first grouper is nonempty
-    for (size_t i = 0; i < local_states_.size(); i++) {
-      if (local_states_[i].grouper) {
-        std::swap(local_states_[i], local_states_[0]);
-        break;
-      }
-    }
-
-    RETURN_NOT_OK(Merge());
-    ARROW_ASSIGN_OR_RAISE(out_data_, Finalize());
-
-    int64_t num_output_batches = bit_util::CeilDiv(out_data_.length, output_batch_size());
-    total_output_batches_ += static_cast<int>(num_output_batches);
-    if (is_last) {
-      ARROW_RETURN_NOT_OK(output_->InputFinished(this, total_output_batches_));
-      RETURN_NOT_OK(plan_->query_context()->StartTaskGroup(output_task_group_id_,
-                                                           num_output_batches));
-    } else {
-      for (int64_t i = 0; i < num_output_batches; i++) {
-        ARROW_RETURN_NOT_OK(OutputNthBatch(i));
-      }
-      ARROW_RETURN_NOT_OK(ResetKernelStates());
-    }
-    return Status::OK();
-  }
-
-  Status InputReceived(ExecNode* input, ExecBatch batch) override {
-    auto scope = TraceInputReceived(batch);
-
-    DCHECK_EQ(input, inputs_[0]);
-
-    auto handler = [this](const ExecBatch& full_batch, const Segment& segment) {
-      if (!segment.extends && segment.offset == 0) RETURN_NOT_OK(OutputResult(false));
-      auto exec_batch = full_batch.Slice(segment.offset, segment.length);
-      auto batch = ExecSpan(exec_batch);
-      RETURN_NOT_OK(Consume(batch));
-      RETURN_NOT_OK(
-          ExtractSegmenterValues(&segmenter_values_, exec_batch, segment_key_field_ids_));
-      if (!segment.is_open) RETURN_NOT_OK(OutputResult(false));
-      return Status::OK();
-    };
-    ARROW_RETURN_NOT_OK(
-        HandleSegments(segmenter_.get(), batch, segment_key_field_ids_, handler));
-
-    if (input_counter_.Increment()) {
-      ARROW_RETURN_NOT_OK(OutputResult(/*is_last=*/true));
-    }
-    return Status::OK();
-  }
-
-  Status InputFinished(ExecNode* input, int total_batches) override {
-    auto scope = TraceFinish();
-    DCHECK_EQ(input, inputs_[0]);
-
-    if (input_counter_.SetTotal(total_batches)) {
-      RETURN_NOT_OK(OutputResult(/*is_last=*/true));
-    }
-    return Status::OK();
-  }
-
-  Status StartProducing() override {
-    NoteStartProducing(ToStringExtra());
-    local_states_.resize(plan_->query_context()->max_concurrency());
-    return Status::OK();
-  }
-
-  void PauseProducing(ExecNode* output, int32_t counter) override {
-    // TODO(ARROW-16260)
-    // Without spillover there is no way to handle backpressure in this node
-  }
-
-  void ResumeProducing(ExecNode* output, int32_t counter) override {
-    // TODO(ARROW-16260)
-    // Without spillover there is no way to handle backpressure in this node
-  }
-
-  Status StopProducingImpl() override { return Status::OK(); }
-
- protected:
-  std::string ToStringExtra(int indent = 0) const override {
-    std::stringstream ss;
-    const auto input_schema = inputs_[0]->output_schema();
-    ss << "keys=[";
-    for (size_t i = 0; i < key_field_ids_.size(); i++) {
-      if (i > 0) ss << ", ";
-      ss << '"' << input_schema->field(key_field_ids_[i])->name() << '"';
-    }
-    ss << "], ";
-    AggregatesToString(&ss, *input_schema, aggs_, agg_src_fieldsets_, indent);
-    return ss.str();
-  }
-
- private:
-  struct ThreadLocalState {
-    std::unique_ptr<Grouper> grouper;
-    std::vector<std::unique_ptr<KernelState>> agg_states;
-  };
-
-  ThreadLocalState* GetLocalState() {
-    size_t thread_index = plan_->query_context()->GetThreadIndex();
-    return &local_states_[thread_index];
-  }
-
-  Status InitLocalStateIfNeeded(ThreadLocalState* state) {
-    // Get input schema
-    auto input_schema = inputs_[0]->output_schema();
-
-    if (state->grouper != nullptr) return Status::OK();
-
-    // Build vector of key field data types
-    std::vector<TypeHolder> key_types(key_field_ids_.size());
-    for (size_t i = 0; i < key_field_ids_.size(); ++i) {
-      auto key_field_id = key_field_ids_[i];
-      key_types[i] = input_schema->field(key_field_id)->type().get();
-    }
-
-    // Construct grouper
-    ARROW_ASSIGN_OR_RAISE(
-        state->grouper, Grouper::Make(key_types, plan_->query_context()->exec_context()));
-
-    // Build vector of aggregate source field data types
-    std::vector<std::vector<TypeHolder>> agg_src_types(agg_kernels_.size());
-    for (size_t i = 0; i < agg_kernels_.size(); ++i) {
-      for (const auto& field_id : agg_src_fieldsets_[i]) {
-        agg_src_types[i].emplace_back(input_schema->field(field_id)->type().get());
-      }
-    }
-
-    ARROW_ASSIGN_OR_RAISE(
-        state->agg_states,
-        InitKernels(agg_kernels_, plan_->query_context()->exec_context(), aggs_,
-                    agg_src_types));
-
-    return Status::OK();
-  }
-
-  int output_batch_size() const {
-    int result =
-        static_cast<int>(plan_->query_context()->exec_context()->exec_chunksize());
-    if (result < 0) {
-      result = 32 * 1024;
-    }
-    return result;
-  }
-
-  int output_task_group_id_;
-  /// \brief A segmenter for the segment-keys
-  std::unique_ptr<RowSegmenter> segmenter_;
-  /// \brief Holds values of the current batch that were selected for the segment-keys
-  std::vector<Datum> segmenter_values_;
-
-  const std::vector<int> key_field_ids_;
-  /// \brief Field indices corresponding to the segment-keys
-  const std::vector<int> segment_key_field_ids_;
-  /// \brief Types of input fields per aggregate
-  const std::vector<std::vector<TypeHolder>> agg_src_types_;
-  const std::vector<std::vector<int>> agg_src_fieldsets_;
-  const std::vector<Aggregate> aggs_;
-  const std::vector<const HashAggregateKernel*> agg_kernels_;
-
-  AtomicCounter input_counter_;
-  /// \brief Total number of output batches produced
-  int total_output_batches_ = 0;
-
-  std::vector<ThreadLocalState> local_states_;
-  ExecBatch out_data_;
-};
-
-}  // namespace
-
-namespace aggregate {
-
-Result<std::shared_ptr<Schema>> MakeOutputSchema(
-    const std::shared_ptr<Schema>& input_schema, const std::vector<FieldRef>& keys,
-    const std::vector<FieldRef>& segment_keys, const std::vector<Aggregate>& aggregates,
-    ExecContext* exec_ctx) {
-  if (keys.empty()) {
-    ARROW_ASSIGN_OR_RAISE(auto args,
-                          ScalarAggregateNode::MakeAggregateNodeArgs(
-                              input_schema, keys, segment_keys, aggregates, exec_ctx,
-                              /*concurrency=*/1, /*is_cpu_parallel=*/false));
-    return std::move(args.output_schema);
-  } else {
-    ARROW_ASSIGN_OR_RAISE(auto args, GroupByNode::MakeAggregateNodeArgs(
-                                         input_schema, keys, segment_keys, aggregates,
-                                         exec_ctx, /*is_cpu_parallel=*/false));
-    return std::move(args.output_schema);
-  }
-}
-
-}  // namespace aggregate
-
-namespace internal {
-
-void RegisterAggregateNode(ExecFactoryRegistry* registry) {
-  DCHECK_OK(registry->AddFactory(
-      "aggregate",
-      [](ExecPlan* plan, std::vector<ExecNode*> inputs,
-         const ExecNodeOptions& options) -> Result<ExecNode*> {
-        const auto& aggregate_options =
-            checked_cast<const AggregateNodeOptions&>(options);
-
-        if (aggregate_options.keys.empty()) {
-          // construct scalar agg node
-          return ScalarAggregateNode::Make(plan, std::move(inputs), options);
-        }
-        return GroupByNode::Make(plan, std::move(inputs), options);
-      }));
-}
-
-}  // namespace internal
-}  // namespace acero
-}  // namespace arrow
diff --git a/cpp/src/arrow/acero/groupby_aggregate_node.cc b/cpp/src/arrow/acero/groupby_aggregate_node.cc
new file mode 100644
index 0000000000..723c8b7377
--- /dev/null
+++ b/cpp/src/arrow/acero/groupby_aggregate_node.cc
@@ -0,0 +1,447 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <mutex>
+#include <sstream>
+#include <thread>
+#include <unordered_map>
+#include <unordered_set>
+
+#include "arrow/acero/aggregate_internal.h"
+#include "arrow/acero/aggregate_node.h"
+#include "arrow/acero/exec_plan.h"
+#include "arrow/acero/options.h"
+#include "arrow/acero/query_context.h"
+#include "arrow/acero/util.h"
+#include "arrow/compute/exec.h"
+#include "arrow/compute/exec_internal.h"
+#include "arrow/compute/registry.h"
+#include "arrow/compute/row/grouper.h"
+#include "arrow/datum.h"
+#include "arrow/result.h"
+#include "arrow/util/checked_cast.h"
+#include "arrow/util/logging.h"
+#include "arrow/util/thread_pool.h"
+#include "arrow/util/tracing_internal.h"
+
+namespace arrow {
+
+using internal::checked_cast;
+
+using compute::ExecSpan;
+using compute::ExecValue;
+using compute::Function;
+using compute::FunctionOptions;
+using compute::Grouper;
+using compute::HashAggregateKernel;
+using compute::Kernel;
+using compute::KernelContext;
+using compute::KernelInitArgs;
+using compute::KernelState;
+using compute::RowSegmenter;
+using compute::ScalarAggregateKernel;
+using compute::Segment;
+
+namespace acero {
+namespace aggregate {
+
+Status GroupByNode::Init() {
+  output_task_group_id_ = plan_->query_context()->RegisterTaskGroup(
+      [this](size_t, int64_t task_id) { return OutputNthBatch(task_id); },
+      [](size_t) { return Status::OK(); });
+  return Status::OK();
+}
+
+Result<AggregateNodeArgs<HashAggregateKernel>> GroupByNode::MakeAggregateNodeArgs(
+    const std::shared_ptr<Schema>& input_schema, const std::vector<FieldRef>& keys,
+    const std::vector<FieldRef>& segment_keys, const std::vector<Aggregate>& aggs,
+    ExecContext* ctx, const bool is_cpu_parallel) {
+  // Find input field indices for key fields
+  std::vector<int> key_field_ids(keys.size());
+  for (size_t i = 0; i < keys.size(); ++i) {
+    ARROW_ASSIGN_OR_RAISE(auto match, keys[i].FindOne(*input_schema));
+    key_field_ids[i] = match[0];
+  }
+
+  // Find input field indices for segment key fields
+  std::vector<int> segment_key_field_ids(segment_keys.size());
+  for (size_t i = 0; i < segment_keys.size(); ++i) {
+    ARROW_ASSIGN_OR_RAISE(auto match, segment_keys[i].FindOne(*input_schema));
+    segment_key_field_ids[i] = match[0];
+  }
+
+  // Check key fields and segment key fields are disjoint
+  std::unordered_set<int> key_field_id_set(key_field_ids.begin(), key_field_ids.end());
+  for (const auto& segment_key_field_id : segment_key_field_ids) {
+    if (key_field_id_set.find(segment_key_field_id) != key_field_id_set.end()) {
+      return Status::Invalid("Group-by aggregation with field '",
+                             input_schema->field(segment_key_field_id)->name(),
+                             "' as both key and segment key");
+    }
+  }
+
+  // Find input field indices for aggregates
+  std::vector<std::vector<int>> agg_src_fieldsets(aggs.size());
+  for (size_t i = 0; i < aggs.size(); ++i) {
+    const auto& target_fieldset = aggs[i].target;
+    for (const auto& target : target_fieldset) {
+      ARROW_ASSIGN_OR_RAISE(auto match, target.FindOne(*input_schema));
+      agg_src_fieldsets[i].push_back(match[0]);
+    }
+  }
+
+  // Build vector of aggregate source field data types
+  std::vector<std::vector<TypeHolder>> agg_src_types(aggs.size());
+  for (size_t i = 0; i < aggs.size(); ++i) {
+    for (const auto& agg_src_field_id : agg_src_fieldsets[i]) {
+      agg_src_types[i].push_back(input_schema->field(agg_src_field_id)->type().get());
+    }
+  }
+
+  // Build vector of segment key field data types
+  std::vector<TypeHolder> segment_key_types(segment_keys.size());
+  for (size_t i = 0; i < segment_keys.size(); ++i) {
+    auto segment_key_field_id = segment_key_field_ids[i];
+    segment_key_types[i] = input_schema->field(segment_key_field_id)->type().get();
+  }
+
+  ARROW_ASSIGN_OR_RAISE(auto segmenter, RowSegmenter::Make(std::move(segment_key_types),
+                                                           /*nullable_keys=*/false, ctx));
+
+  // Construct aggregates
+  ARROW_ASSIGN_OR_RAISE(auto agg_kernels, GetKernels(ctx, aggs, agg_src_types));
+
+  if (is_cpu_parallel) {
+    if (segment_keys.size() > 0) {
+      return Status::NotImplemented(
+          "Segmented aggregation in a multi-threaded execution context");
+    }
+
+    for (auto kernel : agg_kernels) {
+      if (kernel->ordered) {
+        return Status::NotImplemented(
+            "Using ordered aggregator in multiple threaded execution is not supported");
+      }
+    }
+  }
+
+  ARROW_ASSIGN_OR_RAISE(auto agg_states,
+                        InitKernels(agg_kernels, ctx, aggs, agg_src_types));
+
+  ARROW_ASSIGN_OR_RAISE(
+      FieldVector agg_result_fields,
+      ResolveKernels(aggs, agg_kernels, agg_states, ctx, agg_src_types));
+
+  // Build field vector for output schema
+  FieldVector output_fields{keys.size() + segment_keys.size() + aggs.size()};
+
+  // First output is segment keys, followed by keys, followed by aggregates themselves
+  // This matches the behavior described by Substrait and also tends to be the behavior
+  // in SQL engines
+  for (size_t i = 0; i < segment_keys.size(); ++i) {
+    int segment_key_field_id = segment_key_field_ids[i];
+    output_fields[i] = input_schema->field(segment_key_field_id);
+  }
+  size_t base = segment_keys.size();
+  for (size_t i = 0; i < keys.size(); ++i) {
+    int key_field_id = key_field_ids[i];
+    output_fields[base + i] = input_schema->field(key_field_id);
+  }
+  base += keys.size();
+  for (size_t i = 0; i < aggs.size(); ++i) {
+    output_fields[base + i] = agg_result_fields[i]->WithName(aggs[i].name);
+  }
+
+  return AggregateNodeArgs<HashAggregateKernel>{schema(std::move(output_fields)),
+                                                std::move(key_field_ids),
+                                                std::move(segment_key_field_ids),
+                                                std::move(segmenter),
+                                                std::move(agg_src_fieldsets),
+                                                std::move(aggs),
+                                                std::move(agg_kernels),
+                                                std::move(agg_src_types),
+                                                /*states=*/{}};
+}
+
+Result<ExecNode*> GroupByNode::Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
+                                    const ExecNodeOptions& options) {
+  RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 1, "GroupByNode"));
+
+  auto input = inputs[0];
+  const auto& aggregate_options = checked_cast<const AggregateNodeOptions&>(options);
+  const auto& keys = aggregate_options.keys;
+  const auto& segment_keys = aggregate_options.segment_keys;
+  auto aggs = aggregate_options.aggregates;
+  bool is_cpu_parallel = plan->query_context()->executor()->GetCapacity() > 1;
+
+  const auto& input_schema = input->output_schema();
+  auto exec_ctx = plan->query_context()->exec_context();
+  ARROW_ASSIGN_OR_RAISE(
+      auto args, MakeAggregateNodeArgs(input_schema, keys, segment_keys, aggs, exec_ctx,
+                                       is_cpu_parallel));
+
+  return input->plan()->EmplaceNode<GroupByNode>(
+      input, std::move(args.output_schema), std::move(args.grouping_key_field_ids),
+      std::move(args.segment_key_field_ids), std::move(args.segmenter),
+      std::move(args.kernel_intypes), std::move(args.target_fieldsets),
+      std::move(args.aggregates), std::move(args.kernels));
+}
+
+Status GroupByNode::ResetKernelStates() {
+  auto ctx = plan()->query_context()->exec_context();
+  ARROW_RETURN_NOT_OK(InitKernels(agg_kernels_, ctx, aggs_, agg_src_types_));
+  return Status::OK();
+}
+
+Status GroupByNode::Consume(ExecSpan batch) {
+  size_t thread_index = plan_->query_context()->GetThreadIndex();
+  if (thread_index >= local_states_.size()) {
+    return Status::IndexError("thread index ", thread_index, " is out of range [0, ",
+                              local_states_.size(), ")");
+  }
+
+  auto state = &local_states_[thread_index];
+  RETURN_NOT_OK(InitLocalStateIfNeeded(state));
+
+  // Create a batch with key columns
+  std::vector<ExecValue> keys(key_field_ids_.size());
+  for (size_t i = 0; i < key_field_ids_.size(); ++i) {
+    keys[i] = batch[key_field_ids_[i]];
+  }
+  ExecSpan key_batch(std::move(keys), batch.length);
+
+  // Create a batch with group ids
+  ARROW_ASSIGN_OR_RAISE(Datum id_batch, state->grouper->Consume(key_batch));
+
+  // Execute aggregate kernels
+  for (size_t i = 0; i < agg_kernels_.size(); ++i) {
+    arrow::util::tracing::Span span;
+    START_COMPUTE_SPAN(span, aggs_[i].function,
+                       {{"function.name", aggs_[i].function},
+                        {"function.options",
+                         aggs_[i].options ? aggs_[i].options->ToString() : "<NULLPTR>"},
+                        {"function.kind", std::string(kind_name()) + "::Consume"}});
+    auto ctx = plan_->query_context()->exec_context();
+    KernelContext kernel_ctx{ctx};
+    kernel_ctx.SetState(state->agg_states[i].get());
+
+    std::vector<ExecValue> column_values;
+    for (const int field : agg_src_fieldsets_[i]) {
+      column_values.push_back(batch[field]);
+    }
+    column_values.emplace_back(*id_batch.array());
+    ExecSpan agg_batch(std::move(column_values), batch.length);
+    RETURN_NOT_OK(agg_kernels_[i]->resize(&kernel_ctx, state->grouper->num_groups()));
+    RETURN_NOT_OK(agg_kernels_[i]->consume(&kernel_ctx, agg_batch));
+  }
+
+  return Status::OK();
+}
+
+Status GroupByNode::Merge() {
+  arrow::util::tracing::Span span;
+  START_COMPUTE_SPAN(span, "Merge",
+                     {{"group_by", ToStringExtra(0)}, {"node.label", label()}});
+  ThreadLocalState* state0 = &local_states_[0];
+  for (size_t i = 1; i < local_states_.size(); ++i) {
+    ThreadLocalState* state = &local_states_[i];
+    if (!state->grouper) {
+      continue;
+    }
+
+    ARROW_ASSIGN_OR_RAISE(ExecBatch other_keys, state->grouper->GetUniques());
+    ARROW_ASSIGN_OR_RAISE(Datum transposition,
+                          state0->grouper->Consume(ExecSpan(other_keys)));
+    state->grouper.reset();
+
+    for (size_t span_i = 0; span_i < agg_kernels_.size(); ++span_i) {
+      arrow::util::tracing::Span span_item;
+      START_COMPUTE_SPAN(
+          span_item, aggs_[span_i].function,
+          {{"function.name", aggs_[span_i].function},
+           {"function.options",
+            aggs_[span_i].options ? aggs_[span_i].options->ToString() : "<NULLPTR>"},
+           {"function.kind", std::string(kind_name()) + "::Merge"}});
+
+      auto ctx = plan_->query_context()->exec_context();
+      KernelContext batch_ctx{ctx};
+      DCHECK(state0->agg_states[span_i]);
+      batch_ctx.SetState(state0->agg_states[span_i].get());
+
+      RETURN_NOT_OK(
+          agg_kernels_[span_i]->resize(&batch_ctx, state0->grouper->num_groups()));
+      RETURN_NOT_OK(agg_kernels_[span_i]->merge(
+          &batch_ctx, std::move(*state->agg_states[span_i]), *transposition.array()));
+      state->agg_states[span_i].reset();
+    }
+  }
+  return Status::OK();
+}
+
+Result<ExecBatch> GroupByNode::Finalize() {
+  arrow::util::tracing::Span span;
+  START_COMPUTE_SPAN(span, "Finalize",
+                     {{"group_by", ToStringExtra(0)}, {"node.label", label()}});
+
+  ThreadLocalState* state = &local_states_[0];
+  // If we never got any batches, then state won't have been initialized
+  RETURN_NOT_OK(InitLocalStateIfNeeded(state));
+
+  // Allocate a batch for output
+  ExecBatch out_data{{}, state->grouper->num_groups()};
+  out_data.values.resize(agg_kernels_.size() + key_field_ids_.size() +
+                         segment_key_field_ids_.size());
+
+  // Segment keys come first
+  PlaceFields(out_data, 0, segmenter_values_);
+  // Followed by keys
+  ARROW_ASSIGN_OR_RAISE(ExecBatch out_keys, state->grouper->GetUniques());
+  std::move(out_keys.values.begin(), out_keys.values.end(),
+            out_data.values.begin() + segment_key_field_ids_.size());
+  // And finally, the aggregates themselves
+  std::size_t base = segment_key_field_ids_.size() + key_field_ids_.size();
+  for (size_t i = 0; i < agg_kernels_.size(); ++i) {
+    arrow::util::tracing::Span span_item;
+    START_COMPUTE_SPAN(span_item, aggs_[i].function,
+                       {{"function.name", aggs_[i].function},
+                        {"function.options",
+                         aggs_[i].options ? aggs_[i].options->ToString() : "<NULLPTR>"},
+                        {"function.kind", std::string(kind_name()) + "::Finalize"}});
+    KernelContext batch_ctx{plan_->query_context()->exec_context()};
+    batch_ctx.SetState(state->agg_states[i].get());
+    RETURN_NOT_OK(agg_kernels_[i]->finalize(&batch_ctx, &out_data.values[i + base]));
+    state->agg_states[i].reset();
+  }
+  state->grouper.reset();
+
+  return out_data;
+}
+
+Status GroupByNode::OutputNthBatch(int64_t n) {
+  int64_t batch_size = output_batch_size();
+  return output_->InputReceived(this, out_data_.Slice(batch_size * n, batch_size));
+}
+
+Status GroupByNode::OutputResult(bool is_last) {
+  // To simplify merging, ensure that the first grouper is nonempty
+  for (size_t i = 0; i < local_states_.size(); i++) {
+    if (local_states_[i].grouper) {
+      std::swap(local_states_[i], local_states_[0]);
+      break;
+    }
+  }
+
+  RETURN_NOT_OK(Merge());
+  ARROW_ASSIGN_OR_RAISE(out_data_, Finalize());
+
+  int64_t num_output_batches = bit_util::CeilDiv(out_data_.length, output_batch_size());
+  total_output_batches_ += static_cast<int>(num_output_batches);
+  if (is_last) {
+    ARROW_RETURN_NOT_OK(output_->InputFinished(this, total_output_batches_));
+    RETURN_NOT_OK(plan_->query_context()->StartTaskGroup(output_task_group_id_,
+                                                         num_output_batches));
+  } else {
+    for (int64_t i = 0; i < num_output_batches; i++) {
+      ARROW_RETURN_NOT_OK(OutputNthBatch(i));
+    }
+    ARROW_RETURN_NOT_OK(ResetKernelStates());
+  }
+  return Status::OK();
+}
+
+Status GroupByNode::InputReceived(ExecNode* input, ExecBatch batch) {
+  auto scope = TraceInputReceived(batch);
+
+  DCHECK_EQ(input, inputs_[0]);
+
+  auto handler = [this](const ExecBatch& full_batch, const Segment& segment) {
+    if (!segment.extends && segment.offset == 0) RETURN_NOT_OK(OutputResult(false));
+    auto exec_batch = full_batch.Slice(segment.offset, segment.length);
+    auto batch = ExecSpan(exec_batch);
+    RETURN_NOT_OK(Consume(batch));
+    RETURN_NOT_OK(
+        ExtractSegmenterValues(&segmenter_values_, exec_batch, segment_key_field_ids_));
+    if (!segment.is_open) RETURN_NOT_OK(OutputResult(false));
+    return Status::OK();
+  };
+  ARROW_RETURN_NOT_OK(
+      HandleSegments(segmenter_.get(), batch, segment_key_field_ids_, handler));
+
+  if (input_counter_.Increment()) {
+    ARROW_RETURN_NOT_OK(OutputResult(/*is_last=*/true));
+  }
+  return Status::OK();
+}
+
+Status GroupByNode::InputFinished(ExecNode* input, int total_batches) {
+  auto scope = TraceFinish();
+  DCHECK_EQ(input, inputs_[0]);
+
+  if (input_counter_.SetTotal(total_batches)) {
+    RETURN_NOT_OK(OutputResult(/*is_last=*/true));
+  }
+  return Status::OK();
+}
+
+std::string GroupByNode::ToStringExtra(int indent) const {
+  std::stringstream ss;
+  const auto input_schema = inputs_[0]->output_schema();
+  ss << "keys=[";
+  for (size_t i = 0; i < key_field_ids_.size(); i++) {
+    if (i > 0) ss << ", ";
+    ss << '"' << input_schema->field(key_field_ids_[i])->name() << '"';
+  }
+  ss << "], ";
+  AggregatesToString(&ss, *input_schema, aggs_, agg_src_fieldsets_, indent);
+  return ss.str();
+}
+
+Status GroupByNode::InitLocalStateIfNeeded(ThreadLocalState* state) {
+  // Get input schema
+  auto input_schema = inputs_[0]->output_schema();
+
+  if (state->grouper != nullptr) return Status::OK();
+
+  // Build vector of key field data types
+  std::vector<TypeHolder> key_types(key_field_ids_.size());
+  for (size_t i = 0; i < key_field_ids_.size(); ++i) {
+    auto key_field_id = key_field_ids_[i];
+    key_types[i] = input_schema->field(key_field_id)->type().get();
+  }
+
+  // Construct grouper
+  ARROW_ASSIGN_OR_RAISE(state->grouper,
+                        Grouper::Make(key_types, plan_->query_context()->exec_context()));
+
+  // Build vector of aggregate source field data types
+  std::vector<std::vector<TypeHolder>> agg_src_types(agg_kernels_.size());
+  for (size_t i = 0; i < agg_kernels_.size(); ++i) {
+    for (const auto& field_id : agg_src_fieldsets_[i]) {
+      agg_src_types[i].emplace_back(input_schema->field(field_id)->type().get());
+    }
+  }
+
+  ARROW_ASSIGN_OR_RAISE(state->agg_states,
+                        InitKernels(agg_kernels_, plan_->query_context()->exec_context(),
+                                    aggs_, agg_src_types));
+
+  return Status::OK();
+}
+
+}  // namespace aggregate
+}  // namespace acero
+}  // namespace arrow
diff --git a/cpp/src/arrow/acero/options.h b/cpp/src/arrow/acero/options.h
index b1ab6b5d9d..bb94bdaa4a 100644
--- a/cpp/src/arrow/acero/options.h
+++ b/cpp/src/arrow/acero/options.h
@@ -339,7 +339,7 @@ class ARROW_ACERO_EXPORT AggregateNodeOptions : public ExecNodeOptions {
         keys(std::move(keys)),
         segment_keys(std::move(segment_keys)) {}
 
-  // aggregations which will be applied to the targetted fields
+  // aggregations which will be applied to the targeted fields
   std::vector<Aggregate> aggregates;
   // keys by which aggregations will be grouped (optional)
   std::vector<FieldRef> keys;
diff --git a/cpp/src/arrow/acero/scalar_aggregate_node.cc b/cpp/src/arrow/acero/scalar_aggregate_node.cc
new file mode 100644
index 0000000000..ae59aa6920
--- /dev/null
+++ b/cpp/src/arrow/acero/scalar_aggregate_node.cc
@@ -0,0 +1,322 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <mutex>
+#include <thread>
+#include <unordered_set>
+
+#include "arrow/acero/aggregate_internal.h"
+#include "arrow/acero/aggregate_node.h"
+#include "arrow/acero/exec_plan.h"
+#include "arrow/acero/options.h"
+#include "arrow/acero/util.h"
+#include "arrow/compute/exec.h"
+#include "arrow/compute/registry.h"
+#include "arrow/compute/row/grouper.h"
+#include "arrow/datum.h"
+#include "arrow/result.h"
+#include "arrow/util/checked_cast.h"
+#include "arrow/util/logging.h"
+#include "arrow/util/thread_pool.h"
+#include "arrow/util/tracing_internal.h"
+
+namespace arrow {
+
+using internal::checked_cast;
+
+using compute::ExecSpan;
+using compute::ExecValue;
+using compute::Function;
+using compute::FunctionOptions;
+using compute::Grouper;
+using compute::HashAggregateKernel;
+using compute::Kernel;
+using compute::KernelContext;
+using compute::KernelInitArgs;
+using compute::KernelState;
+using compute::RowSegmenter;
+using compute::ScalarAggregateKernel;
+using compute::Segment;
+
+namespace acero {
+namespace aggregate {
+
+Result<AggregateNodeArgs<ScalarAggregateKernel>>
+ScalarAggregateNode::MakeAggregateNodeArgs(const std::shared_ptr<Schema>& input_schema,
+                                           const std::vector<FieldRef>& keys,
+                                           const std::vector<FieldRef>& segment_keys,
+                                           const std::vector<Aggregate>& aggs,
+                                           ExecContext* exec_ctx, size_t concurrency,
+                                           bool is_cpu_parallel) {
+  // Copy (need to modify options pointer below)
+  std::vector<Aggregate> aggregates(aggs);
+  std::vector<int> segment_field_ids(segment_keys.size());
+  std::vector<TypeHolder> segment_key_types(segment_keys.size());
+  for (size_t i = 0; i < segment_keys.size(); i++) {
+    ARROW_ASSIGN_OR_RAISE(FieldPath match, segment_keys[i].FindOne(*input_schema));
+    if (match.indices().size() > 1) {
+      // ARROW-18369: Support nested references as segment ids
+      return Status::Invalid("Nested references cannot be used as segment ids");
+    }
+    segment_field_ids[i] = match[0];
+    segment_key_types[i] = input_schema->field(match[0])->type().get();
+  }
+
+  ARROW_ASSIGN_OR_RAISE(auto segmenter,
+                        RowSegmenter::Make(std::move(segment_key_types),
+                                           /*nullable_keys=*/false, exec_ctx));
+
+  std::vector<std::vector<TypeHolder>> kernel_intypes(aggregates.size());
+  std::vector<const ScalarAggregateKernel*> kernels(aggregates.size());
+  std::vector<std::vector<std::unique_ptr<KernelState>>> states(kernels.size());
+  FieldVector fields(kernels.size() + segment_keys.size());
+
+  // Output the segment keys first, followed by the aggregates
+  for (size_t i = 0; i < segment_keys.size(); ++i) {
+    ARROW_ASSIGN_OR_RAISE(fields[i], segment_keys[i].GetOne(*input_schema));
+  }
+
+  std::vector<std::vector<int>> target_fieldsets(kernels.size());
+  std::size_t base = segment_keys.size();
+  for (size_t i = 0; i < kernels.size(); ++i) {
+    const auto& target_fieldset = aggregates[i].target;
+    for (const auto& target : target_fieldset) {
+      ARROW_ASSIGN_OR_RAISE(auto match, FieldRef(target).FindOne(*input_schema));
+      target_fieldsets[i].push_back(match[0]);
+    }
+
+    ARROW_ASSIGN_OR_RAISE(auto function,
+                          exec_ctx->func_registry()->GetFunction(aggregates[i].function));
+
+    if (function->kind() != Function::SCALAR_AGGREGATE) {
+      if (function->kind() == Function::HASH_AGGREGATE) {
+        return Status::Invalid("The provided function (", aggregates[i].function,
+                               ") is a hash aggregate function.  Since there are no "
+                               "keys to group by, a scalar aggregate function was "
+                               "expected (normally these do not start with hash_)");
+      }
+      return Status::Invalid("The provided function(", aggregates[i].function,
+                             ") is not an aggregate function");
+    }
+
+    std::vector<TypeHolder> in_types;
+    for (const auto& target : target_fieldsets[i]) {
+      in_types.emplace_back(input_schema->field(target)->type().get());
+    }
+    kernel_intypes[i] = in_types;
+    ARROW_ASSIGN_OR_RAISE(const Kernel* kernel,
+                          function->DispatchExact(kernel_intypes[i]));
+    const auto* agg_kernel = static_cast<const ScalarAggregateKernel*>(kernel);
+    if (is_cpu_parallel && agg_kernel->ordered) {
+      return Status::NotImplemented(
+          "Using ordered aggregator in multiple threaded execution is not supported");
+    }
+
+    kernels[i] = agg_kernel;
+
+    if (aggregates[i].options == nullptr) {
+      DCHECK(!function->doc().options_required);
+      const auto* default_options = function->default_options();
+      if (default_options) {
+        aggregates[i].options = default_options->Copy();
+      }
+    }
+
+    KernelContext kernel_ctx{exec_ctx};
+    states[i].resize(concurrency);
+    RETURN_NOT_OK(Kernel::InitAll(
+        &kernel_ctx,
+        KernelInitArgs{kernels[i], kernel_intypes[i], aggregates[i].options.get()},
+        &states[i]));
+
+    // pick one to resolve the kernel signature
+    kernel_ctx.SetState(states[i][0].get());
+    ARROW_ASSIGN_OR_RAISE(auto out_type, kernels[i]->signature->out_type().Resolve(
+                                             &kernel_ctx, kernel_intypes[i]));
+
+    fields[base + i] = field(aggregates[i].name, out_type.GetSharedPtr());
+  }
+
+  return AggregateNodeArgs<ScalarAggregateKernel>{
+      schema(std::move(fields)),
+      /*grouping_key_field_ids=*/{}, std::move(segment_field_ids),
+      std::move(segmenter),          std::move(target_fieldsets),
+      std::move(aggregates),         std::move(kernels),
+      std::move(kernel_intypes),     std::move(states)};
+}
+
+Result<ExecNode*> ScalarAggregateNode::Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
+                                            const ExecNodeOptions& options) {
+  RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 1, "ScalarAggregateNode"));
+
+  const auto& aggregate_options = checked_cast<const AggregateNodeOptions&>(options);
+  auto aggregates = aggregate_options.aggregates;
+  const auto& keys = aggregate_options.keys;
+  const auto& segment_keys = aggregate_options.segment_keys;
+  const auto concurrency = plan->query_context()->max_concurrency();
+  // We can't use concurrency == 1 because that include I/O concurrency
+  const bool is_cpu_parallel = plan->query_context()->executor()->GetCapacity() > 1;
+
+  if (keys.size() > 0) {
+    return Status::Invalid("Scalar aggregation with some key");
+  }
+  if (is_cpu_parallel && segment_keys.size() > 0) {
+    return Status::NotImplemented("Segmented aggregation in a multi-threaded plan");
+  }
+
+  const auto& input_schema = inputs[0]->output_schema();
+  auto exec_ctx = plan->query_context()->exec_context();
+
+  ARROW_ASSIGN_OR_RAISE(
+      auto args, MakeAggregateNodeArgs(input_schema, keys, segment_keys, aggregates,
+                                       exec_ctx, concurrency, is_cpu_parallel));
+
+  if (is_cpu_parallel) {
+    for (auto& kernel : args.kernels) {
+      if (kernel->ordered) {
+        return Status::NotImplemented(
+            "Using ordered aggregator in multiple threaded execution is not supported");
+      }
+    }
+  }
+
+  return plan->EmplaceNode<ScalarAggregateNode>(
+      plan, std::move(inputs), std::move(args.output_schema), std::move(args.segmenter),
+      std::move(args.segment_key_field_ids), std::move(args.target_fieldsets),
+      std::move(args.aggregates), std::move(args.kernels), std::move(args.kernel_intypes),
+      std::move(args.states));
+}
+
+Status ScalarAggregateNode::DoConsume(const ExecSpan& batch, size_t thread_index) {
+  for (size_t i = 0; i < kernels_.size(); ++i) {
+    arrow::util::tracing::Span span;
+    START_COMPUTE_SPAN(span, aggs_[i].function,
+                       {{"function.name", aggs_[i].function},
+                        {"function.options",
+                         aggs_[i].options ? aggs_[i].options->ToString() : "<NULLPTR>"},
+                        {"function.kind", std::string(kind_name()) + "::Consume"}});
+    KernelContext batch_ctx{plan()->query_context()->exec_context()};
+    DCHECK_LT(thread_index, states_[i].size());
+    batch_ctx.SetState(states_[i][thread_index].get());
+
+    std::vector<ExecValue> column_values;
+    for (const int field : target_fieldsets_[i]) {
+      column_values.push_back(batch.values[field]);
+    }
+    ExecSpan column_batch{std::move(column_values), batch.length};
+    RETURN_NOT_OK(kernels_[i]->consume(&batch_ctx, column_batch));
+  }
+  return Status::OK();
+}
+
+Status ScalarAggregateNode::InputReceived(ExecNode* input, ExecBatch batch) {
+  auto scope = TraceInputReceived(batch);
+  DCHECK_EQ(input, inputs_[0]);
+
+  auto thread_index = plan_->query_context()->GetThreadIndex();
+  auto handler = [this, thread_index](const ExecBatch& full_batch,
+                                      const Segment& segment) {
+    // (1) The segment is starting of a new segment group and points to
+    // the beginning of the batch, then it means no data in the batch belongs
+    // to the current segment group. We can output and reset kernel states.
+    if (!segment.extends && segment.offset == 0) RETURN_NOT_OK(OutputResult(false));
+
+    // We add segment to the current segment group aggregation
+    auto exec_batch = full_batch.Slice(segment.offset, segment.length);
+    RETURN_NOT_OK(DoConsume(ExecSpan(exec_batch), thread_index));
+    RETURN_NOT_OK(
+        ExtractSegmenterValues(&segmenter_values_, exec_batch, segment_field_ids_));
+
+    // If the segment closes the current segment group, we can output segment group
+    // aggregation.
+    if (!segment.is_open) RETURN_NOT_OK(OutputResult(false));
+
+    return Status::OK();
+  };
+  RETURN_NOT_OK(HandleSegments(segmenter_.get(), batch, segment_field_ids_, handler));
+
+  if (input_counter_.Increment()) {
+    RETURN_NOT_OK(OutputResult(/*is_last=*/true));
+  }
+  return Status::OK();
+}
+
+Status ScalarAggregateNode::InputFinished(ExecNode* input, int total_batches) {
+  auto scope = TraceFinish();
+  EVENT_ON_CURRENT_SPAN("InputFinished", {{"batches.length", total_batches}});
+  DCHECK_EQ(input, inputs_[0]);
+  if (input_counter_.SetTotal(total_batches)) {
+    RETURN_NOT_OK(OutputResult(/*is_last=*/true));
+  }
+  return Status::OK();
+}
+
+std::string ScalarAggregateNode::ToStringExtra(int indent) const {
+  std::stringstream ss;
+  const auto input_schema = inputs_[0]->output_schema();
+  AggregatesToString(&ss, *input_schema, aggs_, target_fieldsets_);
+  return ss.str();
+}
+
+Status ScalarAggregateNode::ResetKernelStates() {
+  auto exec_ctx = plan()->query_context()->exec_context();
+  for (size_t i = 0; i < kernels_.size(); ++i) {
+    states_[i].resize(plan()->query_context()->max_concurrency());
+    KernelContext kernel_ctx{exec_ctx};
+    RETURN_NOT_OK(Kernel::InitAll(
+        &kernel_ctx,
+        KernelInitArgs{kernels_[i], kernel_intypes_[i], aggs_[i].options.get()},
+        &states_[i]));
+  }
+  return Status::OK();
+}
+
+Status ScalarAggregateNode::OutputResult(bool is_last) {
+  ExecBatch batch{{}, 1};
+  batch.values.resize(kernels_.size() + segment_field_ids_.size());
+
+  // First, insert segment keys
+  PlaceFields(batch, /*base=*/0, segmenter_values_);
+
+  // Followed by aggregate values
+  std::size_t base = segment_field_ids_.size();
+  for (size_t i = 0; i < kernels_.size(); ++i) {
+    arrow::util::tracing::Span span;
+    START_COMPUTE_SPAN(span, aggs_[i].function,
+                       {{"function.name", aggs_[i].function},
+                        {"function.options",
+                         aggs_[i].options ? aggs_[i].options->ToString() : "<NULLPTR>"},
+                        {"function.kind", std::string(kind_name()) + "::Finalize"}});
+    KernelContext ctx{plan()->query_context()->exec_context()};
+    ARROW_ASSIGN_OR_RAISE(auto merged, ScalarAggregateKernel::MergeAll(
+                                           kernels_[i], &ctx, std::move(states_[i])));
+    RETURN_NOT_OK(kernels_[i]->finalize(&ctx, &batch.values[base + i]));
+  }
+
+  ARROW_RETURN_NOT_OK(output_->InputReceived(this, std::move(batch)));
+  total_output_batches_++;
+  if (is_last) {
+    ARROW_RETURN_NOT_OK(output_->InputFinished(this, total_output_batches_));
+  } else {
+    ARROW_RETURN_NOT_OK(ResetKernelStates());
+  }
+  return Status::OK();
+}
+
+}  // namespace aggregate
+}  // namespace acero
+}  // namespace arrow
diff --git a/cpp/src/arrow/acero/test_util_internal.cc b/cpp/src/arrow/acero/test_util_internal.cc
index 46116d66d8..2042650be6 100644
--- a/cpp/src/arrow/acero/test_util_internal.cc
+++ b/cpp/src/arrow/acero/test_util_internal.cc
@@ -604,7 +604,7 @@ static inline void PrintToImpl(const std::string& factory_name,
       *os << ",";
       *os << "name=" << agg.name;
     }
-    *os << "},";
+    *os << "}";
 
     if (!o->keys.empty()) {
       *os << ",keys={";
@@ -640,8 +640,8 @@ void PrintTo(const Declaration& decl, std::ostream* os) {
 
   *os << "{";
   for (const auto& input : decl.inputs) {
-    if (auto decl = std::get_if<Declaration>(&input)) {
-      PrintTo(*decl, os);
+    if (auto idecl = std::get_if<Declaration>(&input)) {
+      PrintTo(*idecl, os);
     }
   }
   *os << "}";