Posted to github@arrow.apache.org by "westonpace (via GitHub)" <gi...@apache.org> on 2023/02/24 00:56:13 UTC

[GitHub] [arrow] westonpace commented on a diff in pull request #34311: GH-32884: [C++] Add ordered aggregation

westonpace commented on code in PR #34311:
URL: https://github.com/apache/arrow/pull/34311#discussion_r1116321964


##########
cpp/src/arrow/compute/row/grouper.h:
##########
@@ -30,6 +30,49 @@
 namespace arrow {
 namespace compute {
 
+/// \brief A segment of contiguous rows for grouping
+struct ARROW_EXPORT GroupingSegment {
+  int64_t offset;
+  int64_t length;
+  bool is_open;
+  bool extends;
+};
+
+inline bool operator==(const GroupingSegment& segment1, const GroupingSegment& segment2) {
+  return segment1.offset == segment2.offset && segment1.length == segment2.length &&
+         segment1.is_open == segment2.is_open && segment1.extends == segment2.extends;
+}
+inline bool operator!=(const GroupingSegment& segment1, const GroupingSegment& segment2) {
+  return !(segment1 == segment2);
+}
+
+/// \brief Computes grouping segments for a batch. Each segment covers rows with identical
+/// values in the batch. The values in the batch are often selected as keys from a larger
+/// batch.
+class ARROW_EXPORT GroupingSegmenter {
+ public:
+  virtual ~GroupingSegmenter() = default;
+
+  /// \brief Construct a GroupingSegmenter which receives the specified key types

Review Comment:
   ```suggestion
     /// \brief Construct a GroupingSegmenter which segments on the specified key types
   ```
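
For illustration, a minimal sketch of how a caller might drive the `GroupingSegmenter` interface introduced in the hunk above. Only `GroupingSegmenter`, `GroupingSegment`, and `GetNextSegment` come from the diff; the driver function, variable names, and header locations are assumptions, not Arrow API.

```cpp
// Illustrative sketch, assuming the GroupingSegmenter interface from this PR.
// WalkSegments and keys_batch are made-up names.
#include "arrow/compute/exec.h"         // ExecBatch (assumed header location)
#include "arrow/compute/row/grouper.h"  // GroupingSegmenter, GroupingSegment
#include "arrow/result.h"
#include "arrow/status.h"

namespace cp = arrow::compute;

arrow::Status WalkSegments(cp::GroupingSegmenter* segmenter,
                           const cp::ExecBatch& keys_batch) {
  int64_t offset = 0;
  while (offset < keys_batch.length) {
    // Each segment covers a contiguous run of rows with identical key values.
    ARROW_ASSIGN_OR_RAISE(cp::GroupingSegment segment,
                          segmenter->GetNextSegment(keys_batch, offset));
    // segment.extends: does this run continue the key value of the previous
    // segment (possibly from an earlier batch)?
    // segment.is_open: does this run reach the end of the batch?
    // ... aggregate rows [segment.offset, segment.offset + segment.length) ...
    offset = segment.offset + segment.length;
  }
  return arrow::Status::OK();
}
```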



##########
cpp/src/arrow/compute/exec/options.h:
##########
@@ -199,21 +199,32 @@ class ARROW_EXPORT ProjectNodeOptions : public ExecNodeOptions {
   std::vector<std::string> names;
 };
 
-/// \brief Make a node which aggregates input batches, optionally grouped by keys.
+/// \brief Make a node which aggregates input batches, optionally grouped by keys and
+/// optionally segmented by segment-keys. Both keys and segment-keys determine the group.
+/// However segment-keys are also used for determining grouping segments, which should be
+/// large, and allow streaming a partial aggregation result after processing each segment.
+/// One common use-case for segment-keys is ordered aggregation, in which the segment-key
+/// attribute specifies a column with non-decreasing values or a lexicographically-ordered
+/// set of such columns.
 ///
 /// If the keys attribute is a non-empty vector, then each aggregate in `aggregates` is
 /// expected to be a HashAggregate function. If the keys attribute is an empty vector,
 /// then each aggregate is assumed to be a ScalarAggregate function.
 class ARROW_EXPORT AggregateNodeOptions : public ExecNodeOptions {
  public:
   explicit AggregateNodeOptions(std::vector<Aggregate> aggregates,
-                                std::vector<FieldRef> keys = {})
-      : aggregates(std::move(aggregates)), keys(std::move(keys)) {}
+                                std::vector<FieldRef> keys = {},
+                                std::vector<FieldRef> segment_keys = {})
+      : aggregates(std::move(aggregates)),
+        keys(std::move(keys)),
+        segment_keys(std::move(segment_keys)) {}
 
   // aggregations which will be applied to the targeted fields
   std::vector<Aggregate> aggregates;
   // keys by which aggregations will be grouped
   std::vector<FieldRef> keys;
+  // keys by which aggregations will be segmented
+  std::vector<FieldRef> segment_keys;

Review Comment:
   Is it ok if an entry in `segment_keys` and an entry in `keys` reference the same field (I would expect this to be a pretty common case)? Can you test this?
   
   I think all of the following would be valid:
   
   group by x,y and segment by x,y,z // Maybe this isn't valid?
   group by x,y,z and segment by x,y
   group by x,y and segment by x,y
   group by x,y and segment by a,b
   group by x,y and segment by a,x // ?
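
For context, a hedged sketch of the kind of configuration the question above describes, with segment key `x` also appearing among the grouping keys. The `Aggregate{function, options, target, name}` layout follows the test code quoted later in this thread; the column names and function name are illustrative only, not from the PR.

```cpp
// Illustrative sketch, assuming the AggregateNodeOptions constructor from this PR.
#include <vector>
#include "arrow/compute/api_aggregate.h"  // Aggregate
#include "arrow/compute/exec/options.h"   // AggregateNodeOptions

namespace cp = arrow::compute;

cp::AggregateNodeOptions MakeOrderedAggOptions() {
  std::vector<cp::Aggregate> aggregates = {
      {"hash_sum", /*options=*/nullptr, /*target=*/"value", /*name=*/"sum_value"}};
  // "x" serves both as a grouping key and as the segment key; input batches are
  // expected to arrive ordered (non-decreasing) on "x".
  return cp::AggregateNodeOptions(std::move(aggregates),
                                  /*keys=*/{"x", "y"},
                                  /*segment_keys=*/{"x"});
}
```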



##########
cpp/src/arrow/compute/kernels/hash_aggregate_test.cc:
##########
@@ -301,53 +426,184 @@ Result<Datum> GroupByTest(const std::vector<Datum>& arguments,
         {t_agg.function, t_agg.options, "agg_" + ToChars(idx), t_agg.function});
     idx = idx + 1;
   }
-  return RunGroupBy(arguments, keys, internal_aggregates, use_threads);
+  return group_by(arguments, keys, segment_keys, internal_aggregates, use_threads,
+                  /*naive=*/false);
 }
 
-}  // namespace
+Result<Datum> GroupByTest(GroupByFunction group_by, const std::vector<Datum>& arguments,
+                          const std::vector<Datum>& keys,
+                          const std::vector<TestAggregate>& aggregates,
+                          bool use_threads) {
+  return GroupByTest(group_by, arguments, keys, {}, aggregates, use_threads);
+}
 
-TEST(Grouper, SupportedKeys) {
-  ASSERT_OK(Grouper::Make({boolean()}));
+template <typename GroupClass>

Review Comment:
   It's a little confusing to see `GroupingSegmenter` and `Grouper` used interchangeably here since they have completely different interfaces.  Though I suppose it works because they both have the same `Make` method?
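
A rough sketch (illustrative, not from the PR) of why the template mentioned above compiles for both classes: each exposes a static `Make` factory whose trailing parameters are defaulted, so `GroupClass::Make({types})` is valid for either.

```cpp
// Illustrative sketch; the helper name is made up. Both Grouper::Make and
// GroupingSegmenter::Make take a vector of key types and default the rest.
template <typename GroupClass>
void CheckMakeSupportsBoolean() {
  // Works for GroupClass = Grouper and GroupClass = GroupingSegmenter alike.
  ASSERT_OK(GroupClass::Make({boolean()}));
}
```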



##########
cpp/src/arrow/compute/row/grouper.cc:
##########
@@ -39,12 +43,521 @@
 namespace arrow {
 
 using internal::checked_cast;
+using internal::PrimitiveScalarBase;
 
 namespace compute {
 
 namespace {
 
-struct GrouperImpl : Grouper {
+constexpr uint32_t kNoGroupId = std::numeric_limits<uint32_t>::max();
+
+using group_id_t = std::remove_const<decltype(kNoGroupId)>::type;
+using GroupIdType = CTypeTraits<group_id_t>::ArrowType;
+auto group_id_type = std::make_shared<GroupIdType>();
+
+inline const uint8_t* GetValuesAsBytes(const ArrayData& data, int64_t offset = 0) {
+  DCHECK_GT(data.type->byte_width(), 0);
+  int64_t absolute_byte_offset = (data.offset + offset) * data.type->byte_width();
+  return data.GetValues<uint8_t>(1, absolute_byte_offset);
+}
+
+inline const uint8_t* GetValuesAsBytes(const ArraySpan& data, int64_t offset = 0) {
+  DCHECK_GT(data.type->byte_width(), 0);
+  int64_t absolute_byte_offset = (data.offset + offset) * data.type->byte_width();
+  return data.GetValues<uint8_t>(1, absolute_byte_offset);
+}
+
+template <typename Value>
+Status CheckForGetNextSegment(const std::vector<Value>& values, int64_t length,
+                              int64_t offset, const std::vector<TypeHolder>& key_types) {
+  if (offset < 0 || offset > length) {
+    return Status::Invalid("invalid grouping segmenter offset: ", offset);
+  }
+  if (values.size() != key_types.size()) {
+    return Status::Invalid("expected batch size ", key_types.size(), " but got ",
+                           values.size());
+  }
+  for (size_t i = 0; i < key_types.size(); i++) {
+    const auto& value = values[i];
+    const auto& key_type = key_types[i];
+    if (*value.type() != *key_type.type) {
+      return Status::Invalid("expected batch value ", i, " of type ", *key_type.type,
+                             " but got ", *value.type());
+    }
+  }
+  return Status::OK();
+}
+
+template <typename Batch>
+enable_if_t<std::is_same<Batch, ExecSpan>::value || std::is_same<Batch, ExecBatch>::value,
+            Status>
+CheckForGetNextSegment(const Batch& batch, int64_t offset,
+                       const std::vector<TypeHolder>& key_types) {
+  return CheckForGetNextSegment(batch.values, batch.length, offset, key_types);
+}
+
+struct BaseGroupingSegmenter : public GroupingSegmenter {
+  explicit BaseGroupingSegmenter(const std::vector<TypeHolder>& key_types)
+      : key_types_(key_types) {}
+
+  const std::vector<TypeHolder>& key_types() const override { return key_types_; }
+
+  std::vector<TypeHolder> key_types_;
+};
+
+GroupingSegment MakeSegment(int64_t batch_length, int64_t offset, int64_t length,
+                            bool extends) {
+  return GroupingSegment{offset, length, offset + length >= batch_length, extends};
+}
+
+int64_t GetMatchLength(const uint8_t* match_bytes, int64_t match_width,

Review Comment:
   Could you use a binary search here?
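
As an aside, one way a binary search could replace the linear `memcmp` scan, under the assumption that the segment values are ordered so that all rows equal to `match_bytes` form a single contiguous run starting at `offset`. This is a sketch, not the PR's code.

```cpp
// Sketch of a binary-search variant of GetMatchLength. Correct only under the
// segmented-input assumption: within [offset, length) every row equal to
// match_bytes sits in one contiguous run beginning at offset.
#include <cstdint>
#include <cstring>

int64_t GetMatchLengthBinary(const uint8_t* match_bytes, int64_t match_width,
                             const uint8_t* array_bytes, int64_t offset,
                             int64_t length) {
  int64_t lo = offset;  // rows in [offset, lo) are known to match
  int64_t hi = length;  // the first non-matching row, if any, lies in [lo, hi]
  while (lo < hi) {
    int64_t mid = lo + (hi - lo) / 2;
    if (std::memcmp(match_bytes, array_bytes + mid * match_width,
                    static_cast<size_t>(match_width)) == 0) {
      lo = mid + 1;  // row mid matches, so the run extends past mid
    } else {
      hi = mid;  // row mid does not match, so the run ends at or before mid
    }
  }
  return lo - offset;  // number of matching rows starting at offset
}
```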



##########
cpp/src/arrow/compute/exec/options.h:
##########
@@ -199,21 +199,32 @@ class ARROW_EXPORT ProjectNodeOptions : public ExecNodeOptions {
   std::vector<std::string> names;
 };
 
-/// \brief Make a node which aggregates input batches, optionally grouped by keys.
+/// \brief Make a node which aggregates input batches, optionally grouped by keys and
+/// optionally segmented by segment-keys. Both keys and segment-keys determine the group.
+/// However segment-keys are also used for determining grouping segments, which should be
+/// large, and allow streaming a partial aggregation result after processing each segment.
+/// One common use-case for segment-keys is ordered aggregation, in which the segment-key
+/// attribute specifies a column with non-decreasing values or a lexicographically-ordered
+/// set of such columns.
 ///
 /// If the keys attribute is a non-empty vector, then each aggregate in `aggregates` is
 /// expected to be a HashAggregate function. If the keys attribute is an empty vector,
 /// then each aggregate is assumed to be a ScalarAggregate function.
 class ARROW_EXPORT AggregateNodeOptions : public ExecNodeOptions {
  public:
   explicit AggregateNodeOptions(std::vector<Aggregate> aggregates,
-                                std::vector<FieldRef> keys = {})
-      : aggregates(std::move(aggregates)), keys(std::move(keys)) {}
+                                std::vector<FieldRef> keys = {},
+                                std::vector<FieldRef> segment_keys = {})
+      : aggregates(std::move(aggregates)),
+        keys(std::move(keys)),
+        segment_keys(std::move(segment_keys)) {}
 
   // aggregations which will be applied to the targeted fields
   std::vector<Aggregate> aggregates;
   // keys by which aggregations will be grouped
   std::vector<FieldRef> keys;
+  // keys by which aggregations will be segmented

Review Comment:
   Can we clarify that this is optional?



##########
cpp/src/arrow/compute/row/grouper.cc:
##########
@@ -39,12 +43,521 @@
 namespace arrow {
 
 using internal::checked_cast;
+using internal::PrimitiveScalarBase;
 
 namespace compute {
 
 namespace {
 
-struct GrouperImpl : Grouper {
+constexpr uint32_t kNoGroupId = std::numeric_limits<uint32_t>::max();
+
+using group_id_t = std::remove_const<decltype(kNoGroupId)>::type;
+using GroupIdType = CTypeTraits<group_id_t>::ArrowType;
+auto group_id_type = std::make_shared<GroupIdType>();
+
+inline const uint8_t* GetValuesAsBytes(const ArrayData& data, int64_t offset = 0) {
+  DCHECK_GT(data.type->byte_width(), 0);
+  int64_t absolute_byte_offset = (data.offset + offset) * data.type->byte_width();
+  return data.GetValues<uint8_t>(1, absolute_byte_offset);
+}
+
+inline const uint8_t* GetValuesAsBytes(const ArraySpan& data, int64_t offset = 0) {
+  DCHECK_GT(data.type->byte_width(), 0);
+  int64_t absolute_byte_offset = (data.offset + offset) * data.type->byte_width();
+  return data.GetValues<uint8_t>(1, absolute_byte_offset);
+}
+
+template <typename Value>
+Status CheckForGetNextSegment(const std::vector<Value>& values, int64_t length,
+                              int64_t offset, const std::vector<TypeHolder>& key_types) {
+  if (offset < 0 || offset > length) {
+    return Status::Invalid("invalid grouping segmenter offset: ", offset);
+  }
+  if (values.size() != key_types.size()) {
+    return Status::Invalid("expected batch size ", key_types.size(), " but got ",
+                           values.size());
+  }
+  for (size_t i = 0; i < key_types.size(); i++) {
+    const auto& value = values[i];
+    const auto& key_type = key_types[i];
+    if (*value.type() != *key_type.type) {
+      return Status::Invalid("expected batch value ", i, " of type ", *key_type.type,
+                             " but got ", *value.type());
+    }
+  }
+  return Status::OK();
+}
+
+template <typename Batch>
+enable_if_t<std::is_same<Batch, ExecSpan>::value || std::is_same<Batch, ExecBatch>::value,
+            Status>
+CheckForGetNextSegment(const Batch& batch, int64_t offset,
+                       const std::vector<TypeHolder>& key_types) {
+  return CheckForGetNextSegment(batch.values, batch.length, offset, key_types);
+}
+
+struct BaseGroupingSegmenter : public GroupingSegmenter {
+  explicit BaseGroupingSegmenter(const std::vector<TypeHolder>& key_types)
+      : key_types_(key_types) {}
+
+  const std::vector<TypeHolder>& key_types() const override { return key_types_; }
+
+  std::vector<TypeHolder> key_types_;
+};
+
+GroupingSegment MakeSegment(int64_t batch_length, int64_t offset, int64_t length,
+                            bool extends) {
+  return GroupingSegment{offset, length, offset + length >= batch_length, extends};
+}
+
+int64_t GetMatchLength(const uint8_t* match_bytes, int64_t match_width,
+                       const uint8_t* array_bytes, int64_t offset, int64_t length) {
+  int64_t cursor, byte_cursor;
+  for (cursor = offset, byte_cursor = match_width * cursor; cursor < length;
+       cursor++, byte_cursor += match_width) {
+    if (memcmp(match_bytes, array_bytes + byte_cursor,
+               static_cast<size_t>(match_width)) != 0) {
+      break;
+    }
+  }
+  return std::min(cursor, length - offset);
+}
+
+using ExtendFunc = std::function<bool(const void*)>;
+constexpr bool kDefaultExtends = true;
+constexpr bool kEmptyExtends = true;
+
+Result<GroupingSegment> GetNextSegmentChunked(
+    const std::shared_ptr<ChunkedArray>& chunked_array, int64_t offset,
+    ExtendFunc extend) {
+  if (offset >= chunked_array->length()) {
+    return MakeSegment(chunked_array->length(), chunked_array->length(), 0,
+                       kEmptyExtends);
+  }
+  int64_t remaining_offset = offset;
+  const auto& arrays = chunked_array->chunks();
+  for (size_t i = 0; remaining_offset >= 0 && i < arrays.size(); i++) {
+    // look up chunk containing offset
+    int64_t array_length = arrays[i]->length();
+    if (remaining_offset < array_length) {
+      // found - switch to matching
+      int64_t match_width = arrays[i]->type()->byte_width();
+      const uint8_t* match_bytes = GetValuesAsBytes(*arrays[i]->data(), remaining_offset);
+      int64_t total_match_length = 0;
+      for (; i < arrays.size(); i++) {
+        int64_t array_length = arrays[i]->length();
+        if (array_length <= 0) continue;
+        const uint8_t* array_bytes = GetValuesAsBytes(*arrays[i]->data());
+        int64_t match_length = GetMatchLength(match_bytes, match_width, array_bytes,
+                                              remaining_offset, array_length);
+        total_match_length += match_length;
+        remaining_offset = 0;
+        if (match_length < array_length - remaining_offset) break;
+      }
+      bool extends = extend(match_bytes);
+      return MakeSegment(chunked_array->length(), offset, total_match_length, extends);
+    }
+    remaining_offset -= array_length;
+  }
+  return Status::Invalid("segmenting invalid chunked array value");
+}
+
+struct NoKeysGroupingSegmenter : public BaseGroupingSegmenter {
+  static std::unique_ptr<GroupingSegmenter> Make() {
+    return std::make_unique<NoKeysGroupingSegmenter>();
+  }
+
+  NoKeysGroupingSegmenter() : BaseGroupingSegmenter({}) {}
+
+  Status Reset() override { return Status::OK(); }
+
+  template <typename Batch>
+  Result<GroupingSegment> GetNextSegmentImpl(const Batch& batch, int64_t offset) {
+    ARROW_RETURN_NOT_OK(CheckForGetNextSegment(batch, offset, {}));
+    return MakeSegment(batch.length, offset, batch.length - offset, kDefaultExtends);
+  }
+
+  Result<GroupingSegment> GetNextSegment(const ExecSpan& batch, int64_t offset) override {
+    return GetNextSegmentImpl(batch, offset);
+  }
+
+  Result<GroupingSegment> GetNextSegment(const ExecBatch& batch,
+                                         int64_t offset) override {
+    return GetNextSegmentImpl(batch, offset);
+  }
+};
+
+struct SimpleKeyGroupingSegmenter : public BaseGroupingSegmenter {
+  static Result<std::unique_ptr<GroupingSegmenter>> Make(TypeHolder key_type) {
+    return std::make_unique<SimpleKeyGroupingSegmenter>(key_type);
+  }
+
+  explicit SimpleKeyGroupingSegmenter(TypeHolder key_type)
+      : BaseGroupingSegmenter({key_type}), key_type_(key_types_[0]), save_key_data_() {}
+
+  Status CheckType(const DataType& type) {
+    if (!is_fixed_width(type)) {
+      return Status::Invalid("SimpleKeyGroupingSegmenter does not support type ", type);
+    }
+    return Status::OK();
+  }
+
+  Status Reset() override {
+    save_key_data_.resize(0);
+    return Status::OK();
+  }
+
+  bool Extend(const void* data) {
+    size_t byte_width = static_cast<size_t>(key_type_.type->byte_width());
+    bool extends = save_key_data_.size() != byte_width
+                       ? kDefaultExtends
+                       : 0 == memcmp(save_key_data_.data(), data, byte_width);
+    save_key_data_.resize(byte_width);
+    memcpy(save_key_data_.data(), data, byte_width);
+    return extends;
+  }
+
+  Result<GroupingSegment> GetNextSegment(const Scalar& scalar, int64_t offset,
+                                         int64_t length) {
+    ARROW_RETURN_NOT_OK(CheckType(*scalar.type));
+    if (!scalar.is_valid) {
+      return Status::Invalid("segmenting an invalid scalar");
+    }
+    auto data = checked_cast<const PrimitiveScalarBase&>(scalar).data();
+    bool extends = length > 0 ? Extend(data) : kEmptyExtends;
+    return MakeSegment(length, 0, length, extends);
+  }
+
+  Result<GroupingSegment> GetNextSegment(const DataType& array_type,
+                                         const uint8_t* array_bytes, int64_t offset,
+                                         int64_t length) {
+    RETURN_NOT_OK(CheckType(array_type));
+    int64_t byte_width = array_type.byte_width();
+    int64_t match_length = GetMatchLength(array_bytes + offset * byte_width, byte_width,
+                                          array_bytes, offset, length);
+    bool extends = length > 0 ? Extend(array_bytes + offset * byte_width) : kEmptyExtends;
+    return MakeSegment(length, offset, match_length, extends);
+  }
+
+  Result<GroupingSegment> GetNextSegment(const ExecSpan& batch, int64_t offset) override {
+    ARROW_RETURN_NOT_OK(CheckForGetNextSegment(batch, offset, {key_type_}));
+    if (offset == batch.length) {
+      return MakeSegment(batch.length, offset, 0, kEmptyExtends);
+    }
+    const auto& value = batch.values[0];
+    if (value.is_scalar()) {
+      return GetNextSegment(*value.scalar, offset, batch.length);
+    }
+    ARROW_DCHECK(value.is_array());
+    const auto& array = value.array;
+    if (array.GetNullCount() > 0) {
+      return Status::NotImplemented("segmenting a nullable array");
+    }
+    return GetNextSegment(*array.type, GetValuesAsBytes(array), offset, batch.length);
+  }
+
+  Result<GroupingSegment> GetNextSegment(const ExecBatch& batch,
+                                         int64_t offset) override {
+    ARROW_RETURN_NOT_OK(CheckForGetNextSegment(batch, offset, {key_type_}));
+    if (offset == batch.length) {
+      return MakeSegment(batch.length, offset, 0, kEmptyExtends);
+    }
+    const auto& value = batch.values[0];
+    if (value.is_scalar()) {
+      return GetNextSegment(*value.scalar(), offset, batch.length);
+    }
+    if (value.is_array()) {
+      auto array = value.array();
+      if (array->GetNullCount() > 0) {
+        return Status::NotImplemented("segmenting a nullable array");
+      }
+      return GetNextSegment(*array->type, GetValuesAsBytes(*array), offset, batch.length);
+    }
+    if (value.is_chunked_array()) {
+      auto array = value.chunked_array();
+      if (array->null_count() > 0) {
+        return Status::NotImplemented("segmenting a nullable array");
+      }
+      return GetNextSegmentChunked(array, offset, bound_extend_);
+    }
+    return Status::Invalid("segmenting unsupported value kind ", value.kind());
+  }
+
+ private:
+  TypeHolder key_type_;
+  std::vector<uint8_t> save_key_data_;
+  ExtendFunc bound_extend_ = [this](const void* data) { return Extend(data); };

Review Comment:
   If you get rid of `GetNextSegmentChunked` you can get rid of this too.



##########
cpp/src/arrow/compute/exec/aggregate_node.cc:
##########
@@ -326,46 +446,86 @@ class ScalarAggregateNode : public ExecNode, public TracedNode {
   }
 
  private:
-  Status Finish() {
-    auto scope = TraceFinish();
+  Status ReconstructAggregates() {
+    const auto& input_schema = *inputs()[0]->output_schema();
+    auto exec_ctx = plan()->query_context()->exec_context();
+    for (size_t i = 0; i < kernels_.size(); ++i) {
+      std::vector<TypeHolder> in_types;
+      for (const auto& target : target_fieldsets_[i]) {
+        in_types.emplace_back(input_schema.field(target)->type().get());
+      }
+      states_[i].resize(plan()->query_context()->max_concurrency());
+      KernelContext kernel_ctx{exec_ctx};
+      RETURN_NOT_OK(Kernel::InitAll(
+          &kernel_ctx, KernelInitArgs{kernels_[i], in_types, aggs_[i].options.get()},
+          &states_[i]));
+    }
+    return Status::OK();
+  }
+
+  Status OutputResult(bool is_last = false, bool traced = false) {
+    if (is_last && !traced) {
+      auto scope = TraceFinish();
+      return OutputResult(is_last, /*traced=*/true);
+    }
+    GatedUniqueLock lock(gated_shared_mutex_);
     ExecBatch batch{{}, 1};
-    batch.values.resize(kernels_.size());
+    batch.values.resize(kernels_.size() + segment_field_ids_.size());
 
     for (size_t i = 0; i < kernels_.size(); ++i) {
       util::tracing::Span span;
       START_COMPUTE_SPAN(span, aggs_[i].function,
                          {{"function.name", aggs_[i].function},
                           {"function.options",
                            aggs_[i].options ? aggs_[i].options->ToString() : "<NULLPTR>"},
-                          {"function.kind", std::string(kind_name()) + "::Finalize"}});
+                          {"function.kind", std::string(kind_name()) + "::Output"}});
       KernelContext ctx{plan()->query_context()->exec_context()};
       ARROW_ASSIGN_OR_RAISE(auto merged, ScalarAggregateKernel::MergeAll(
                                              kernels_[i], &ctx, std::move(states_[i])));
       RETURN_NOT_OK(kernels_[i]->finalize(&ctx, &batch.values[i]));
     }
+    PlaceFields(batch, kernels_.size(), segmenter_values_);
 
-    return output_->InputReceived(this, std::move(batch));
+    ARROW_RETURN_NOT_OK(output_->InputReceived(this, std::move(batch)));
+    total_output_batches_++;
+    if (is_last) {
+      ARROW_RETURN_NOT_OK(output_->InputFinished(this, total_output_batches_));
+    } else {
+      ARROW_RETURN_NOT_OK(ReconstructAggregates());
+    }
+    return Status::OK();
   }
 
+  std::unique_ptr<GroupingSegmenter> segmenter_;
+  const std::vector<int> segment_field_ids_;
+  std::vector<Datum> segmenter_values_;
+
   const std::vector<std::vector<int>> target_fieldsets_;
   const std::vector<Aggregate> aggs_;
   const std::vector<const ScalarAggregateKernel*> kernels_;
 
   std::vector<std::vector<std::unique_ptr<KernelState>>> states_;
 
   AtomicCounter input_counter_;
+  int64_t total_output_batches_ = 0;

Review Comment:
   This should probably be `int` as we require batch counts to be `int` everywhere else.



##########
cpp/src/arrow/compute/row/grouper.h:
##########
@@ -30,6 +30,49 @@
 namespace arrow {
 namespace compute {
 
+/// \brief A segment of contiguous rows for grouping
+struct ARROW_EXPORT GroupingSegment {
+  int64_t offset;
+  int64_t length;
+  bool is_open;
+  bool extends;
+};
+
+inline bool operator==(const GroupingSegment& segment1, const GroupingSegment& segment2) {
+  return segment1.offset == segment2.offset && segment1.length == segment2.length &&
+         segment1.is_open == segment2.is_open && segment1.extends == segment2.extends;
+}
+inline bool operator!=(const GroupingSegment& segment1, const GroupingSegment& segment2) {
+  return !(segment1 == segment2);
+}
+
+/// \brief Computes grouping segments for a batch. Each segment covers rows with identical
+/// values in the batch. The values in the batch are often selected as keys from a larger
+/// batch.
+class ARROW_EXPORT GroupingSegmenter {
+ public:
+  virtual ~GroupingSegmenter() = default;
+
+  /// \brief Construct a GroupingSegmenter which receives the specified key types
+  static Result<std::unique_ptr<GroupingSegmenter>> Make(
+      const std::vector<TypeHolder>& key_types, bool nullable_keys = false,

Review Comment:
   Can we clarify what `nullable_keys` means?



##########
cpp/src/arrow/compute/kernels/hash_aggregate_test.cc:
##########
@@ -135,22 +141,84 @@ Result<Datum> NaiveGroupBy(std::vector<Datum> arguments, std::vector<Datum> keys
   return Take(struct_arr, sorted_indices);
 }
 
+Result<Datum> MakeGroupByOutput(const std::vector<ExecBatch>& output_batches,
+                                const std::shared_ptr<Schema> output_schema,
+                                size_t num_aggregates, size_t num_keys, bool naive) {
+  ArrayVector out_arrays(num_aggregates + num_keys);
+  for (size_t i = 0; i < out_arrays.size(); ++i) {
+    std::vector<std::shared_ptr<Array>> arrays(output_batches.size());
+    for (size_t j = 0; j < output_batches.size(); ++j) {
+      arrays[j] = output_batches[j].values[i].make_array();
+    }
+    if (arrays.empty()) {
+      ARROW_ASSIGN_OR_RAISE(
+          out_arrays[i],
+          MakeArrayOfNull(output_schema->field(static_cast<int>(i))->type(),
+                          /*length=*/0));
+    } else {
+      ARROW_ASSIGN_OR_RAISE(out_arrays[i], Concatenate(arrays));
+    }
+  }
+
+  ARROW_ASSIGN_OR_RAISE(
+      std::shared_ptr<Array> struct_arr,
+      StructArray::Make(std::move(out_arrays), output_schema->fields()));
+
+  bool need_sort = !naive;
+  for (size_t i = num_aggregates; need_sort && i < out_arrays.size(); i++) {
+    if (output_schema->field(i)->type()->id() == Type::DICTIONARY) {

Review Comment:
   ```suggestion
       if (output_schema->field(static_cast<int>(i))->type()->id() == Type::DICTIONARY) {
   ```



##########
cpp/src/arrow/compute/row/grouper.cc:
##########
@@ -39,12 +43,521 @@
 namespace arrow {
 
 using internal::checked_cast;
+using internal::PrimitiveScalarBase;
 
 namespace compute {
 
 namespace {
 
-struct GrouperImpl : Grouper {
+constexpr uint32_t kNoGroupId = std::numeric_limits<uint32_t>::max();
+
+using group_id_t = std::remove_const<decltype(kNoGroupId)>::type;
+using GroupIdType = CTypeTraits<group_id_t>::ArrowType;
+auto group_id_type = std::make_shared<GroupIdType>();
+
+inline const uint8_t* GetValuesAsBytes(const ArrayData& data, int64_t offset = 0) {
+  DCHECK_GT(data.type->byte_width(), 0);
+  int64_t absolute_byte_offset = (data.offset + offset) * data.type->byte_width();
+  return data.GetValues<uint8_t>(1, absolute_byte_offset);
+}
+
+inline const uint8_t* GetValuesAsBytes(const ArraySpan& data, int64_t offset = 0) {
+  DCHECK_GT(data.type->byte_width(), 0);
+  int64_t absolute_byte_offset = (data.offset + offset) * data.type->byte_width();
+  return data.GetValues<uint8_t>(1, absolute_byte_offset);
+}
+
+template <typename Value>
+Status CheckForGetNextSegment(const std::vector<Value>& values, int64_t length,
+                              int64_t offset, const std::vector<TypeHolder>& key_types) {
+  if (offset < 0 || offset > length) {
+    return Status::Invalid("invalid grouping segmenter offset: ", offset);
+  }
+  if (values.size() != key_types.size()) {
+    return Status::Invalid("expected batch size ", key_types.size(), " but got ",
+                           values.size());
+  }
+  for (size_t i = 0; i < key_types.size(); i++) {
+    const auto& value = values[i];
+    const auto& key_type = key_types[i];
+    if (*value.type() != *key_type.type) {
+      return Status::Invalid("expected batch value ", i, " of type ", *key_type.type,
+                             " but got ", *value.type());
+    }
+  }
+  return Status::OK();
+}
+
+template <typename Batch>
+enable_if_t<std::is_same<Batch, ExecSpan>::value || std::is_same<Batch, ExecBatch>::value,
+            Status>
+CheckForGetNextSegment(const Batch& batch, int64_t offset,
+                       const std::vector<TypeHolder>& key_types) {
+  return CheckForGetNextSegment(batch.values, batch.length, offset, key_types);
+}
+
+struct BaseGroupingSegmenter : public GroupingSegmenter {
+  explicit BaseGroupingSegmenter(const std::vector<TypeHolder>& key_types)
+      : key_types_(key_types) {}
+
+  const std::vector<TypeHolder>& key_types() const override { return key_types_; }
+
+  std::vector<TypeHolder> key_types_;
+};
+
+GroupingSegment MakeSegment(int64_t batch_length, int64_t offset, int64_t length,
+                            bool extends) {
+  return GroupingSegment{offset, length, offset + length >= batch_length, extends};
+}
+
+int64_t GetMatchLength(const uint8_t* match_bytes, int64_t match_width,
+                       const uint8_t* array_bytes, int64_t offset, int64_t length) {
+  int64_t cursor, byte_cursor;
+  for (cursor = offset, byte_cursor = match_width * cursor; cursor < length;
+       cursor++, byte_cursor += match_width) {
+    if (memcmp(match_bytes, array_bytes + byte_cursor,
+               static_cast<size_t>(match_width)) != 0) {
+      break;
+    }
+  }
+  return std::min(cursor, length - offset);
+}
+
+using ExtendFunc = std::function<bool(const void*)>;
+constexpr bool kDefaultExtends = true;
+constexpr bool kEmptyExtends = true;
+
+Result<GroupingSegment> GetNextSegmentChunked(
+    const std::shared_ptr<ChunkedArray>& chunked_array, int64_t offset,
+    ExtendFunc extend) {
+  if (offset >= chunked_array->length()) {
+    return MakeSegment(chunked_array->length(), chunked_array->length(), 0,
+                       kEmptyExtends);
+  }
+  int64_t remaining_offset = offset;
+  const auto& arrays = chunked_array->chunks();
+  for (size_t i = 0; remaining_offset >= 0 && i < arrays.size(); i++) {
+    // look up chunk containing offset
+    int64_t array_length = arrays[i]->length();
+    if (remaining_offset < array_length) {
+      // found - switch to matching
+      int64_t match_width = arrays[i]->type()->byte_width();
+      const uint8_t* match_bytes = GetValuesAsBytes(*arrays[i]->data(), remaining_offset);
+      int64_t total_match_length = 0;
+      for (; i < arrays.size(); i++) {
+        int64_t array_length = arrays[i]->length();
+        if (array_length <= 0) continue;
+        const uint8_t* array_bytes = GetValuesAsBytes(*arrays[i]->data());
+        int64_t match_length = GetMatchLength(match_bytes, match_width, array_bytes,
+                                              remaining_offset, array_length);
+        total_match_length += match_length;
+        remaining_offset = 0;
+        if (match_length < array_length - remaining_offset) break;
+      }
+      bool extends = extend(match_bytes);
+      return MakeSegment(chunked_array->length(), offset, total_match_length, extends);
+    }
+    remaining_offset -= array_length;
+  }
+  return Status::Invalid("segmenting invalid chunked array value");
+}
+
+struct NoKeysGroupingSegmenter : public BaseGroupingSegmenter {
+  static std::unique_ptr<GroupingSegmenter> Make() {
+    return std::make_unique<NoKeysGroupingSegmenter>();
+  }
+
+  NoKeysGroupingSegmenter() : BaseGroupingSegmenter({}) {}
+
+  Status Reset() override { return Status::OK(); }
+
+  template <typename Batch>
+  Result<GroupingSegment> GetNextSegmentImpl(const Batch& batch, int64_t offset) {
+    ARROW_RETURN_NOT_OK(CheckForGetNextSegment(batch, offset, {}));
+    return MakeSegment(batch.length, offset, batch.length - offset, kDefaultExtends);
+  }
+
+  Result<GroupingSegment> GetNextSegment(const ExecSpan& batch, int64_t offset) override {
+    return GetNextSegmentImpl(batch, offset);
+  }
+
+  Result<GroupingSegment> GetNextSegment(const ExecBatch& batch,
+                                         int64_t offset) override {
+    return GetNextSegmentImpl(batch, offset);
+  }
+};
+
+struct SimpleKeyGroupingSegmenter : public BaseGroupingSegmenter {
+  static Result<std::unique_ptr<GroupingSegmenter>> Make(TypeHolder key_type) {
+    return std::make_unique<SimpleKeyGroupingSegmenter>(key_type);
+  }
+
+  explicit SimpleKeyGroupingSegmenter(TypeHolder key_type)
+      : BaseGroupingSegmenter({key_type}), key_type_(key_types_[0]), save_key_data_() {}
+
+  Status CheckType(const DataType& type) {
+    if (!is_fixed_width(type)) {
+      return Status::Invalid("SimpleKeyGroupingSegmenter does not support type ", type);
+    }
+    return Status::OK();
+  }
+
+  Status Reset() override {
+    save_key_data_.resize(0);
+    return Status::OK();
+  }
+
+  bool Extend(const void* data) {
+    size_t byte_width = static_cast<size_t>(key_type_.type->byte_width());
+    bool extends = save_key_data_.size() != byte_width
+                       ? kDefaultExtends
+                       : 0 == memcmp(save_key_data_.data(), data, byte_width);
+    save_key_data_.resize(byte_width);
+    memcpy(save_key_data_.data(), data, byte_width);
+    return extends;
+  }
+
+  Result<GroupingSegment> GetNextSegment(const Scalar& scalar, int64_t offset,
+                                         int64_t length) {
+    ARROW_RETURN_NOT_OK(CheckType(*scalar.type));
+    if (!scalar.is_valid) {
+      return Status::Invalid("segmenting an invalid scalar");
+    }
+    auto data = checked_cast<const PrimitiveScalarBase&>(scalar).data();
+    bool extends = length > 0 ? Extend(data) : kEmptyExtends;
+    return MakeSegment(length, 0, length, extends);
+  }
+
+  Result<GroupingSegment> GetNextSegment(const DataType& array_type,
+                                         const uint8_t* array_bytes, int64_t offset,
+                                         int64_t length) {
+    RETURN_NOT_OK(CheckType(array_type));
+    int64_t byte_width = array_type.byte_width();
+    int64_t match_length = GetMatchLength(array_bytes + offset * byte_width, byte_width,
+                                          array_bytes, offset, length);
+    bool extends = length > 0 ? Extend(array_bytes + offset * byte_width) : kEmptyExtends;
+    return MakeSegment(length, offset, match_length, extends);
+  }
+
+  Result<GroupingSegment> GetNextSegment(const ExecSpan& batch, int64_t offset) override {
+    ARROW_RETURN_NOT_OK(CheckForGetNextSegment(batch, offset, {key_type_}));
+    if (offset == batch.length) {
+      return MakeSegment(batch.length, offset, 0, kEmptyExtends);
+    }
+    const auto& value = batch.values[0];
+    if (value.is_scalar()) {
+      return GetNextSegment(*value.scalar, offset, batch.length);
+    }
+    ARROW_DCHECK(value.is_array());
+    const auto& array = value.array;
+    if (array.GetNullCount() > 0) {
+      return Status::NotImplemented("segmenting a nullable array");
+    }
+    return GetNextSegment(*array.type, GetValuesAsBytes(array), offset, batch.length);
+  }
+
+  Result<GroupingSegment> GetNextSegment(const ExecBatch& batch,
+                                         int64_t offset) override {
+    ARROW_RETURN_NOT_OK(CheckForGetNextSegment(batch, offset, {key_type_}));
+    if (offset == batch.length) {
+      return MakeSegment(batch.length, offset, 0, kEmptyExtends);
+    }
+    const auto& value = batch.values[0];
+    if (value.is_scalar()) {
+      return GetNextSegment(*value.scalar(), offset, batch.length);
+    }
+    if (value.is_array()) {
+      auto array = value.array();
+      if (array->GetNullCount() > 0) {
+        return Status::NotImplemented("segmenting a nullable array");
+      }
+      return GetNextSegment(*array->type, GetValuesAsBytes(*array), offset, batch.length);
+    }
+    if (value.is_chunked_array()) {
+      auto array = value.chunked_array();
+      if (array->null_count() > 0) {
+        return Status::NotImplemented("segmenting a nullable array");
+      }
+      return GetNextSegmentChunked(array, offset, bound_extend_);
+    }
+    return Status::Invalid("segmenting unsupported value kind ", value.kind());
+  }
+
+ private:
+  TypeHolder key_type_;
+  std::vector<uint8_t> save_key_data_;
+  ExtendFunc bound_extend_ = [this](const void* data) { return Extend(data); };
+};
+
+struct AnyKeysGroupingSegmenter : public BaseGroupingSegmenter {
+  static Result<std::unique_ptr<GroupingSegmenter>> Make(
+      const std::vector<TypeHolder>& key_types, ExecContext* ctx) {
+    ARROW_RETURN_NOT_OK(Grouper::Make(key_types, ctx));  // check types
+    return std::make_unique<AnyKeysGroupingSegmenter>(key_types, ctx);
+  }
+
+  AnyKeysGroupingSegmenter(const std::vector<TypeHolder>& key_types, ExecContext* ctx)
+      : BaseGroupingSegmenter(key_types),
+        ctx_(ctx),
+        grouper_(nullptr),
+        save_group_id_(kNoGroupId) {}
+
+  Status Reset() override {
+    grouper_ = nullptr;
+    save_group_id_ = kNoGroupId;
+    return Status::OK();
+  }
+
+  bool Extend(const void* data) {
+    auto group_id = *static_cast<const group_id_t*>(data);
+    bool extends =
+        save_group_id_ == kNoGroupId ? kDefaultExtends : save_group_id_ == group_id;
+    save_group_id_ = group_id;
+    return extends;
+  }
+
+  template <typename Batch>
+  Result<group_id_t> MapGroupIdAt(const Batch& batch, int64_t offset) {
+    if (offset < 0 || offset >= batch.length) {
+      return Status::Invalid("requesting group id out of bounds");
+    }
+    if (!grouper_) return kNoGroupId;
+    ARROW_ASSIGN_OR_RAISE(auto datum, grouper_->Consume(batch, offset,
+                                                        /*consume_length=*/1));
+    if (!(datum.is_array() || datum.is_chunked_array())) {
+      return Status::Invalid("accessing unsupported datum kind ", datum.kind());
+    }
+    const std::shared_ptr<ArrayData>& data =
+        datum.is_array() ? datum.array() : datum.chunked_array()->chunk(0)->data();
+    ARROW_DCHECK(data->GetNullCount() == 0);
+    DCHECK_EQ(data->type->id(), GroupIdType::type_id);
+    DCHECK_EQ(1, data->length);
+    const group_id_t* values = data->GetValues<group_id_t>(1);
+    return values[0];
+  }
+
+  template <typename Batch>
+  Result<GroupingSegment> GetNextSegmentImpl(const Batch& batch, int64_t offset) {
+    ARROW_RETURN_NOT_OK(CheckForGetNextSegment(batch, offset, key_types_));
+    if (offset == batch.length) {
+      return MakeSegment(batch.length, offset, 0, kEmptyExtends);
+    }
+    // ARROW-18311: make Grouper support Reset()
+    // so it can be cached instead of recreated below
+    //
+    // the group id must be computed prior to resetting the grouper, since it is compared
+    // to save_group_id_, and after resetting the grouper produces incomparable group ids
+    ARROW_ASSIGN_OR_RAISE(auto group_id, MapGroupIdAt(batch, offset));
+    ExtendFunc bound_extend = [this, group_id](const void* data) {
+      bool extends = Extend(&group_id);
+      save_group_id_ = *static_cast<const group_id_t*>(data);
+      return extends;
+    };
+    ARROW_ASSIGN_OR_RAISE(grouper_, Grouper::Make(key_types_, ctx_));  // TODO: reset it
+    ARROW_ASSIGN_OR_RAISE(auto datum, grouper_->Consume(batch, offset));
+    if (datum.is_array()) {
+      const std::shared_ptr<ArrayData>& data = datum.array();
+      ARROW_DCHECK(data->GetNullCount() == 0);
+      DCHECK_EQ(data->type->id(), GroupIdType::type_id);
+      const group_id_t* values = data->GetValues<group_id_t>(1);
+      int64_t cursor;
+      for (cursor = 1; cursor < data->length; cursor++) {
+        if (values[0] != values[cursor]) break;

Review Comment:
   binary search?
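
Mirroring the earlier binary-search suggestion, a sketch of searching for the end of the leading run of equal group ids. As before, it is only valid when the segment keys are ordered, so that all rows mapping to `values[0]` are contiguous at the start of the consumed range; the helper name is made up.

```cpp
// Sketch: index of the first group id that differs from values[0], assuming the
// equal ids form a leading run. Precondition: length >= 1 (the caller consumes
// at least one row).
#include <cstdint>

template <typename GroupIdT>
int64_t LeadingRunLength(const GroupIdT* values, int64_t length) {
  int64_t lo = 1;       // values[0 .. lo) are all equal to values[0]
  int64_t hi = length;  // the first differing id, if any, lies in [lo, hi]
  while (lo < hi) {
    int64_t mid = lo + (hi - lo) / 2;
    if (values[mid] == values[0]) {
      lo = mid + 1;
    } else {
      hi = mid;
    }
  }
  return lo;  // equals length when every id matches values[0]
}
```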



##########
cpp/src/arrow/compute/row/grouper.cc:
##########
@@ -39,12 +43,521 @@
 namespace arrow {
 
 using internal::checked_cast;
+using internal::PrimitiveScalarBase;
 
 namespace compute {
 
 namespace {
 
-struct GrouperImpl : Grouper {
+constexpr uint32_t kNoGroupId = std::numeric_limits<uint32_t>::max();
+
+using group_id_t = std::remove_const<decltype(kNoGroupId)>::type;
+using GroupIdType = CTypeTraits<group_id_t>::ArrowType;
+auto group_id_type = std::make_shared<GroupIdType>();
+
+inline const uint8_t* GetValuesAsBytes(const ArrayData& data, int64_t offset = 0) {
+  DCHECK_GT(data.type->byte_width(), 0);
+  int64_t absolute_byte_offset = (data.offset + offset) * data.type->byte_width();
+  return data.GetValues<uint8_t>(1, absolute_byte_offset);
+}
+
+inline const uint8_t* GetValuesAsBytes(const ArraySpan& data, int64_t offset = 0) {
+  DCHECK_GT(data.type->byte_width(), 0);
+  int64_t absolute_byte_offset = (data.offset + offset) * data.type->byte_width();
+  return data.GetValues<uint8_t>(1, absolute_byte_offset);
+}
+
+template <typename Value>
+Status CheckForGetNextSegment(const std::vector<Value>& values, int64_t length,
+                              int64_t offset, const std::vector<TypeHolder>& key_types) {
+  if (offset < 0 || offset > length) {
+    return Status::Invalid("invalid grouping segmenter offset: ", offset);
+  }
+  if (values.size() != key_types.size()) {
+    return Status::Invalid("expected batch size ", key_types.size(), " but got ",
+                           values.size());
+  }
+  for (size_t i = 0; i < key_types.size(); i++) {
+    const auto& value = values[i];
+    const auto& key_type = key_types[i];
+    if (*value.type() != *key_type.type) {
+      return Status::Invalid("expected batch value ", i, " of type ", *key_type.type,
+                             " but got ", *value.type());
+    }
+  }
+  return Status::OK();
+}
+
+template <typename Batch>
+enable_if_t<std::is_same<Batch, ExecSpan>::value || std::is_same<Batch, ExecBatch>::value,
+            Status>
+CheckForGetNextSegment(const Batch& batch, int64_t offset,
+                       const std::vector<TypeHolder>& key_types) {
+  return CheckForGetNextSegment(batch.values, batch.length, offset, key_types);
+}
+
+struct BaseGroupingSegmenter : public GroupingSegmenter {
+  explicit BaseGroupingSegmenter(const std::vector<TypeHolder>& key_types)
+      : key_types_(key_types) {}
+
+  const std::vector<TypeHolder>& key_types() const override { return key_types_; }
+
+  std::vector<TypeHolder> key_types_;
+};
+
+GroupingSegment MakeSegment(int64_t batch_length, int64_t offset, int64_t length,
+                            bool extends) {
+  return GroupingSegment{offset, length, offset + length >= batch_length, extends};
+}
+
+int64_t GetMatchLength(const uint8_t* match_bytes, int64_t match_width,
+                       const uint8_t* array_bytes, int64_t offset, int64_t length) {
+  int64_t cursor, byte_cursor;
+  for (cursor = offset, byte_cursor = match_width * cursor; cursor < length;
+       cursor++, byte_cursor += match_width) {
+    if (memcmp(match_bytes, array_bytes + byte_cursor,
+               static_cast<size_t>(match_width)) != 0) {
+      break;
+    }
+  }
+  return std::min(cursor, length - offset);
+}
+
+using ExtendFunc = std::function<bool(const void*)>;
+constexpr bool kDefaultExtends = true;
+constexpr bool kEmptyExtends = true;
+
+Result<GroupingSegment> GetNextSegmentChunked(
+    const std::shared_ptr<ChunkedArray>& chunked_array, int64_t offset,
+    ExtendFunc extend) {
+  if (offset >= chunked_array->length()) {
+    return MakeSegment(chunked_array->length(), chunked_array->length(), 0,
+                       kEmptyExtends);
+  }
+  int64_t remaining_offset = offset;
+  const auto& arrays = chunked_array->chunks();
+  for (size_t i = 0; remaining_offset >= 0 && i < arrays.size(); i++) {
+    // look up chunk containing offset
+    int64_t array_length = arrays[i]->length();
+    if (remaining_offset < array_length) {
+      // found - switch to matching
+      int64_t match_width = arrays[i]->type()->byte_width();
+      const uint8_t* match_bytes = GetValuesAsBytes(*arrays[i]->data(), remaining_offset);
+      int64_t total_match_length = 0;
+      for (; i < arrays.size(); i++) {
+        int64_t array_length = arrays[i]->length();
+        if (array_length <= 0) continue;
+        const uint8_t* array_bytes = GetValuesAsBytes(*arrays[i]->data());
+        int64_t match_length = GetMatchLength(match_bytes, match_width, array_bytes,
+                                              remaining_offset, array_length);
+        total_match_length += match_length;
+        remaining_offset = 0;
+        if (match_length < array_length - remaining_offset) break;
+      }
+      bool extends = extend(match_bytes);
+      return MakeSegment(chunked_array->length(), offset, total_match_length, extends);
+    }
+    remaining_offset -= array_length;
+  }
+  return Status::Invalid("segmenting invalid chunked array value");
+}
+
+struct NoKeysGroupingSegmenter : public BaseGroupingSegmenter {
+  static std::unique_ptr<GroupingSegmenter> Make() {
+    return std::make_unique<NoKeysGroupingSegmenter>();
+  }
+
+  NoKeysGroupingSegmenter() : BaseGroupingSegmenter({}) {}
+
+  Status Reset() override { return Status::OK(); }
+
+  template <typename Batch>
+  Result<GroupingSegment> GetNextSegmentImpl(const Batch& batch, int64_t offset) {
+    ARROW_RETURN_NOT_OK(CheckForGetNextSegment(batch, offset, {}));
+    return MakeSegment(batch.length, offset, batch.length - offset, kDefaultExtends);
+  }
+
+  Result<GroupingSegment> GetNextSegment(const ExecSpan& batch, int64_t offset) override {
+    return GetNextSegmentImpl(batch, offset);
+  }
+
+  Result<GroupingSegment> GetNextSegment(const ExecBatch& batch,
+                                         int64_t offset) override {
+    return GetNextSegmentImpl(batch, offset);
+  }
+};
+
+struct SimpleKeyGroupingSegmenter : public BaseGroupingSegmenter {
+  static Result<std::unique_ptr<GroupingSegmenter>> Make(TypeHolder key_type) {
+    return std::make_unique<SimpleKeyGroupingSegmenter>(key_type);
+  }
+
+  explicit SimpleKeyGroupingSegmenter(TypeHolder key_type)
+      : BaseGroupingSegmenter({key_type}), key_type_(key_types_[0]), save_key_data_() {}
+
+  Status CheckType(const DataType& type) {
+    if (!is_fixed_width(type)) {
+      return Status::Invalid("SimpleKeyGroupingSegmenter does not support type ", type);
+    }
+    return Status::OK();
+  }
+
+  Status Reset() override {
+    save_key_data_.resize(0);
+    return Status::OK();
+  }
+
+  bool Extend(const void* data) {
+    size_t byte_width = static_cast<size_t>(key_type_.type->byte_width());
+    bool extends = save_key_data_.size() != byte_width
+                       ? kDefaultExtends
+                       : 0 == memcmp(save_key_data_.data(), data, byte_width);
+    save_key_data_.resize(byte_width);
+    memcpy(save_key_data_.data(), data, byte_width);
+    return extends;
+  }
+
+  Result<GroupingSegment> GetNextSegment(const Scalar& scalar, int64_t offset,
+                                         int64_t length) {
+    ARROW_RETURN_NOT_OK(CheckType(*scalar.type));
+    if (!scalar.is_valid) {
+      return Status::Invalid("segmenting an invalid scalar");
+    }
+    auto data = checked_cast<const PrimitiveScalarBase&>(scalar).data();
+    bool extends = length > 0 ? Extend(data) : kEmptyExtends;
+    return MakeSegment(length, 0, length, extends);
+  }
+
+  Result<GroupingSegment> GetNextSegment(const DataType& array_type,
+                                         const uint8_t* array_bytes, int64_t offset,
+                                         int64_t length) {
+    RETURN_NOT_OK(CheckType(array_type));
+    int64_t byte_width = array_type.byte_width();
+    int64_t match_length = GetMatchLength(array_bytes + offset * byte_width, byte_width,
+                                          array_bytes, offset, length);
+    bool extends = length > 0 ? Extend(array_bytes + offset * byte_width) : kEmptyExtends;
+    return MakeSegment(length, offset, match_length, extends);
+  }
+
+  Result<GroupingSegment> GetNextSegment(const ExecSpan& batch, int64_t offset) override {
+    ARROW_RETURN_NOT_OK(CheckForGetNextSegment(batch, offset, {key_type_}));
+    if (offset == batch.length) {
+      return MakeSegment(batch.length, offset, 0, kEmptyExtends);
+    }
+    const auto& value = batch.values[0];
+    if (value.is_scalar()) {
+      return GetNextSegment(*value.scalar, offset, batch.length);
+    }
+    ARROW_DCHECK(value.is_array());
+    const auto& array = value.array;
+    if (array.GetNullCount() > 0) {
+      return Status::NotImplemented("segmenting a nullable array");
+    }
+    return GetNextSegment(*array.type, GetValuesAsBytes(array), offset, batch.length);
+  }
+
+  Result<GroupingSegment> GetNextSegment(const ExecBatch& batch,
+                                         int64_t offset) override {
+    ARROW_RETURN_NOT_OK(CheckForGetNextSegment(batch, offset, {key_type_}));
+    if (offset == batch.length) {
+      return MakeSegment(batch.length, offset, 0, kEmptyExtends);
+    }
+    const auto& value = batch.values[0];
+    if (value.is_scalar()) {
+      return GetNextSegment(*value.scalar(), offset, batch.length);
+    }
+    if (value.is_array()) {
+      auto array = value.array();
+      if (array->GetNullCount() > 0) {
+        return Status::NotImplemented("segmenting a nullable array");
+      }
+      return GetNextSegment(*array->type, GetValuesAsBytes(*array), offset, batch.length);
+    }
+    if (value.is_chunked_array()) {
+      auto array = value.chunked_array();
+      if (array->null_count() > 0) {
+        return Status::NotImplemented("segmenting a nullable array");
+      }
+      return GetNextSegmentChunked(array, offset, bound_extend_);
+    }
+    return Status::Invalid("segmenting unsupported value kind ", value.kind());
+  }
+
+ private:
+  TypeHolder key_type_;
+  std::vector<uint8_t> save_key_data_;
+  ExtendFunc bound_extend_ = [this](const void* data) { return Extend(data); };
+};
+
+struct AnyKeysGroupingSegmenter : public BaseGroupingSegmenter {
+  static Result<std::unique_ptr<GroupingSegmenter>> Make(
+      const std::vector<TypeHolder>& key_types, ExecContext* ctx) {
+    ARROW_RETURN_NOT_OK(Grouper::Make(key_types, ctx));  // check types
+    return std::make_unique<AnyKeysGroupingSegmenter>(key_types, ctx);
+  }
+
+  AnyKeysGroupingSegmenter(const std::vector<TypeHolder>& key_types, ExecContext* ctx)
+      : BaseGroupingSegmenter(key_types),
+        ctx_(ctx),
+        grouper_(nullptr),
+        save_group_id_(kNoGroupId) {}
+
+  Status Reset() override {
+    grouper_ = nullptr;
+    save_group_id_ = kNoGroupId;
+    return Status::OK();
+  }
+
+  bool Extend(const void* data) {
+    auto group_id = *static_cast<const group_id_t*>(data);
+    bool extends =
+        save_group_id_ == kNoGroupId ? kDefaultExtends : save_group_id_ == group_id;
+    save_group_id_ = group_id;
+    return extends;
+  }
+
+  template <typename Batch>
+  Result<group_id_t> MapGroupIdAt(const Batch& batch, int64_t offset) {
+    if (offset < 0 || offset >= batch.length) {
+      return Status::Invalid("requesting group id out of bounds");
+    }
+    if (!grouper_) return kNoGroupId;
+    ARROW_ASSIGN_OR_RAISE(auto datum, grouper_->Consume(batch, offset,
+                                                        /*consume_length=*/1));
+    if (!(datum.is_array() || datum.is_chunked_array())) {
+      return Status::Invalid("accessing unsupported datum kind ", datum.kind());
+    }
+    const std::shared_ptr<ArrayData>& data =
+        datum.is_array() ? datum.array() : datum.chunked_array()->chunk(0)->data();
+    ARROW_DCHECK(data->GetNullCount() == 0);
+    DCHECK_EQ(data->type->id(), GroupIdType::type_id);
+    DCHECK_EQ(1, data->length);
+    const group_id_t* values = data->GetValues<group_id_t>(1);
+    return values[0];
+  }
+
+  template <typename Batch>
+  Result<GroupingSegment> GetNextSegmentImpl(const Batch& batch, int64_t offset) {
+    ARROW_RETURN_NOT_OK(CheckForGetNextSegment(batch, offset, key_types_));
+    if (offset == batch.length) {
+      return MakeSegment(batch.length, offset, 0, kEmptyExtends);
+    }
+    // ARROW-18311: make Grouper support Reset()
+    // so it can be cached instead of recreated below
+    //
+    // the group id must be computed prior to resetting the grouper, since it is compared
+    // to save_group_id_, and after resetting the grouper produces incomparable group ids
+    ARROW_ASSIGN_OR_RAISE(auto group_id, MapGroupIdAt(batch, offset));
+    ExtendFunc bound_extend = [this, group_id](const void* data) {
+      bool extends = Extend(&group_id);
+      save_group_id_ = *static_cast<const group_id_t*>(data);
+      return extends;
+    };
+    ARROW_ASSIGN_OR_RAISE(grouper_, Grouper::Make(key_types_, ctx_));  // TODO: reset it
+    ARROW_ASSIGN_OR_RAISE(auto datum, grouper_->Consume(batch, offset));
+    if (datum.is_array()) {

Review Comment:
   What if datum is a scalar?



##########
cpp/src/arrow/compute/exec/aggregate_node.cc:
##########
@@ -169,17 +206,63 @@ void AggregatesToString(std::stringstream* ss, const Schema& input_schema,
   *ss << ']';
 }
 
+template <typename BatchHandler>
+Status HandleSegments(std::unique_ptr<GroupingSegmenter>& segmenter,
+                      const ExecBatch& batch, const std::vector<int>& ids,
+                      const BatchHandler& handle_batch) {
+  int64_t offset = 0;
+  ARROW_ASSIGN_OR_RAISE(auto segment_batch, batch.SelectValues(ids));
+  while (true) {
+    ARROW_ASSIGN_OR_RAISE(auto segment, segmenter->GetNextSegment(segment_batch, offset));
+    if (segment.offset >= segment_batch.length) break;

Review Comment:
   How can this happen?



##########
cpp/src/arrow/compute/kernels/hash_aggregate_test.cc:
##########
@@ -174,81 +242,117 @@ Result<Datum> RunGroupBy(const BatchesWithSchema& input,
   ARROW_ASSIGN_OR_RAISE(std::vector<ExecBatch> output_batches,
                         start_and_collect.MoveResult());
 
-  ArrayVector out_arrays(aggregates.size() + key_names.size());
   const auto& output_schema = plan->nodes()[0]->output()->output_schema();
+  if (!segmented) {
+    return MakeGroupByOutput(output_batches, output_schema, aggregates.size(),
+                             key_names.size(), naive);
+  }
+
+  std::vector<ArrayVector> out_arrays(aggregates.size() + key_names.size() +
+                                      segment_key_names.size());
   for (size_t i = 0; i < out_arrays.size(); ++i) {
     std::vector<std::shared_ptr<Array>> arrays(output_batches.size());
     for (size_t j = 0; j < output_batches.size(); ++j) {
-      arrays[j] = output_batches[j].values[i].make_array();
+      auto& value = output_batches[j].values[i];
+      if (value.is_scalar()) {
+        ARROW_ASSIGN_OR_RAISE(
+            arrays[j], MakeArrayFromScalar(*value.scalar(), output_batches[j].length));
+      } else if (value.is_array()) {
+        arrays[j] = value.make_array();
+      } else {
+        return Status::Invalid("GroupByUsingExecPlan unsupported value kind ",
+                               ToString(value.kind()));
+      }
     }
     if (arrays.empty()) {
+      arrays.resize(1);
       ARROW_ASSIGN_OR_RAISE(
-          out_arrays[i],
-          MakeArrayOfNull(output_schema->field(static_cast<int>(i))->type(),
-                          /*length=*/0));
-    } else {
-      ARROW_ASSIGN_OR_RAISE(out_arrays[i], Concatenate(arrays));
+          arrays[0], MakeArrayOfNull(output_schema->field(static_cast<int>(i))->type(),
+                                     /*length=*/0));
     }
+    out_arrays[i] = {std::move(arrays)};
   }
 
-  // The exec plan may reorder the output rows.  The tests are all setup to expect ouptut
-  // in ascending order of keys.  So we need to sort the result by the key columns.  To do
-  // that we create a table using the key columns, calculate the sort indices from that
-  // table (sorting on all fields) and then use those indices to calculate our result.
-  std::vector<std::shared_ptr<Field>> key_fields;
-  std::vector<std::shared_ptr<Array>> key_columns;
-  std::vector<SortKey> sort_keys;
-  for (std::size_t i = 0; i < key_names.size(); i++) {
-    const std::shared_ptr<Array>& arr = out_arrays[i + aggregates.size()];
-    if (arr->type_id() == Type::DICTIONARY) {
-      // Can't sort dictionary columns so need to decode
-      auto dict_arr = checked_pointer_cast<DictionaryArray>(arr);
-      ARROW_ASSIGN_OR_RAISE(auto decoded_arr,
-                            Take(*dict_arr->dictionary(), *dict_arr->indices()));
-      key_columns.push_back(decoded_arr);
-      key_fields.push_back(
-          field("name_does_not_matter", dict_arr->dict_type()->value_type()));
-    } else {
-      key_columns.push_back(arr);
-      key_fields.push_back(field("name_does_not_matter", arr->type()));
+  if (segmented && segment_key_names.size() > 0) {
+    ArrayVector struct_arrays;
+    struct_arrays.reserve(output_batches.size());
+    for (size_t j = 0; j < output_batches.size(); ++j) {
+      ArrayVector struct_fields;
+      struct_fields.reserve(out_arrays.size());
+      for (auto out_array : out_arrays) {
+        struct_fields.push_back(out_array[j]);
+      }
+      ARROW_ASSIGN_OR_RAISE(auto struct_array,
+                            StructArray::Make(struct_fields, output_schema->fields()));
+      struct_arrays.push_back(struct_array);
     }
-    sort_keys.emplace_back(static_cast<int>(i));
+    return ChunkedArray::Make(struct_arrays);
+  } else {
+    ArrayVector struct_fields(out_arrays.size());
+    for (size_t i = 0; i < out_arrays.size(); ++i) {
+      ARROW_ASSIGN_OR_RAISE(struct_fields[i], Concatenate(out_arrays[i]));
+    }
+    return StructArray::Make(std::move(struct_fields), output_schema->fields());
   }
-  std::shared_ptr<Schema> key_schema = schema(std::move(key_fields));
-  std::shared_ptr<Table> key_table = Table::Make(std::move(key_schema), key_columns);
-  SortOptions sort_options(std::move(sort_keys));
-  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Array> sort_indices,
-                        SortIndices(key_table, sort_options));
+}
 
-  ARROW_ASSIGN_OR_RAISE(
-      std::shared_ptr<Array> struct_arr,
-      StructArray::Make(std::move(out_arrays), output_schema->fields()));
+Result<Datum> RunGroupBy(const BatchesWithSchema& input,
+                         const std::vector<std::string>& key_names,
+                         const std::vector<std::string>& segment_key_names,
+                         const std::vector<Aggregate>& aggregates, bool use_threads,
+                         bool segmented = false, bool naive = false) {
+  if (segment_key_names.size() > 0) {
+    ARROW_ASSIGN_OR_RAISE(auto thread_pool, arrow::internal::ThreadPool::Make(1));
+    ExecContext seq_ctx(default_memory_pool(), thread_pool.get());
+    return RunGroupBy(input, key_names, segment_key_names, aggregates, &seq_ctx,
+                      use_threads, segmented, naive);
+  } else {
+    return RunGroupBy(input, key_names, segment_key_names, aggregates,
+                      threaded_exec_context(), use_threads, segmented, naive);
+  }
+}
 
-  return Take(struct_arr, sort_indices);
+Result<Datum> RunGroupBy(const BatchesWithSchema& input,
+                         const std::vector<std::string>& key_names,
+                         const std::vector<Aggregate>& aggregates, bool use_threads,
+                         bool segmented = false, bool naive = false) {
+  return RunGroupBy(input, key_names, {}, aggregates, use_threads, segmented);
 }
 
 /// Simpler overload where you can give the columns as datums
 Result<Datum> RunGroupBy(const std::vector<Datum>& arguments,
                          const std::vector<Datum>& keys,
-                         const std::vector<Aggregate>& aggregates,
-                         bool use_threads = false) {
+                         const std::vector<Datum>& segment_keys,
+                         const std::vector<Aggregate>& aggregates, bool use_threads,
+                         bool segmented = false, bool naive = false) {
   using arrow::compute::detail::ExecSpanIterator;
 
-  FieldVector scan_fields(arguments.size() + keys.size());
+  FieldVector scan_fields(arguments.size() + keys.size() + segment_keys.size());
   std::vector<std::string> key_names(keys.size());
+  std::vector<std::string> segment_key_names(segment_keys.size());
   for (size_t i = 0; i < arguments.size(); ++i) {
     auto name = std::string("agg_") + ToChars(i);
     scan_fields[i] = field(name, arguments[i].type());
   }
+  size_t base = arguments.size();
   for (size_t i = 0; i < keys.size(); ++i) {
     auto name = std::string("key_") + ToChars(i);
-    scan_fields[arguments.size() + i] = field(name, keys[i].type());
+    scan_fields[base + i] = field(name, keys[i].type());
     key_names[i] = std::move(name);
   }
+  base += keys.size();
+  size_t j = segmented ? keys.size() : keys.size();

Review Comment:
   Both branches of this ternary are `keys.size()`, so the `segmented` condition has no effect here. Was a different value intended for the segmented case?



##########
cpp/src/arrow/compute/row/grouper.cc:
##########
@@ -39,12 +43,521 @@
 namespace arrow {
 
 using internal::checked_cast;
+using internal::PrimitiveScalarBase;
 
 namespace compute {
 
 namespace {
 
-struct GrouperImpl : Grouper {
+constexpr uint32_t kNoGroupId = std::numeric_limits<uint32_t>::max();
+
+using group_id_t = std::remove_const<decltype(kNoGroupId)>::type;
+using GroupIdType = CTypeTraits<group_id_t>::ArrowType;
+auto group_id_type = std::make_shared<GroupIdType>();
+
+inline const uint8_t* GetValuesAsBytes(const ArrayData& data, int64_t offset = 0) {
+  DCHECK_GT(data.type->byte_width(), 0);
+  int64_t absolute_byte_offset = (data.offset + offset) * data.type->byte_width();
+  return data.GetValues<uint8_t>(1, absolute_byte_offset);
+}
+
+inline const uint8_t* GetValuesAsBytes(const ArraySpan& data, int64_t offset = 0) {
+  DCHECK_GT(data.type->byte_width(), 0);
+  int64_t absolute_byte_offset = (data.offset + offset) * data.type->byte_width();
+  return data.GetValues<uint8_t>(1, absolute_byte_offset);
+}
+
+template <typename Value>
+Status CheckForGetNextSegment(const std::vector<Value>& values, int64_t length,
+                              int64_t offset, const std::vector<TypeHolder>& key_types) {
+  if (offset < 0 || offset > length) {
+    return Status::Invalid("invalid grouping segmenter offset: ", offset);
+  }
+  if (values.size() != key_types.size()) {
+    return Status::Invalid("expected batch size ", key_types.size(), " but got ",
+                           values.size());
+  }
+  for (size_t i = 0; i < key_types.size(); i++) {
+    const auto& value = values[i];
+    const auto& key_type = key_types[i];
+    if (*value.type() != *key_type.type) {
+      return Status::Invalid("expected batch value ", i, " of type ", *key_type.type,
+                             " but got ", *value.type());
+    }
+  }
+  return Status::OK();
+}
+
+template <typename Batch>
+enable_if_t<std::is_same<Batch, ExecSpan>::value || std::is_same<Batch, ExecBatch>::value,
+            Status>
+CheckForGetNextSegment(const Batch& batch, int64_t offset,
+                       const std::vector<TypeHolder>& key_types) {
+  return CheckForGetNextSegment(batch.values, batch.length, offset, key_types);
+}
+
+struct BaseGroupingSegmenter : public GroupingSegmenter {
+  explicit BaseGroupingSegmenter(const std::vector<TypeHolder>& key_types)
+      : key_types_(key_types) {}
+
+  const std::vector<TypeHolder>& key_types() const override { return key_types_; }
+
+  std::vector<TypeHolder> key_types_;

Review Comment:
   Is this kept around purely for validation purposes?  Otherwise it seems you could just get the types from the batch values themselves?
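   For illustration, a minimal sketch of reading the types off the batch instead (hypothetical, not code from this PR; it assumes the batch values expose `type()` as they do elsewhere in this change):
   ```cpp
   // Derive the key types from the batch values rather than storing them.
   std::vector<TypeHolder> types;
   types.reserve(batch.values.size());
   for (const auto& value : batch.values) {
     types.emplace_back(value.type());
   }
   ```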



##########
cpp/src/arrow/compute/exec/aggregate_node.cc:
##########
@@ -191,13 +274,38 @@ class ScalarAggregateNode : public ExecNode, public TracedNode {
 
     const auto& aggregate_options = checked_cast<const AggregateNodeOptions&>(options);
     auto aggregates = aggregate_options.aggregates;
+    const auto& keys = aggregate_options.keys;
+    const auto& segment_keys = aggregate_options.segment_keys;
+
+    if (keys.size() > 0) {

Review Comment:
   This could probably be a DCHECK.  I don't think it should be possible to get here given that the factory in RegisterAggregateNode should check for this.
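   A minimal sketch of the DCHECK variant (hypothetical; the factory routing in RegisterAggregateNode is assumed, not quoted here):
   ```cpp
   // Keyed aggregates are expected to be routed to the grouped aggregation node by the
   // registered factory, so reaching ScalarAggregateNode with keys would be an internal error.
   DCHECK(keys.empty());
   ```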



##########
cpp/src/arrow/compute/row/grouper.cc:
##########
@@ -39,12 +43,521 @@
 namespace arrow {
 
 using internal::checked_cast;
+using internal::PrimitiveScalarBase;
 
 namespace compute {
 
 namespace {
 
-struct GrouperImpl : Grouper {
+constexpr uint32_t kNoGroupId = std::numeric_limits<uint32_t>::max();
+
+using group_id_t = std::remove_const<decltype(kNoGroupId)>::type;
+using GroupIdType = CTypeTraits<group_id_t>::ArrowType;
+auto group_id_type = std::make_shared<GroupIdType>();
+
+inline const uint8_t* GetValuesAsBytes(const ArrayData& data, int64_t offset = 0) {
+  DCHECK_GT(data.type->byte_width(), 0);
+  int64_t absolute_byte_offset = (data.offset + offset) * data.type->byte_width();
+  return data.GetValues<uint8_t>(1, absolute_byte_offset);
+}
+
+inline const uint8_t* GetValuesAsBytes(const ArraySpan& data, int64_t offset = 0) {
+  DCHECK_GT(data.type->byte_width(), 0);
+  int64_t absolute_byte_offset = (data.offset + offset) * data.type->byte_width();
+  return data.GetValues<uint8_t>(1, absolute_byte_offset);
+}
+
+template <typename Value>
+Status CheckForGetNextSegment(const std::vector<Value>& values, int64_t length,
+                              int64_t offset, const std::vector<TypeHolder>& key_types) {
+  if (offset < 0 || offset > length) {
+    return Status::Invalid("invalid grouping segmenter offset: ", offset);
+  }
+  if (values.size() != key_types.size()) {
+    return Status::Invalid("expected batch size ", key_types.size(), " but got ",
+                           values.size());
+  }
+  for (size_t i = 0; i < key_types.size(); i++) {
+    const auto& value = values[i];
+    const auto& key_type = key_types[i];
+    if (*value.type() != *key_type.type) {
+      return Status::Invalid("expected batch value ", i, " of type ", *key_type.type,
+                             " but got ", *value.type());
+    }
+  }
+  return Status::OK();
+}
+
+template <typename Batch>
+enable_if_t<std::is_same<Batch, ExecSpan>::value || std::is_same<Batch, ExecBatch>::value,
+            Status>
+CheckForGetNextSegment(const Batch& batch, int64_t offset,
+                       const std::vector<TypeHolder>& key_types) {
+  return CheckForGetNextSegment(batch.values, batch.length, offset, key_types);
+}
+
+struct BaseGroupingSegmenter : public GroupingSegmenter {
+  explicit BaseGroupingSegmenter(const std::vector<TypeHolder>& key_types)
+      : key_types_(key_types) {}
+
+  const std::vector<TypeHolder>& key_types() const override { return key_types_; }
+
+  std::vector<TypeHolder> key_types_;
+};
+
+GroupingSegment MakeSegment(int64_t batch_length, int64_t offset, int64_t length,
+                            bool extends) {
+  return GroupingSegment{offset, length, offset + length >= batch_length, extends};
+}
+
+int64_t GetMatchLength(const uint8_t* match_bytes, int64_t match_width,
+                       const uint8_t* array_bytes, int64_t offset, int64_t length) {
+  int64_t cursor, byte_cursor;
+  for (cursor = offset, byte_cursor = match_width * cursor; cursor < length;
+       cursor++, byte_cursor += match_width) {
+    if (memcmp(match_bytes, array_bytes + byte_cursor,
+               static_cast<size_t>(match_width)) != 0) {
+      break;
+    }
+  }
+  return std::min(cursor, length - offset);
+}
+
+using ExtendFunc = std::function<bool(const void*)>;
+constexpr bool kDefaultExtends = true;
+constexpr bool kEmptyExtends = true;
+
+Result<GroupingSegment> GetNextSegmentChunked(
+    const std::shared_ptr<ChunkedArray>& chunked_array, int64_t offset,
+    ExtendFunc extend) {
+  if (offset >= chunked_array->length()) {
+    return MakeSegment(chunked_array->length(), chunked_array->length(), 0,
+                       kEmptyExtends);
+  }
+  int64_t remaining_offset = offset;
+  const auto& arrays = chunked_array->chunks();
+  for (size_t i = 0; remaining_offset >= 0 && i < arrays.size(); i++) {
+    // look up chunk containing offset
+    int64_t array_length = arrays[i]->length();
+    if (remaining_offset < array_length) {
+      // found - switch to matching
+      int64_t match_width = arrays[i]->type()->byte_width();
+      const uint8_t* match_bytes = GetValuesAsBytes(*arrays[i]->data(), remaining_offset);
+      int64_t total_match_length = 0;
+      for (; i < arrays.size(); i++) {
+        int64_t array_length = arrays[i]->length();
+        if (array_length <= 0) continue;
+        const uint8_t* array_bytes = GetValuesAsBytes(*arrays[i]->data());
+        int64_t match_length = GetMatchLength(match_bytes, match_width, array_bytes,
+                                              remaining_offset, array_length);
+        total_match_length += match_length;
+        remaining_offset = 0;
+        if (match_length < array_length - remaining_offset) break;
+      }
+      bool extends = extend(match_bytes);
+      return MakeSegment(chunked_array->length(), offset, total_match_length, extends);
+    }
+    remaining_offset -= array_length;
+  }
+  return Status::Invalid("segmenting invalid chunked array value");
+}
+
+struct NoKeysGroupingSegmenter : public BaseGroupingSegmenter {
+  static std::unique_ptr<GroupingSegmenter> Make() {
+    return std::make_unique<NoKeysGroupingSegmenter>();
+  }
+
+  NoKeysGroupingSegmenter() : BaseGroupingSegmenter({}) {}
+
+  Status Reset() override { return Status::OK(); }
+
+  template <typename Batch>
+  Result<GroupingSegment> GetNextSegmentImpl(const Batch& batch, int64_t offset) {
+    ARROW_RETURN_NOT_OK(CheckForGetNextSegment(batch, offset, {}));
+    return MakeSegment(batch.length, offset, batch.length - offset, kDefaultExtends);
+  }
+
+  Result<GroupingSegment> GetNextSegment(const ExecSpan& batch, int64_t offset) override {
+    return GetNextSegmentImpl(batch, offset);
+  }
+
+  Result<GroupingSegment> GetNextSegment(const ExecBatch& batch,
+                                         int64_t offset) override {
+    return GetNextSegmentImpl(batch, offset);
+  }
+};
+
+struct SimpleKeyGroupingSegmenter : public BaseGroupingSegmenter {
+  static Result<std::unique_ptr<GroupingSegmenter>> Make(TypeHolder key_type) {
+    return std::make_unique<SimpleKeyGroupingSegmenter>(key_type);
+  }
+
+  explicit SimpleKeyGroupingSegmenter(TypeHolder key_type)
+      : BaseGroupingSegmenter({key_type}), key_type_(key_types_[0]), save_key_data_() {}
+
+  Status CheckType(const DataType& type) {
+    if (!is_fixed_width(type)) {
+      return Status::Invalid("SimpleKeyGroupingSegmenter does not support type ", type);
+    }
+    return Status::OK();
+  }
+
+  Status Reset() override {
+    save_key_data_.resize(0);
+    return Status::OK();
+  }
+
+  bool Extend(const void* data) {
+    size_t byte_width = static_cast<size_t>(key_type_.type->byte_width());
+    bool extends = save_key_data_.size() != byte_width
+                       ? kDefaultExtends
+                       : 0 == memcmp(save_key_data_.data(), data, byte_width);
+    save_key_data_.resize(byte_width);
+    memcpy(save_key_data_.data(), data, byte_width);
+    return extends;
+  }
+
+  Result<GroupingSegment> GetNextSegment(const Scalar& scalar, int64_t offset,
+                                         int64_t length) {
+    ARROW_RETURN_NOT_OK(CheckType(*scalar.type));
+    if (!scalar.is_valid) {
+      return Status::Invalid("segmenting an invalid scalar");
+    }
+    auto data = checked_cast<const PrimitiveScalarBase&>(scalar).data();
+    bool extends = length > 0 ? Extend(data) : kEmptyExtends;
+    return MakeSegment(length, 0, length, extends);
+  }
+
+  Result<GroupingSegment> GetNextSegment(const DataType& array_type,
+                                         const uint8_t* array_bytes, int64_t offset,
+                                         int64_t length) {
+    RETURN_NOT_OK(CheckType(array_type));
+    int64_t byte_width = array_type.byte_width();
+    int64_t match_length = GetMatchLength(array_bytes + offset * byte_width, byte_width,
+                                          array_bytes, offset, length);
+    bool extends = length > 0 ? Extend(array_bytes + offset * byte_width) : kEmptyExtends;
+    return MakeSegment(length, offset, match_length, extends);
+  }
+
+  Result<GroupingSegment> GetNextSegment(const ExecSpan& batch, int64_t offset) override {
+    ARROW_RETURN_NOT_OK(CheckForGetNextSegment(batch, offset, {key_type_}));
+    if (offset == batch.length) {
+      return MakeSegment(batch.length, offset, 0, kEmptyExtends);
+    }
+    const auto& value = batch.values[0];
+    if (value.is_scalar()) {
+      return GetNextSegment(*value.scalar, offset, batch.length);
+    }
+    ARROW_DCHECK(value.is_array());
+    const auto& array = value.array;
+    if (array.GetNullCount() > 0) {
+      return Status::NotImplemented("segmenting a nullable array");
+    }
+    return GetNextSegment(*array.type, GetValuesAsBytes(array), offset, batch.length);
+  }
+
+  Result<GroupingSegment> GetNextSegment(const ExecBatch& batch,
+                                         int64_t offset) override {
+    ARROW_RETURN_NOT_OK(CheckForGetNextSegment(batch, offset, {key_type_}));
+    if (offset == batch.length) {
+      return MakeSegment(batch.length, offset, 0, kEmptyExtends);
+    }
+    const auto& value = batch.values[0];
+    if (value.is_scalar()) {
+      return GetNextSegment(*value.scalar(), offset, batch.length);
+    }
+    if (value.is_array()) {
+      auto array = value.array();
+      if (array->GetNullCount() > 0) {
+        return Status::NotImplemented("segmenting a nullable array");
+      }
+      return GetNextSegment(*array->type, GetValuesAsBytes(*array), offset, batch.length);
+    }
+    if (value.is_chunked_array()) {
+      auto array = value.chunked_array();
+      if (array->null_count() > 0) {
+        return Status::NotImplemented("segmenting a nullable array");
+      }
+      return GetNextSegmentChunked(array, offset, bound_extend_);
+    }
+    return Status::Invalid("segmenting unsupported value kind ", value.kind());
+  }
+
+ private:
+  TypeHolder key_type_;
+  std::vector<uint8_t> save_key_data_;
+  ExtendFunc bound_extend_ = [this](const void* data) { return Extend(data); };
+};
+
+struct AnyKeysGroupingSegmenter : public BaseGroupingSegmenter {
+  static Result<std::unique_ptr<GroupingSegmenter>> Make(
+      const std::vector<TypeHolder>& key_types, ExecContext* ctx) {
+    ARROW_RETURN_NOT_OK(Grouper::Make(key_types, ctx));  // check types
+    return std::make_unique<AnyKeysGroupingSegmenter>(key_types, ctx);
+  }
+
+  AnyKeysGroupingSegmenter(const std::vector<TypeHolder>& key_types, ExecContext* ctx)
+      : BaseGroupingSegmenter(key_types),
+        ctx_(ctx),
+        grouper_(nullptr),
+        save_group_id_(kNoGroupId) {}
+
+  Status Reset() override {
+    grouper_ = nullptr;
+    save_group_id_ = kNoGroupId;
+    return Status::OK();
+  }
+
+  bool Extend(const void* data) {
+    auto group_id = *static_cast<const group_id_t*>(data);
+    bool extends =
+        save_group_id_ == kNoGroupId ? kDefaultExtends : save_group_id_ == group_id;
+    save_group_id_ = group_id;
+    return extends;
+  }
+
+  template <typename Batch>
+  Result<group_id_t> MapGroupIdAt(const Batch& batch, int64_t offset) {
+    if (offset < 0 || offset >= batch.length) {
+      return Status::Invalid("requesting group id out of bounds");
+    }
+    if (!grouper_) return kNoGroupId;
+    ARROW_ASSIGN_OR_RAISE(auto datum, grouper_->Consume(batch, offset,
+                                                        /*consume_length=*/1));
+    if (!(datum.is_array() || datum.is_chunked_array())) {
+      return Status::Invalid("accessing unsupported datum kind ", datum.kind());
+    }
+    const std::shared_ptr<ArrayData>& data =
+        datum.is_array() ? datum.array() : datum.chunked_array()->chunk(0)->data();
+    ARROW_DCHECK(data->GetNullCount() == 0);
+    DCHECK_EQ(data->type->id(), GroupIdType::type_id);
+    DCHECK_EQ(1, data->length);
+    const group_id_t* values = data->GetValues<group_id_t>(1);
+    return values[0];
+  }
+
+  template <typename Batch>
+  Result<GroupingSegment> GetNextSegmentImpl(const Batch& batch, int64_t offset) {
+    ARROW_RETURN_NOT_OK(CheckForGetNextSegment(batch, offset, key_types_));
+    if (offset == batch.length) {
+      return MakeSegment(batch.length, offset, 0, kEmptyExtends);
+    }
+    // ARROW-18311: make Grouper support Reset()
+    // so it can be cached instead of recreated below
+    //
+    // the group id must be computed prior to resetting the grouper, since it is compared
+    // to save_group_id_, and after resetting the grouper produces incomparable group ids
+    ARROW_ASSIGN_OR_RAISE(auto group_id, MapGroupIdAt(batch, offset));
+    ExtendFunc bound_extend = [this, group_id](const void* data) {
+      bool extends = Extend(&group_id);
+      save_group_id_ = *static_cast<const group_id_t*>(data);
+      return extends;
+    };
+    ARROW_ASSIGN_OR_RAISE(grouper_, Grouper::Make(key_types_, ctx_));  // TODO: reset it
+    ARROW_ASSIGN_OR_RAISE(auto datum, grouper_->Consume(batch, offset));
+    if (datum.is_array()) {
+      const std::shared_ptr<ArrayData>& data = datum.array();
+      ARROW_DCHECK(data->GetNullCount() == 0);
+      DCHECK_EQ(data->type->id(), GroupIdType::type_id);
+      const group_id_t* values = data->GetValues<group_id_t>(1);
+      int64_t cursor;
+      for (cursor = 1; cursor < data->length; cursor++) {
+        if (values[0] != values[cursor]) break;
+      }
+      int64_t length = std::min(cursor, batch.length - offset);
+      bool extends = length > 0 ? bound_extend(values) : kEmptyExtends;
+      return MakeSegment(batch.length, offset, length, extends);
+    } else if (datum.is_chunked_array()) {
+      ARROW_ASSIGN_OR_RAISE(
+          auto segment, GetNextSegmentChunked(datum.chunked_array(), 0, bound_extend));
+      segment.offset += offset;
+      return segment;
+    } else {
+      return Status::Invalid("segmenting unsupported datum kind ", datum.kind());
+    }
+  }
+
+  Result<GroupingSegment> GetNextSegment(const ExecSpan& batch, int64_t offset) override {
+    return GetNextSegmentImpl(batch, offset);
+  }
+
+  Result<GroupingSegment> GetNextSegment(const ExecBatch& batch,
+                                         int64_t offset) override {
+    return GetNextSegmentImpl(batch, offset);
+  }
+
+ private:
+  ExecContext* const ctx_;
+  std::unique_ptr<Grouper> grouper_;
+  group_id_t save_group_id_;
+};
+
+Status CheckForConsume(int64_t batch_length, int64_t& consume_offset,
+                       int64_t* consume_length) {
+  if (consume_offset < 0) {
+    return Status::Invalid("invalid grouper consume offset: ", consume_offset);
+  }
+  if (*consume_length < 0) {
+    *consume_length = batch_length - consume_offset;
+  }
+  return Status::OK();
+}
+
+}  // namespace
+
+Result<std::unique_ptr<GroupingSegmenter>> GroupingSegmenter::Make(
+    const std::vector<TypeHolder>& key_types, bool nullable_keys, ExecContext* ctx) {
+  if (key_types.size() == 0) {
+    return NoKeysGroupingSegmenter::Make();
+  } else if (!nullable_keys && key_types.size() == 1) {
+    const DataType* type = key_types[0].type;
+    if (type != NULLPTR && is_fixed_width(*type)) {
+      return SimpleKeyGroupingSegmenter::Make(key_types[0]);
+    }
+  }
+  return AnyKeysGroupingSegmenter::Make(key_types, ctx);
+}
+
+namespace {
+
+struct BaseGrouper : public Grouper {
+  int IndexOfChunk(const ExecBatch& batch) {
+    int i = 0;
+    for (const auto& value : batch.values) {
+      if (value.is_chunked_array()) {
+        return i;
+      }
+      ++i;
+    }
+    return -1;
+  }
+
+  bool HasConsistentChunks(const ExecBatch& batch, int index_of_chunk) {
+    auto first_chunked_array = batch.values[index_of_chunk].chunked_array();
+    if (first_chunked_array < 0) {
+      // having no chunks is considered consistent
+      return true;
+    }
+    int num_chunks = first_chunked_array->num_chunks();
+    int64_t length = first_chunked_array->length();
+    for (const auto& value : batch.values) {
+      if (!value.is_chunked_array()) {
+        continue;
+      }
+      auto curr_chunk = value.chunked_array();
+      if (num_chunks != curr_chunk->num_chunks() || length != curr_chunk->length()) {
+        return false;
+      }
+    }
+    if (num_chunks > 0) {
+      for (int i = 0; i < num_chunks; i++) {
+        int64_t chunk_length = first_chunked_array->chunk(i)->length();
+        for (const auto& value : batch.values) {
+          if (!value.is_chunked_array()) {
+            continue;
+          }
+          auto curr_chunk = value.chunked_array();
+          if (chunk_length != curr_chunk->chunk(i)->length()) {
+            return false;
+          }
+        }
+      }
+    }
+    return true;
+  }

Review Comment:
   Can probably remove this logic since we don't have to worry about chunked arrays.



##########
cpp/src/arrow/compute/exec/aggregate_node.cc:
##########
@@ -326,46 +446,86 @@ class ScalarAggregateNode : public ExecNode, public TracedNode {
   }
 
  private:
-  Status Finish() {
-    auto scope = TraceFinish();
+  Status ReconstructAggregates() {

Review Comment:
   Maybe "reset" instead of "reconstruct"?



##########
cpp/src/arrow/compute/kernels/hash_aggregate_test.cc:
##########
@@ -174,81 +242,117 @@ Result<Datum> RunGroupBy(const BatchesWithSchema& input,
   ARROW_ASSIGN_OR_RAISE(std::vector<ExecBatch> output_batches,
                         start_and_collect.MoveResult());
 
-  ArrayVector out_arrays(aggregates.size() + key_names.size());
   const auto& output_schema = plan->nodes()[0]->output()->output_schema();
+  if (!segmented) {
+    return MakeGroupByOutput(output_batches, output_schema, aggregates.size(),
+                             key_names.size(), naive);
+  }
+
+  std::vector<ArrayVector> out_arrays(aggregates.size() + key_names.size() +
+                                      segment_key_names.size());
   for (size_t i = 0; i < out_arrays.size(); ++i) {
     std::vector<std::shared_ptr<Array>> arrays(output_batches.size());
     for (size_t j = 0; j < output_batches.size(); ++j) {
-      arrays[j] = output_batches[j].values[i].make_array();
+      auto& value = output_batches[j].values[i];
+      if (value.is_scalar()) {
+        ARROW_ASSIGN_OR_RAISE(
+            arrays[j], MakeArrayFromScalar(*value.scalar(), output_batches[j].length));
+      } else if (value.is_array()) {
+        arrays[j] = value.make_array();
+      } else {
+        return Status::Invalid("GroupByUsingExecPlan unsupported value kind ",
+                               ToString(value.kind()));
+      }
     }
     if (arrays.empty()) {
+      arrays.resize(1);
       ARROW_ASSIGN_OR_RAISE(
-          out_arrays[i],
-          MakeArrayOfNull(output_schema->field(static_cast<int>(i))->type(),
-                          /*length=*/0));
-    } else {
-      ARROW_ASSIGN_OR_RAISE(out_arrays[i], Concatenate(arrays));
+          arrays[0], MakeArrayOfNull(output_schema->field(static_cast<int>(i))->type(),
+                                     /*length=*/0));
     }
+    out_arrays[i] = {std::move(arrays)};
   }
 
-  // The exec plan may reorder the output rows.  The tests are all setup to expect ouptut
-  // in ascending order of keys.  So we need to sort the result by the key columns.  To do
-  // that we create a table using the key columns, calculate the sort indices from that
-  // table (sorting on all fields) and then use those indices to calculate our result.
-  std::vector<std::shared_ptr<Field>> key_fields;
-  std::vector<std::shared_ptr<Array>> key_columns;
-  std::vector<SortKey> sort_keys;
-  for (std::size_t i = 0; i < key_names.size(); i++) {
-    const std::shared_ptr<Array>& arr = out_arrays[i + aggregates.size()];
-    if (arr->type_id() == Type::DICTIONARY) {
-      // Can't sort dictionary columns so need to decode
-      auto dict_arr = checked_pointer_cast<DictionaryArray>(arr);
-      ARROW_ASSIGN_OR_RAISE(auto decoded_arr,
-                            Take(*dict_arr->dictionary(), *dict_arr->indices()));
-      key_columns.push_back(decoded_arr);
-      key_fields.push_back(
-          field("name_does_not_matter", dict_arr->dict_type()->value_type()));
-    } else {
-      key_columns.push_back(arr);
-      key_fields.push_back(field("name_does_not_matter", arr->type()));
+  if (segmented && segment_key_names.size() > 0) {
+    ArrayVector struct_arrays;
+    struct_arrays.reserve(output_batches.size());
+    for (size_t j = 0; j < output_batches.size(); ++j) {
+      ArrayVector struct_fields;
+      struct_fields.reserve(out_arrays.size());
+      for (auto out_array : out_arrays) {
+        struct_fields.push_back(out_array[j]);
+      }
+      ARROW_ASSIGN_OR_RAISE(auto struct_array,
+                            StructArray::Make(struct_fields, output_schema->fields()));
+      struct_arrays.push_back(struct_array);
     }
-    sort_keys.emplace_back(static_cast<int>(i));
+    return ChunkedArray::Make(struct_arrays);
+  } else {
+    ArrayVector struct_fields(out_arrays.size());
+    for (size_t i = 0; i < out_arrays.size(); ++i) {
+      ARROW_ASSIGN_OR_RAISE(struct_fields[i], Concatenate(out_arrays[i]));
+    }
+    return StructArray::Make(std::move(struct_fields), output_schema->fields());
   }
-  std::shared_ptr<Schema> key_schema = schema(std::move(key_fields));
-  std::shared_ptr<Table> key_table = Table::Make(std::move(key_schema), key_columns);
-  SortOptions sort_options(std::move(sort_keys));
-  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Array> sort_indices,
-                        SortIndices(key_table, sort_options));
+}
 
-  ARROW_ASSIGN_OR_RAISE(
-      std::shared_ptr<Array> struct_arr,
-      StructArray::Make(std::move(out_arrays), output_schema->fields()));
+Result<Datum> RunGroupBy(const BatchesWithSchema& input,
+                         const std::vector<std::string>& key_names,
+                         const std::vector<std::string>& segment_key_names,
+                         const std::vector<Aggregate>& aggregates, bool use_threads,
+                         bool segmented = false, bool naive = false) {
+  if (segment_key_names.size() > 0) {
+    ARROW_ASSIGN_OR_RAISE(auto thread_pool, arrow::internal::ThreadPool::Make(1));
+    ExecContext seq_ctx(default_memory_pool(), thread_pool.get());
+    return RunGroupBy(input, key_names, segment_key_names, aggregates, &seq_ctx,
+                      use_threads, segmented, naive);
+  } else {
+    return RunGroupBy(input, key_names, segment_key_names, aggregates,
+                      threaded_exec_context(), use_threads, segmented, naive);
+  }
+}
 
-  return Take(struct_arr, sort_indices);
+Result<Datum> RunGroupBy(const BatchesWithSchema& input,
+                         const std::vector<std::string>& key_names,
+                         const std::vector<Aggregate>& aggregates, bool use_threads,
+                         bool segmented = false, bool naive = false) {
+  return RunGroupBy(input, key_names, {}, aggregates, use_threads, segmented);
 }
 
 /// Simpler overload where you can give the columns as datums
 Result<Datum> RunGroupBy(const std::vector<Datum>& arguments,
                          const std::vector<Datum>& keys,
-                         const std::vector<Aggregate>& aggregates,
-                         bool use_threads = false) {
+                         const std::vector<Datum>& segment_keys,
+                         const std::vector<Aggregate>& aggregates, bool use_threads,
+                         bool segmented = false, bool naive = false) {

Review Comment:
   Why is there both `segmented` and `segment_keys`?  Wouldn't `segmented == !segment_keys.empty()`?
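   A minimal sketch of that simplification (hypothetical, for this test helper only):
   ```cpp
   // Derive the flag from the keys instead of threading a separate parameter through.
   const bool segmented = !segment_keys.empty();
   ```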



##########
cpp/src/arrow/compute/row/grouper.cc:
##########
@@ -39,12 +43,521 @@
 namespace arrow {
 
 using internal::checked_cast;
+using internal::PrimitiveScalarBase;
 
 namespace compute {
 
 namespace {
 
-struct GrouperImpl : Grouper {
+constexpr uint32_t kNoGroupId = std::numeric_limits<uint32_t>::max();
+
+using group_id_t = std::remove_const<decltype(kNoGroupId)>::type;
+using GroupIdType = CTypeTraits<group_id_t>::ArrowType;
+auto group_id_type = std::make_shared<GroupIdType>();
+
+inline const uint8_t* GetValuesAsBytes(const ArrayData& data, int64_t offset = 0) {
+  DCHECK_GT(data.type->byte_width(), 0);
+  int64_t absolute_byte_offset = (data.offset + offset) * data.type->byte_width();
+  return data.GetValues<uint8_t>(1, absolute_byte_offset);
+}
+
+inline const uint8_t* GetValuesAsBytes(const ArraySpan& data, int64_t offset = 0) {
+  DCHECK_GT(data.type->byte_width(), 0);
+  int64_t absolute_byte_offset = (data.offset + offset) * data.type->byte_width();
+  return data.GetValues<uint8_t>(1, absolute_byte_offset);
+}
+
+template <typename Value>
+Status CheckForGetNextSegment(const std::vector<Value>& values, int64_t length,
+                              int64_t offset, const std::vector<TypeHolder>& key_types) {
+  if (offset < 0 || offset > length) {
+    return Status::Invalid("invalid grouping segmenter offset: ", offset);
+  }
+  if (values.size() != key_types.size()) {
+    return Status::Invalid("expected batch size ", key_types.size(), " but got ",
+                           values.size());
+  }
+  for (size_t i = 0; i < key_types.size(); i++) {
+    const auto& value = values[i];
+    const auto& key_type = key_types[i];
+    if (*value.type() != *key_type.type) {
+      return Status::Invalid("expected batch value ", i, " of type ", *key_type.type,
+                             " but got ", *value.type());
+    }
+  }
+  return Status::OK();
+}
+
+template <typename Batch>
+enable_if_t<std::is_same<Batch, ExecSpan>::value || std::is_same<Batch, ExecBatch>::value,
+            Status>
+CheckForGetNextSegment(const Batch& batch, int64_t offset,
+                       const std::vector<TypeHolder>& key_types) {
+  return CheckForGetNextSegment(batch.values, batch.length, offset, key_types);
+}
+
+struct BaseGroupingSegmenter : public GroupingSegmenter {
+  explicit BaseGroupingSegmenter(const std::vector<TypeHolder>& key_types)
+      : key_types_(key_types) {}
+
+  const std::vector<TypeHolder>& key_types() const override { return key_types_; }
+
+  std::vector<TypeHolder> key_types_;
+};
+
+GroupingSegment MakeSegment(int64_t batch_length, int64_t offset, int64_t length,
+                            bool extends) {
+  return GroupingSegment{offset, length, offset + length >= batch_length, extends};
+}
+
+int64_t GetMatchLength(const uint8_t* match_bytes, int64_t match_width,
+                       const uint8_t* array_bytes, int64_t offset, int64_t length) {
+  int64_t cursor, byte_cursor;
+  for (cursor = offset, byte_cursor = match_width * cursor; cursor < length;
+       cursor++, byte_cursor += match_width) {
+    if (memcmp(match_bytes, array_bytes + byte_cursor,
+               static_cast<size_t>(match_width)) != 0) {
+      break;
+    }
+  }
+  return std::min(cursor, length - offset);
+}
+
+using ExtendFunc = std::function<bool(const void*)>;
+constexpr bool kDefaultExtends = true;
+constexpr bool kEmptyExtends = true;
+
+Result<GroupingSegment> GetNextSegmentChunked(
+    const std::shared_ptr<ChunkedArray>& chunked_array, int64_t offset,
+    ExtendFunc extend) {
+  if (offset >= chunked_array->length()) {
+    return MakeSegment(chunked_array->length(), chunked_array->length(), 0,
+                       kEmptyExtends);
+  }
+  int64_t remaining_offset = offset;
+  const auto& arrays = chunked_array->chunks();
+  for (size_t i = 0; remaining_offset >= 0 && i < arrays.size(); i++) {
+    // look up chunk containing offset
+    int64_t array_length = arrays[i]->length();
+    if (remaining_offset < array_length) {
+      // found - switch to matching
+      int64_t match_width = arrays[i]->type()->byte_width();
+      const uint8_t* match_bytes = GetValuesAsBytes(*arrays[i]->data(), remaining_offset);
+      int64_t total_match_length = 0;
+      for (; i < arrays.size(); i++) {
+        int64_t array_length = arrays[i]->length();
+        if (array_length <= 0) continue;
+        const uint8_t* array_bytes = GetValuesAsBytes(*arrays[i]->data());
+        int64_t match_length = GetMatchLength(match_bytes, match_width, array_bytes,
+                                              remaining_offset, array_length);
+        total_match_length += match_length;
+        remaining_offset = 0;
+        if (match_length < array_length - remaining_offset) break;
+      }
+      bool extends = extend(match_bytes);
+      return MakeSegment(chunked_array->length(), offset, total_match_length, extends);
+    }
+    remaining_offset -= array_length;
+  }
+  return Status::Invalid("segmenting invalid chunked array value");
+}
+
+struct NoKeysGroupingSegmenter : public BaseGroupingSegmenter {
+  static std::unique_ptr<GroupingSegmenter> Make() {
+    return std::make_unique<NoKeysGroupingSegmenter>();
+  }
+
+  NoKeysGroupingSegmenter() : BaseGroupingSegmenter({}) {}
+
+  Status Reset() override { return Status::OK(); }
+
+  template <typename Batch>
+  Result<GroupingSegment> GetNextSegmentImpl(const Batch& batch, int64_t offset) {
+    ARROW_RETURN_NOT_OK(CheckForGetNextSegment(batch, offset, {}));
+    return MakeSegment(batch.length, offset, batch.length - offset, kDefaultExtends);
+  }
+
+  Result<GroupingSegment> GetNextSegment(const ExecSpan& batch, int64_t offset) override {
+    return GetNextSegmentImpl(batch, offset);
+  }
+
+  Result<GroupingSegment> GetNextSegment(const ExecBatch& batch,
+                                         int64_t offset) override {
+    return GetNextSegmentImpl(batch, offset);
+  }
+};
+
+struct SimpleKeyGroupingSegmenter : public BaseGroupingSegmenter {
+  static Result<std::unique_ptr<GroupingSegmenter>> Make(TypeHolder key_type) {
+    return std::make_unique<SimpleKeyGroupingSegmenter>(key_type);
+  }
+
+  explicit SimpleKeyGroupingSegmenter(TypeHolder key_type)
+      : BaseGroupingSegmenter({key_type}), key_type_(key_types_[0]), save_key_data_() {}
+
+  Status CheckType(const DataType& type) {
+    if (!is_fixed_width(type)) {
+      return Status::Invalid("SimpleKeyGroupingSegmenter does not support type ", type);
+    }
+    return Status::OK();
+  }
+
+  Status Reset() override {
+    save_key_data_.resize(0);
+    return Status::OK();
+  }
+
+  bool Extend(const void* data) {
+    size_t byte_width = static_cast<size_t>(key_type_.type->byte_width());
+    bool extends = save_key_data_.size() != byte_width
+                       ? kDefaultExtends
+                       : 0 == memcmp(save_key_data_.data(), data, byte_width);
+    save_key_data_.resize(byte_width);
+    memcpy(save_key_data_.data(), data, byte_width);
+    return extends;
+  }
+
+  Result<GroupingSegment> GetNextSegment(const Scalar& scalar, int64_t offset,
+                                         int64_t length) {
+    ARROW_RETURN_NOT_OK(CheckType(*scalar.type));
+    if (!scalar.is_valid) {
+      return Status::Invalid("segmenting an invalid scalar");
+    }
+    auto data = checked_cast<const PrimitiveScalarBase&>(scalar).data();
+    bool extends = length > 0 ? Extend(data) : kEmptyExtends;
+    return MakeSegment(length, 0, length, extends);

Review Comment:
   ```suggestion
       return MakeSegment(length, offset, length - offset, extends);
   ```
   Although I suppose this doesn't really matter.



##########
cpp/src/arrow/compute/exec/aggregate_node.cc:
##########
@@ -326,46 +446,86 @@ class ScalarAggregateNode : public ExecNode, public TracedNode {
   }
 
  private:
-  Status Finish() {
-    auto scope = TraceFinish();
+  Status ReconstructAggregates() {
+    const auto& input_schema = *inputs()[0]->output_schema();
+    auto exec_ctx = plan()->query_context()->exec_context();
+    for (size_t i = 0; i < kernels_.size(); ++i) {
+      std::vector<TypeHolder> in_types;
+      for (const auto& target : target_fieldsets_[i]) {
+        in_types.emplace_back(input_schema.field(target)->type().get());
+      }
+      states_[i].resize(plan()->query_context()->max_concurrency());
+      KernelContext kernel_ctx{exec_ctx};
+      RETURN_NOT_OK(Kernel::InitAll(
+          &kernel_ctx, KernelInitArgs{kernels_[i], in_types, aggs_[i].options.get()},
+          &states_[i]));
+    }
+    return Status::OK();
+  }
+
+  Status OutputResult(bool is_last = false, bool traced = false) {
+    if (is_last && !traced) {
+      auto scope = TraceFinish();

Review Comment:
   Could you just call `TraceFinish` in `InputFinished`?
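   Roughly, the shape could be something like this (a hedged sketch only; the existing bookkeeping inside `InputFinished` is elided and assumed unchanged):
   ```cpp
   Status InputFinished(ExecNode* input, int total_batches) override {
     auto scope = TraceFinish();
     // ... existing bookkeeping; once the last batch has been processed,
     // emit the final result:
     return OutputResult(/*is_last=*/true);
   }
   ```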



##########
cpp/src/arrow/compute/row/grouper.cc:
##########
@@ -39,12 +43,521 @@
 namespace arrow {
 
 using internal::checked_cast;
+using internal::PrimitiveScalarBase;
 
 namespace compute {
 
 namespace {
 
-struct GrouperImpl : Grouper {
+constexpr uint32_t kNoGroupId = std::numeric_limits<uint32_t>::max();
+
+using group_id_t = std::remove_const<decltype(kNoGroupId)>::type;
+using GroupIdType = CTypeTraits<group_id_t>::ArrowType;
+auto group_id_type = std::make_shared<GroupIdType>();
+
+inline const uint8_t* GetValuesAsBytes(const ArrayData& data, int64_t offset = 0) {
+  DCHECK_GT(data.type->byte_width(), 0);
+  int64_t absolute_byte_offset = (data.offset + offset) * data.type->byte_width();
+  return data.GetValues<uint8_t>(1, absolute_byte_offset);
+}
+
+inline const uint8_t* GetValuesAsBytes(const ArraySpan& data, int64_t offset = 0) {
+  DCHECK_GT(data.type->byte_width(), 0);
+  int64_t absolute_byte_offset = (data.offset + offset) * data.type->byte_width();
+  return data.GetValues<uint8_t>(1, absolute_byte_offset);
+}
+
+template <typename Value>
+Status CheckForGetNextSegment(const std::vector<Value>& values, int64_t length,
+                              int64_t offset, const std::vector<TypeHolder>& key_types) {
+  if (offset < 0 || offset > length) {
+    return Status::Invalid("invalid grouping segmenter offset: ", offset);
+  }
+  if (values.size() != key_types.size()) {
+    return Status::Invalid("expected batch size ", key_types.size(), " but got ",
+                           values.size());
+  }
+  for (size_t i = 0; i < key_types.size(); i++) {
+    const auto& value = values[i];
+    const auto& key_type = key_types[i];
+    if (*value.type() != *key_type.type) {
+      return Status::Invalid("expected batch value ", i, " of type ", *key_type.type,
+                             " but got ", *value.type());
+    }
+  }
+  return Status::OK();
+}
+
+template <typename Batch>
+enable_if_t<std::is_same<Batch, ExecSpan>::value || std::is_same<Batch, ExecBatch>::value,
+            Status>
+CheckForGetNextSegment(const Batch& batch, int64_t offset,
+                       const std::vector<TypeHolder>& key_types) {
+  return CheckForGetNextSegment(batch.values, batch.length, offset, key_types);
+}
+
+struct BaseGroupingSegmenter : public GroupingSegmenter {
+  explicit BaseGroupingSegmenter(const std::vector<TypeHolder>& key_types)
+      : key_types_(key_types) {}
+
+  const std::vector<TypeHolder>& key_types() const override { return key_types_; }
+
+  std::vector<TypeHolder> key_types_;
+};
+
+GroupingSegment MakeSegment(int64_t batch_length, int64_t offset, int64_t length,
+                            bool extends) {
+  return GroupingSegment{offset, length, offset + length >= batch_length, extends};
+}
+
+int64_t GetMatchLength(const uint8_t* match_bytes, int64_t match_width,
+                       const uint8_t* array_bytes, int64_t offset, int64_t length) {
+  int64_t cursor, byte_cursor;
+  for (cursor = offset, byte_cursor = match_width * cursor; cursor < length;
+       cursor++, byte_cursor += match_width) {
+    if (memcmp(match_bytes, array_bytes + byte_cursor,
+               static_cast<size_t>(match_width)) != 0) {
+      break;
+    }
+  }
+  return std::min(cursor, length - offset);
+}
+
+using ExtendFunc = std::function<bool(const void*)>;
+constexpr bool kDefaultExtends = true;
+constexpr bool kEmptyExtends = true;
+
+Result<GroupingSegment> GetNextSegmentChunked(
+    const std::shared_ptr<ChunkedArray>& chunked_array, int64_t offset,
+    ExtendFunc extend) {
+  if (offset >= chunked_array->length()) {
+    return MakeSegment(chunked_array->length(), chunked_array->length(), 0,
+                       kEmptyExtends);
+  }
+  int64_t remaining_offset = offset;
+  const auto& arrays = chunked_array->chunks();
+  for (size_t i = 0; remaining_offset >= 0 && i < arrays.size(); i++) {
+    // look up chunk containing offset
+    int64_t array_length = arrays[i]->length();
+    if (remaining_offset < array_length) {
+      // found - switch to matching
+      int64_t match_width = arrays[i]->type()->byte_width();
+      const uint8_t* match_bytes = GetValuesAsBytes(*arrays[i]->data(), remaining_offset);
+      int64_t total_match_length = 0;
+      for (; i < arrays.size(); i++) {
+        int64_t array_length = arrays[i]->length();
+        if (array_length <= 0) continue;
+        const uint8_t* array_bytes = GetValuesAsBytes(*arrays[i]->data());
+        int64_t match_length = GetMatchLength(match_bytes, match_width, array_bytes,
+                                              remaining_offset, array_length);
+        total_match_length += match_length;
+        remaining_offset = 0;
+        if (match_length < array_length - remaining_offset) break;
+      }
+      bool extends = extend(match_bytes);
+      return MakeSegment(chunked_array->length(), offset, total_match_length, extends);
+    }
+    remaining_offset -= array_length;
+  }
+  return Status::Invalid("segmenting invalid chunked array value");
+}
+
+struct NoKeysGroupingSegmenter : public BaseGroupingSegmenter {
+  static std::unique_ptr<GroupingSegmenter> Make() {
+    return std::make_unique<NoKeysGroupingSegmenter>();
+  }
+
+  NoKeysGroupingSegmenter() : BaseGroupingSegmenter({}) {}
+
+  Status Reset() override { return Status::OK(); }
+
+  template <typename Batch>
+  Result<GroupingSegment> GetNextSegmentImpl(const Batch& batch, int64_t offset) {
+    ARROW_RETURN_NOT_OK(CheckForGetNextSegment(batch, offset, {}));
+    return MakeSegment(batch.length, offset, batch.length - offset, kDefaultExtends);
+  }
+
+  Result<GroupingSegment> GetNextSegment(const ExecSpan& batch, int64_t offset) override {
+    return GetNextSegmentImpl(batch, offset);
+  }
+
+  Result<GroupingSegment> GetNextSegment(const ExecBatch& batch,
+                                         int64_t offset) override {
+    return GetNextSegmentImpl(batch, offset);
+  }
+};
+
+struct SimpleKeyGroupingSegmenter : public BaseGroupingSegmenter {
+  static Result<std::unique_ptr<GroupingSegmenter>> Make(TypeHolder key_type) {
+    return std::make_unique<SimpleKeyGroupingSegmenter>(key_type);
+  }
+
+  explicit SimpleKeyGroupingSegmenter(TypeHolder key_type)
+      : BaseGroupingSegmenter({key_type}), key_type_(key_types_[0]), save_key_data_() {}
+
+  Status CheckType(const DataType& type) {
+    if (!is_fixed_width(type)) {
+      return Status::Invalid("SimpleKeyGroupingSegmenter does not support type ", type);
+    }
+    return Status::OK();
+  }
+
+  Status Reset() override {
+    save_key_data_.resize(0);
+    return Status::OK();
+  }
+
+  bool Extend(const void* data) {

Review Comment:
   ```suggestion
     // Checks whether this data extends the last seen value and then
     // updates the last seen value
     bool Extend(const void* data) {
   ```



##########
cpp/src/arrow/compute/kernels/hash_aggregate_test.cc:
##########
@@ -174,81 +242,117 @@ Result<Datum> RunGroupBy(const BatchesWithSchema& input,
   ARROW_ASSIGN_OR_RAISE(std::vector<ExecBatch> output_batches,
                         start_and_collect.MoveResult());
 
-  ArrayVector out_arrays(aggregates.size() + key_names.size());
   const auto& output_schema = plan->nodes()[0]->output()->output_schema();
+  if (!segmented) {
+    return MakeGroupByOutput(output_batches, output_schema, aggregates.size(),
+                             key_names.size(), naive);
+  }
+
+  std::vector<ArrayVector> out_arrays(aggregates.size() + key_names.size() +
+                                      segment_key_names.size());
   for (size_t i = 0; i < out_arrays.size(); ++i) {
     std::vector<std::shared_ptr<Array>> arrays(output_batches.size());
     for (size_t j = 0; j < output_batches.size(); ++j) {
-      arrays[j] = output_batches[j].values[i].make_array();
+      auto& value = output_batches[j].values[i];
+      if (value.is_scalar()) {
+        ARROW_ASSIGN_OR_RAISE(
+            arrays[j], MakeArrayFromScalar(*value.scalar(), output_batches[j].length));
+      } else if (value.is_array()) {
+        arrays[j] = value.make_array();
+      } else {
+        return Status::Invalid("GroupByUsingExecPlan unsupported value kind ",
+                               ToString(value.kind()));
+      }
     }
     if (arrays.empty()) {
+      arrays.resize(1);
       ARROW_ASSIGN_OR_RAISE(
-          out_arrays[i],
-          MakeArrayOfNull(output_schema->field(static_cast<int>(i))->type(),
-                          /*length=*/0));
-    } else {
-      ARROW_ASSIGN_OR_RAISE(out_arrays[i], Concatenate(arrays));
+          arrays[0], MakeArrayOfNull(output_schema->field(static_cast<int>(i))->type(),
+                                     /*length=*/0));
     }
+    out_arrays[i] = {std::move(arrays)};
   }
 
-  // The exec plan may reorder the output rows.  The tests are all setup to expect ouptut
-  // in ascending order of keys.  So we need to sort the result by the key columns.  To do
-  // that we create a table using the key columns, calculate the sort indices from that
-  // table (sorting on all fields) and then use those indices to calculate our result.
-  std::vector<std::shared_ptr<Field>> key_fields;
-  std::vector<std::shared_ptr<Array>> key_columns;
-  std::vector<SortKey> sort_keys;
-  for (std::size_t i = 0; i < key_names.size(); i++) {
-    const std::shared_ptr<Array>& arr = out_arrays[i + aggregates.size()];
-    if (arr->type_id() == Type::DICTIONARY) {
-      // Can't sort dictionary columns so need to decode
-      auto dict_arr = checked_pointer_cast<DictionaryArray>(arr);
-      ARROW_ASSIGN_OR_RAISE(auto decoded_arr,
-                            Take(*dict_arr->dictionary(), *dict_arr->indices()));
-      key_columns.push_back(decoded_arr);
-      key_fields.push_back(
-          field("name_does_not_matter", dict_arr->dict_type()->value_type()));
-    } else {
-      key_columns.push_back(arr);
-      key_fields.push_back(field("name_does_not_matter", arr->type()));
+  if (segmented && segment_key_names.size() > 0) {
+    ArrayVector struct_arrays;
+    struct_arrays.reserve(output_batches.size());
+    for (size_t j = 0; j < output_batches.size(); ++j) {
+      ArrayVector struct_fields;
+      struct_fields.reserve(out_arrays.size());
+      for (auto out_array : out_arrays) {
+        struct_fields.push_back(out_array[j]);
+      }
+      ARROW_ASSIGN_OR_RAISE(auto struct_array,
+                            StructArray::Make(struct_fields, output_schema->fields()));
+      struct_arrays.push_back(struct_array);
     }
-    sort_keys.emplace_back(static_cast<int>(i));
+    return ChunkedArray::Make(struct_arrays);
+  } else {
+    ArrayVector struct_fields(out_arrays.size());
+    for (size_t i = 0; i < out_arrays.size(); ++i) {
+      ARROW_ASSIGN_OR_RAISE(struct_fields[i], Concatenate(out_arrays[i]));
+    }
+    return StructArray::Make(std::move(struct_fields), output_schema->fields());
   }
-  std::shared_ptr<Schema> key_schema = schema(std::move(key_fields));
-  std::shared_ptr<Table> key_table = Table::Make(std::move(key_schema), key_columns);
-  SortOptions sort_options(std::move(sort_keys));
-  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Array> sort_indices,
-                        SortIndices(key_table, sort_options));
+}
 
-  ARROW_ASSIGN_OR_RAISE(
-      std::shared_ptr<Array> struct_arr,
-      StructArray::Make(std::move(out_arrays), output_schema->fields()));
+Result<Datum> RunGroupBy(const BatchesWithSchema& input,
+                         const std::vector<std::string>& key_names,
+                         const std::vector<std::string>& segment_key_names,
+                         const std::vector<Aggregate>& aggregates, bool use_threads,
+                         bool segmented = false, bool naive = false) {
+  if (segment_key_names.size() > 0) {
+    ARROW_ASSIGN_OR_RAISE(auto thread_pool, arrow::internal::ThreadPool::Make(1));
+    ExecContext seq_ctx(default_memory_pool(), thread_pool.get());
+    return RunGroupBy(input, key_names, segment_key_names, aggregates, &seq_ctx,
+                      use_threads, segmented, naive);
+  } else {
+    return RunGroupBy(input, key_names, segment_key_names, aggregates,
+                      threaded_exec_context(), use_threads, segmented, naive);
+  }
+}
 
-  return Take(struct_arr, sort_indices);
+Result<Datum> RunGroupBy(const BatchesWithSchema& input,
+                         const std::vector<std::string>& key_names,
+                         const std::vector<Aggregate>& aggregates, bool use_threads,
+                         bool segmented = false, bool naive = false) {
+  return RunGroupBy(input, key_names, {}, aggregates, use_threads, segmented);
 }
 
 /// Simpler overload where you can give the columns as datums
 Result<Datum> RunGroupBy(const std::vector<Datum>& arguments,
                          const std::vector<Datum>& keys,
-                         const std::vector<Aggregate>& aggregates,
-                         bool use_threads = false) {
+                         const std::vector<Datum>& segment_keys,
+                         const std::vector<Aggregate>& aggregates, bool use_threads,
+                         bool segmented = false, bool naive = false) {
   using arrow::compute::detail::ExecSpanIterator;
 
-  FieldVector scan_fields(arguments.size() + keys.size());
+  FieldVector scan_fields(arguments.size() + keys.size() + segment_keys.size());
   std::vector<std::string> key_names(keys.size());
+  std::vector<std::string> segment_key_names(segment_keys.size());
   for (size_t i = 0; i < arguments.size(); ++i) {
     auto name = std::string("agg_") + ToChars(i);
     scan_fields[i] = field(name, arguments[i].type());
   }
+  size_t base = arguments.size();
   for (size_t i = 0; i < keys.size(); ++i) {
     auto name = std::string("key_") + ToChars(i);
-    scan_fields[arguments.size() + i] = field(name, keys[i].type());
+    scan_fields[base + i] = field(name, keys[i].type());
     key_names[i] = std::move(name);
   }
+  base += keys.size();
+  size_t j = segmented ? keys.size() : keys.size();
+  std::string prefix(segmented ? "key_" : "key_");

Review Comment:
   Both branches of this ternary produce `"key_"`, so the condition has no effect. Should the segmented case use a different prefix?



##########
cpp/src/arrow/compute/row/grouper.h:
##########
@@ -30,6 +30,49 @@
 namespace arrow {
 namespace compute {
 
+/// \brief A segment of contiguous rows for grouping
+struct ARROW_EXPORT GroupingSegment {
+  int64_t offset;
+  int64_t length;
+  bool is_open;
+  bool extends;
+};
+
+inline bool operator==(const GroupingSegment& segment1, const GroupingSegment& segment2) {
+  return segment1.offset == segment2.offset && segment1.length == segment2.length &&
+         segment1.is_open == segment2.is_open && segment1.extends == segment2.extends;
+}
+inline bool operator!=(const GroupingSegment& segment1, const GroupingSegment& segment2) {
+  return !(segment1 == segment2);
+}
+
+/// \brief Computes grouping segments for a batch. Each segment covers rows with identical
+/// values in the batch. The values in the batch are often selected as keys from a larger
+/// batch.

Review Comment:
   ```suggestion
   /// \brief A helper class to divide a batch into segments of equal values
   ///
   /// For example, given a batch with two key columns and five rows:
   ///
   /// A A
   /// A A
   /// A B
   /// A B
   /// A B
   ///
   /// Then the batch could be divided into two segments.  The first would be rows 0 & 1
   /// and the second would be rows 2, 3, & 4.
   ///
   /// In addition, a segmenter keeps track of the last value seen.  This allows it to
   /// calculate segments which span batches.  In the above example the last segment we
   /// emit would be "open", which indicates that the segment may or may not extend into
   /// the next batch.
   ///
   /// If the next call to the segmenter starts with `A B` then that segment would be
   /// marked with `extends = true` so that the caller knows it is a continuation of the
   /// last open segment.
   ```
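   To make the intended use concrete, a minimal usage sketch (hypothetical; an `ExecContext* ctx` and a `batch` holding a single int32 segment-key column are assumed):
   ```cpp
   ARROW_ASSIGN_OR_RAISE(auto segmenter,
                         GroupingSegmenter::Make(/*key_types=*/{int32()},
                                                 /*nullable_keys=*/false, ctx));
   int64_t offset = 0;
   while (offset < batch.length) {
     ARROW_ASSIGN_OR_RAISE(GroupingSegment segment,
                           segmenter->GetNextSegment(batch, offset));
     // segment.extends: this run continues the previous batch's open segment.
     // segment.is_open: this run may continue into the next batch.
     offset = segment.offset + segment.length;
   }
   ```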



##########
cpp/src/arrow/compute/row/grouper.cc:
##########
@@ -39,12 +43,521 @@
 namespace arrow {
 
 using internal::checked_cast;
+using internal::PrimitiveScalarBase;
 
 namespace compute {
 
 namespace {
 
-struct GrouperImpl : Grouper {
+constexpr uint32_t kNoGroupId = std::numeric_limits<uint32_t>::max();
+
+using group_id_t = std::remove_const<decltype(kNoGroupId)>::type;
+using GroupIdType = CTypeTraits<group_id_t>::ArrowType;
+auto group_id_type = std::make_shared<GroupIdType>();
+
+inline const uint8_t* GetValuesAsBytes(const ArrayData& data, int64_t offset = 0) {
+  DCHECK_GT(data.type->byte_width(), 0);
+  int64_t absolute_byte_offset = (data.offset + offset) * data.type->byte_width();
+  return data.GetValues<uint8_t>(1, absolute_byte_offset);
+}
+
+inline const uint8_t* GetValuesAsBytes(const ArraySpan& data, int64_t offset = 0) {
+  DCHECK_GT(data.type->byte_width(), 0);
+  int64_t absolute_byte_offset = (data.offset + offset) * data.type->byte_width();
+  return data.GetValues<uint8_t>(1, absolute_byte_offset);
+}
+
+template <typename Value>
+Status CheckForGetNextSegment(const std::vector<Value>& values, int64_t length,
+                              int64_t offset, const std::vector<TypeHolder>& key_types) {
+  if (offset < 0 || offset > length) {
+    return Status::Invalid("invalid grouping segmenter offset: ", offset);
+  }
+  if (values.size() != key_types.size()) {
+    return Status::Invalid("expected batch size ", key_types.size(), " but got ",
+                           values.size());
+  }
+  for (size_t i = 0; i < key_types.size(); i++) {
+    const auto& value = values[i];
+    const auto& key_type = key_types[i];
+    if (*value.type() != *key_type.type) {
+      return Status::Invalid("expected batch value ", i, " of type ", *key_type.type,
+                             " but got ", *value.type());
+    }
+  }
+  return Status::OK();
+}
+
+template <typename Batch>
+enable_if_t<std::is_same<Batch, ExecSpan>::value || std::is_same<Batch, ExecBatch>::value,
+            Status>
+CheckForGetNextSegment(const Batch& batch, int64_t offset,
+                       const std::vector<TypeHolder>& key_types) {
+  return CheckForGetNextSegment(batch.values, batch.length, offset, key_types);
+}
+
+struct BaseGroupingSegmenter : public GroupingSegmenter {
+  explicit BaseGroupingSegmenter(const std::vector<TypeHolder>& key_types)
+      : key_types_(key_types) {}
+
+  const std::vector<TypeHolder>& key_types() const override { return key_types_; }
+
+  std::vector<TypeHolder> key_types_;
+};
+
+GroupingSegment MakeSegment(int64_t batch_length, int64_t offset, int64_t length,
+                            bool extends) {
+  return GroupingSegment{offset, length, offset + length >= batch_length, extends};
+}
+
+int64_t GetMatchLength(const uint8_t* match_bytes, int64_t match_width,
+                       const uint8_t* array_bytes, int64_t offset, int64_t length) {
+  int64_t cursor, byte_cursor;
+  for (cursor = offset, byte_cursor = match_width * cursor; cursor < length;
+       cursor++, byte_cursor += match_width) {
+    if (memcmp(match_bytes, array_bytes + byte_cursor,
+               static_cast<size_t>(match_width)) != 0) {
+      break;
+    }
+  }
+  return std::min(cursor, length - offset);
+}
+
+using ExtendFunc = std::function<bool(const void*)>;
+constexpr bool kDefaultExtends = true;
+constexpr bool kEmptyExtends = true;
+
+Result<GroupingSegment> GetNextSegmentChunked(
+    const std::shared_ptr<ChunkedArray>& chunked_array, int64_t offset,
+    ExtendFunc extend) {
+  if (offset >= chunked_array->length()) {
+    return MakeSegment(chunked_array->length(), chunked_array->length(), 0,
+                       kEmptyExtends);
+  }
+  int64_t remaining_offset = offset;
+  const auto& arrays = chunked_array->chunks();
+  for (size_t i = 0; remaining_offset >= 0 && i < arrays.size(); i++) {
+    // look up chunk containing offset
+    int64_t array_length = arrays[i]->length();
+    if (remaining_offset < array_length) {
+      // found - switch to matching
+      int64_t match_width = arrays[i]->type()->byte_width();
+      const uint8_t* match_bytes = GetValuesAsBytes(*arrays[i]->data(), remaining_offset);
+      int64_t total_match_length = 0;
+      for (; i < arrays.size(); i++) {
+        int64_t array_length = arrays[i]->length();
+        if (array_length <= 0) continue;
+        const uint8_t* array_bytes = GetValuesAsBytes(*arrays[i]->data());
+        int64_t match_length = GetMatchLength(match_bytes, match_width, array_bytes,
+                                              remaining_offset, array_length);
+        total_match_length += match_length;
+        remaining_offset = 0;
+        if (match_length < array_length - remaining_offset) break;
+      }
+      bool extends = extend(match_bytes);
+      return MakeSegment(chunked_array->length(), offset, total_match_length, extends);
+    }
+    remaining_offset -= array_length;
+  }
+  return Status::Invalid("segmenting invalid chunked array value");
+}

Review Comment:
   ```suggestion
   ```
   
   Now that #14867 has merged I think we can safely get rid of this.



##########
cpp/src/arrow/compute/row/grouper.cc:
##########
@@ -39,12 +43,521 @@
 namespace arrow {
 
 using internal::checked_cast;
+using internal::PrimitiveScalarBase;
 
 namespace compute {
 
 namespace {
 
-struct GrouperImpl : Grouper {
+constexpr uint32_t kNoGroupId = std::numeric_limits<uint32_t>::max();
+
+using group_id_t = std::remove_const<decltype(kNoGroupId)>::type;
+using GroupIdType = CTypeTraits<group_id_t>::ArrowType;
+auto group_id_type = std::make_shared<GroupIdType>();
+
+inline const uint8_t* GetValuesAsBytes(const ArrayData& data, int64_t offset = 0) {
+  DCHECK_GT(data.type->byte_width(), 0);
+  int64_t absolute_byte_offset = (data.offset + offset) * data.type->byte_width();
+  return data.GetValues<uint8_t>(1, absolute_byte_offset);
+}
+
+inline const uint8_t* GetValuesAsBytes(const ArraySpan& data, int64_t offset = 0) {
+  DCHECK_GT(data.type->byte_width(), 0);
+  int64_t absolute_byte_offset = (data.offset + offset) * data.type->byte_width();
+  return data.GetValues<uint8_t>(1, absolute_byte_offset);
+}
+
+template <typename Value>
+Status CheckForGetNextSegment(const std::vector<Value>& values, int64_t length,
+                              int64_t offset, const std::vector<TypeHolder>& key_types) {
+  if (offset < 0 || offset > length) {
+    return Status::Invalid("invalid grouping segmenter offset: ", offset);
+  }
+  if (values.size() != key_types.size()) {
+    return Status::Invalid("expected batch size ", key_types.size(), " but got ",
+                           values.size());
+  }
+  for (size_t i = 0; i < key_types.size(); i++) {
+    const auto& value = values[i];
+    const auto& key_type = key_types[i];
+    if (*value.type() != *key_type.type) {
+      return Status::Invalid("expected batch value ", i, " of type ", *key_type.type,
+                             " but got ", *value.type());
+    }
+  }
+  return Status::OK();
+}
+
+template <typename Batch>
+enable_if_t<std::is_same<Batch, ExecSpan>::value || std::is_same<Batch, ExecBatch>::value,
+            Status>
+CheckForGetNextSegment(const Batch& batch, int64_t offset,
+                       const std::vector<TypeHolder>& key_types) {
+  return CheckForGetNextSegment(batch.values, batch.length, offset, key_types);
+}
+
+struct BaseGroupingSegmenter : public GroupingSegmenter {
+  explicit BaseGroupingSegmenter(const std::vector<TypeHolder>& key_types)
+      : key_types_(key_types) {}
+
+  const std::vector<TypeHolder>& key_types() const override { return key_types_; }
+
+  std::vector<TypeHolder> key_types_;
+};
+
+GroupingSegment MakeSegment(int64_t batch_length, int64_t offset, int64_t length,
+                            bool extends) {
+  return GroupingSegment{offset, length, offset + length >= batch_length, extends};
+}
+
+int64_t GetMatchLength(const uint8_t* match_bytes, int64_t match_width,
+                       const uint8_t* array_bytes, int64_t offset, int64_t length) {
+  int64_t cursor, byte_cursor;
+  for (cursor = offset, byte_cursor = match_width * cursor; cursor < length;
+       cursor++, byte_cursor += match_width) {
+    if (memcmp(match_bytes, array_bytes + byte_cursor,
+               static_cast<size_t>(match_width)) != 0) {
+      break;
+    }
+  }
+  return std::min(cursor, length - offset);
+}
+
+using ExtendFunc = std::function<bool(const void*)>;
+constexpr bool kDefaultExtends = true;
+constexpr bool kEmptyExtends = true;
+
+Result<GroupingSegment> GetNextSegmentChunked(
+    const std::shared_ptr<ChunkedArray>& chunked_array, int64_t offset,
+    ExtendFunc extend) {
+  if (offset >= chunked_array->length()) {
+    return MakeSegment(chunked_array->length(), chunked_array->length(), 0,
+                       kEmptyExtends);
+  }
+  int64_t remaining_offset = offset;
+  const auto& arrays = chunked_array->chunks();
+  for (size_t i = 0; remaining_offset >= 0 && i < arrays.size(); i++) {
+    // look up chunk containing offset
+    int64_t array_length = arrays[i]->length();
+    if (remaining_offset < array_length) {
+      // found - switch to matching
+      int64_t match_width = arrays[i]->type()->byte_width();
+      const uint8_t* match_bytes = GetValuesAsBytes(*arrays[i]->data(), remaining_offset);
+      int64_t total_match_length = 0;
+      for (; i < arrays.size(); i++) {
+        int64_t array_length = arrays[i]->length();
+        if (array_length <= 0) continue;
+        const uint8_t* array_bytes = GetValuesAsBytes(*arrays[i]->data());
+        int64_t match_length = GetMatchLength(match_bytes, match_width, array_bytes,
+                                              remaining_offset, array_length);
+        total_match_length += match_length;
+        remaining_offset = 0;
+        if (match_length < array_length - remaining_offset) break;
+      }
+      bool extends = extend(match_bytes);
+      return MakeSegment(chunked_array->length(), offset, total_match_length, extends);
+    }
+    remaining_offset -= array_length;
+  }
+  return Status::Invalid("segmenting invalid chunked array value");
+}
+
+struct NoKeysGroupingSegmenter : public BaseGroupingSegmenter {
+  static std::unique_ptr<GroupingSegmenter> Make() {
+    return std::make_unique<NoKeysGroupingSegmenter>();
+  }
+
+  NoKeysGroupingSegmenter() : BaseGroupingSegmenter({}) {}
+
+  Status Reset() override { return Status::OK(); }
+
+  template <typename Batch>
+  Result<GroupingSegment> GetNextSegmentImpl(const Batch& batch, int64_t offset) {
+    ARROW_RETURN_NOT_OK(CheckForGetNextSegment(batch, offset, {}));
+    return MakeSegment(batch.length, offset, batch.length - offset, kDefaultExtends);
+  }
+
+  Result<GroupingSegment> GetNextSegment(const ExecSpan& batch, int64_t offset) override {
+    return GetNextSegmentImpl(batch, offset);
+  }
+
+  Result<GroupingSegment> GetNextSegment(const ExecBatch& batch,
+                                         int64_t offset) override {
+    return GetNextSegmentImpl(batch, offset);
+  }
+};
+
+struct SimpleKeyGroupingSegmenter : public BaseGroupingSegmenter {
+  static Result<std::unique_ptr<GroupingSegmenter>> Make(TypeHolder key_type) {
+    return std::make_unique<SimpleKeyGroupingSegmenter>(key_type);
+  }
+
+  explicit SimpleKeyGroupingSegmenter(TypeHolder key_type)
+      : BaseGroupingSegmenter({key_type}), key_type_(key_types_[0]), save_key_data_() {}
+
+  Status CheckType(const DataType& type) {
+    if (!is_fixed_width(type)) {
+      return Status::Invalid("SimpleKeyGroupingSegmenter does not support type ", type);
+    }
+    return Status::OK();
+  }
+
+  Status Reset() override {
+    save_key_data_.resize(0);
+    return Status::OK();
+  }
+
+  bool Extend(const void* data) {
+    size_t byte_width = static_cast<size_t>(key_type_.type->byte_width());
+    bool extends = save_key_data_.size() != byte_width
+                       ? kDefaultExtends
+                       : 0 == memcmp(save_key_data_.data(), data, byte_width);
+    save_key_data_.resize(byte_width);
+    memcpy(save_key_data_.data(), data, byte_width);
+    return extends;
+  }
+
+  Result<GroupingSegment> GetNextSegment(const Scalar& scalar, int64_t offset,
+                                         int64_t length) {
+    ARROW_RETURN_NOT_OK(CheckType(*scalar.type));
+    if (!scalar.is_valid) {
+      return Status::Invalid("segmenting an invalid scalar");
+    }
+    auto data = checked_cast<const PrimitiveScalarBase&>(scalar).data();
+    bool extends = length > 0 ? Extend(data) : kEmptyExtends;
+    return MakeSegment(length, 0, length, extends);
+  }
+
+  Result<GroupingSegment> GetNextSegment(const DataType& array_type,
+                                         const uint8_t* array_bytes, int64_t offset,
+                                         int64_t length) {
+    RETURN_NOT_OK(CheckType(array_type));
+    int64_t byte_width = array_type.byte_width();
+    int64_t match_length = GetMatchLength(array_bytes + offset * byte_width, byte_width,
+                                          array_bytes, offset, length);
+    bool extends = length > 0 ? Extend(array_bytes + offset * byte_width) : kEmptyExtends;
+    return MakeSegment(length, offset, match_length, extends);
+  }
+
+  Result<GroupingSegment> GetNextSegment(const ExecSpan& batch, int64_t offset) override {
+    ARROW_RETURN_NOT_OK(CheckForGetNextSegment(batch, offset, {key_type_}));
+    if (offset == batch.length) {
+      return MakeSegment(batch.length, offset, 0, kEmptyExtends);
+    }
+    const auto& value = batch.values[0];
+    if (value.is_scalar()) {
+      return GetNextSegment(*value.scalar, offset, batch.length);
+    }
+    ARROW_DCHECK(value.is_array());
+    const auto& array = value.array;
+    if (array.GetNullCount() > 0) {
+      return Status::NotImplemented("segmenting a nullable array");
+    }
+    return GetNextSegment(*array.type, GetValuesAsBytes(array), offset, batch.length);
+  }
+
+  Result<GroupingSegment> GetNextSegment(const ExecBatch& batch,
+                                         int64_t offset) override {
+    ARROW_RETURN_NOT_OK(CheckForGetNextSegment(batch, offset, {key_type_}));
+    if (offset == batch.length) {
+      return MakeSegment(batch.length, offset, 0, kEmptyExtends);
+    }
+    const auto& value = batch.values[0];
+    if (value.is_scalar()) {
+      return GetNextSegment(*value.scalar(), offset, batch.length);
+    }
+    if (value.is_array()) {
+      auto array = value.array();
+      if (array->GetNullCount() > 0) {
+        return Status::NotImplemented("segmenting a nullable array");
+      }
+      return GetNextSegment(*array->type, GetValuesAsBytes(*array), offset, batch.length);
+    }
+    if (value.is_chunked_array()) {
+      auto array = value.chunked_array();
+      if (array->null_count() > 0) {
+        return Status::NotImplemented("segmenting a nullable array");
+      }
+      return GetNextSegmentChunked(array, offset, bound_extend_);
+    }
+    return Status::Invalid("segmenting unsupported value kind ", value.kind());
+  }
+
+ private:
+  TypeHolder key_type_;
+  std::vector<uint8_t> save_key_data_;
+  ExtendFunc bound_extend_ = [this](const void* data) { return Extend(data); };
+};
+
+struct AnyKeysGroupingSegmenter : public BaseGroupingSegmenter {
+  static Result<std::unique_ptr<GroupingSegmenter>> Make(
+      const std::vector<TypeHolder>& key_types, ExecContext* ctx) {
+    ARROW_RETURN_NOT_OK(Grouper::Make(key_types, ctx));  // check types
+    return std::make_unique<AnyKeysGroupingSegmenter>(key_types, ctx);
+  }
+
+  AnyKeysGroupingSegmenter(const std::vector<TypeHolder>& key_types, ExecContext* ctx)
+      : BaseGroupingSegmenter(key_types),
+        ctx_(ctx),
+        grouper_(nullptr),
+        save_group_id_(kNoGroupId) {}
+
+  Status Reset() override {
+    grouper_ = nullptr;
+    save_group_id_ = kNoGroupId;
+    return Status::OK();
+  }
+
+  bool Extend(const void* data) {
+    auto group_id = *static_cast<const group_id_t*>(data);
+    bool extends =
+        save_group_id_ == kNoGroupId ? kDefaultExtends : save_group_id_ == group_id;
+    save_group_id_ = group_id;
+    return extends;
+  }
+
+  template <typename Batch>
+  Result<group_id_t> MapGroupIdAt(const Batch& batch, int64_t offset) {
+    if (offset < 0 || offset >= batch.length) {
+      return Status::Invalid("requesting group id out of bounds");
+    }

Review Comment:
   Isn't this already checked by the call to `CheckForGetNextSegment` in `GetNextSegmentImpl`?



##########
cpp/src/arrow/compute/row/grouper.cc:
##########
@@ -39,12 +43,521 @@
 namespace arrow {
 
 using internal::checked_cast;
+using internal::PrimitiveScalarBase;
 
 namespace compute {
 
 namespace {
 
-struct GrouperImpl : Grouper {
+constexpr uint32_t kNoGroupId = std::numeric_limits<uint32_t>::max();
+
+using group_id_t = std::remove_const<decltype(kNoGroupId)>::type;
+using GroupIdType = CTypeTraits<group_id_t>::ArrowType;
+auto group_id_type = std::make_shared<GroupIdType>();
+
+inline const uint8_t* GetValuesAsBytes(const ArrayData& data, int64_t offset = 0) {
+  DCHECK_GT(data.type->byte_width(), 0);
+  int64_t absolute_byte_offset = (data.offset + offset) * data.type->byte_width();
+  return data.GetValues<uint8_t>(1, absolute_byte_offset);
+}
+
+inline const uint8_t* GetValuesAsBytes(const ArraySpan& data, int64_t offset = 0) {
+  DCHECK_GT(data.type->byte_width(), 0);
+  int64_t absolute_byte_offset = (data.offset + offset) * data.type->byte_width();
+  return data.GetValues<uint8_t>(1, absolute_byte_offset);
+}
+
+template <typename Value>
+Status CheckForGetNextSegment(const std::vector<Value>& values, int64_t length,
+                              int64_t offset, const std::vector<TypeHolder>& key_types) {
+  if (offset < 0 || offset > length) {
+    return Status::Invalid("invalid grouping segmenter offset: ", offset);
+  }
+  if (values.size() != key_types.size()) {
+    return Status::Invalid("expected batch size ", key_types.size(), " but got ",
+                           values.size());
+  }
+  for (size_t i = 0; i < key_types.size(); i++) {
+    const auto& value = values[i];
+    const auto& key_type = key_types[i];
+    if (*value.type() != *key_type.type) {
+      return Status::Invalid("expected batch value ", i, " of type ", *key_type.type,
+                             " but got ", *value.type());
+    }
+  }
+  return Status::OK();
+}
+
+template <typename Batch>
+enable_if_t<std::is_same<Batch, ExecSpan>::value || std::is_same<Batch, ExecBatch>::value,
+            Status>
+CheckForGetNextSegment(const Batch& batch, int64_t offset,
+                       const std::vector<TypeHolder>& key_types) {
+  return CheckForGetNextSegment(batch.values, batch.length, offset, key_types);
+}
+
+struct BaseGroupingSegmenter : public GroupingSegmenter {
+  explicit BaseGroupingSegmenter(const std::vector<TypeHolder>& key_types)
+      : key_types_(key_types) {}
+
+  const std::vector<TypeHolder>& key_types() const override { return key_types_; }
+
+  std::vector<TypeHolder> key_types_;
+};
+
+GroupingSegment MakeSegment(int64_t batch_length, int64_t offset, int64_t length,
+                            bool extends) {
+  return GroupingSegment{offset, length, offset + length >= batch_length, extends};
+}
+
+int64_t GetMatchLength(const uint8_t* match_bytes, int64_t match_width,
+                       const uint8_t* array_bytes, int64_t offset, int64_t length) {
+  int64_t cursor, byte_cursor;
+  for (cursor = offset, byte_cursor = match_width * cursor; cursor < length;
+       cursor++, byte_cursor += match_width) {
+    if (memcmp(match_bytes, array_bytes + byte_cursor,
+               static_cast<size_t>(match_width)) != 0) {
+      break;
+    }
+  }
+  return std::min(cursor, length - offset);
+}
+
+using ExtendFunc = std::function<bool(const void*)>;
+constexpr bool kDefaultExtends = true;
+constexpr bool kEmptyExtends = true;
+
+Result<GroupingSegment> GetNextSegmentChunked(
+    const std::shared_ptr<ChunkedArray>& chunked_array, int64_t offset,
+    ExtendFunc extend) {
+  if (offset >= chunked_array->length()) {
+    return MakeSegment(chunked_array->length(), chunked_array->length(), 0,
+                       kEmptyExtends);
+  }
+  int64_t remaining_offset = offset;
+  const auto& arrays = chunked_array->chunks();
+  for (size_t i = 0; remaining_offset >= 0 && i < arrays.size(); i++) {
+    // look up chunk containing offset
+    int64_t array_length = arrays[i]->length();
+    if (remaining_offset < array_length) {
+      // found - switch to matching
+      int64_t match_width = arrays[i]->type()->byte_width();
+      const uint8_t* match_bytes = GetValuesAsBytes(*arrays[i]->data(), remaining_offset);
+      int64_t total_match_length = 0;
+      for (; i < arrays.size(); i++) {
+        int64_t array_length = arrays[i]->length();
+        if (array_length <= 0) continue;
+        const uint8_t* array_bytes = GetValuesAsBytes(*arrays[i]->data());
+        int64_t match_length = GetMatchLength(match_bytes, match_width, array_bytes,
+                                              remaining_offset, array_length);
+        total_match_length += match_length;
+        remaining_offset = 0;
+        if (match_length < array_length - remaining_offset) break;
+      }
+      bool extends = extend(match_bytes);
+      return MakeSegment(chunked_array->length(), offset, total_match_length, extends);
+    }
+    remaining_offset -= array_length;
+  }
+  return Status::Invalid("segmenting invalid chunked array value");
+}
+
+struct NoKeysGroupingSegmenter : public BaseGroupingSegmenter {
+  static std::unique_ptr<GroupingSegmenter> Make() {
+    return std::make_unique<NoKeysGroupingSegmenter>();
+  }
+
+  NoKeysGroupingSegmenter() : BaseGroupingSegmenter({}) {}
+
+  Status Reset() override { return Status::OK(); }
+
+  template <typename Batch>
+  Result<GroupingSegment> GetNextSegmentImpl(const Batch& batch, int64_t offset) {
+    ARROW_RETURN_NOT_OK(CheckForGetNextSegment(batch, offset, {}));
+    return MakeSegment(batch.length, offset, batch.length - offset, kDefaultExtends);
+  }
+
+  Result<GroupingSegment> GetNextSegment(const ExecSpan& batch, int64_t offset) override {
+    return GetNextSegmentImpl(batch, offset);
+  }
+
+  Result<GroupingSegment> GetNextSegment(const ExecBatch& batch,
+                                         int64_t offset) override {
+    return GetNextSegmentImpl(batch, offset);
+  }
+};
+
+struct SimpleKeyGroupingSegmenter : public BaseGroupingSegmenter {
+  static Result<std::unique_ptr<GroupingSegmenter>> Make(TypeHolder key_type) {
+    return std::make_unique<SimpleKeyGroupingSegmenter>(key_type);
+  }
+
+  explicit SimpleKeyGroupingSegmenter(TypeHolder key_type)
+      : BaseGroupingSegmenter({key_type}), key_type_(key_types_[0]), save_key_data_() {}
+
+  Status CheckType(const DataType& type) {
+    if (!is_fixed_width(type)) {
+      return Status::Invalid("SimpleKeyGroupingSegmenter does not support type ", type);
+    }
+    return Status::OK();
+  }
+
+  Status Reset() override {
+    save_key_data_.resize(0);
+    return Status::OK();
+  }
+
+  bool Extend(const void* data) {
+    size_t byte_width = static_cast<size_t>(key_type_.type->byte_width());
+    bool extends = save_key_data_.size() != byte_width
+                       ? kDefaultExtends
+                       : 0 == memcmp(save_key_data_.data(), data, byte_width);
+    save_key_data_.resize(byte_width);
+    memcpy(save_key_data_.data(), data, byte_width);
+    return extends;
+  }
+
+  Result<GroupingSegment> GetNextSegment(const Scalar& scalar, int64_t offset,
+                                         int64_t length) {
+    ARROW_RETURN_NOT_OK(CheckType(*scalar.type));
+    if (!scalar.is_valid) {
+      return Status::Invalid("segmenting an invalid scalar");
+    }
+    auto data = checked_cast<const PrimitiveScalarBase&>(scalar).data();
+    bool extends = length > 0 ? Extend(data) : kEmptyExtends;
+    return MakeSegment(length, 0, length, extends);
+  }
+
+  Result<GroupingSegment> GetNextSegment(const DataType& array_type,
+                                         const uint8_t* array_bytes, int64_t offset,
+                                         int64_t length) {
+    RETURN_NOT_OK(CheckType(array_type));
+    int64_t byte_width = array_type.byte_width();
+    int64_t match_length = GetMatchLength(array_bytes + offset * byte_width, byte_width,
+                                          array_bytes, offset, length);
+    bool extends = length > 0 ? Extend(array_bytes + offset * byte_width) : kEmptyExtends;
+    return MakeSegment(length, offset, match_length, extends);
+  }
+
+  Result<GroupingSegment> GetNextSegment(const ExecSpan& batch, int64_t offset) override {
+    ARROW_RETURN_NOT_OK(CheckForGetNextSegment(batch, offset, {key_type_}));
+    if (offset == batch.length) {
+      return MakeSegment(batch.length, offset, 0, kEmptyExtends);
+    }
+    const auto& value = batch.values[0];
+    if (value.is_scalar()) {
+      return GetNextSegment(*value.scalar, offset, batch.length);
+    }
+    ARROW_DCHECK(value.is_array());
+    const auto& array = value.array;
+    if (array.GetNullCount() > 0) {
+      return Status::NotImplemented("segmenting a nullable array");
+    }
+    return GetNextSegment(*array.type, GetValuesAsBytes(array), offset, batch.length);
+  }
+
+  Result<GroupingSegment> GetNextSegment(const ExecBatch& batch,
+                                         int64_t offset) override {
+    ARROW_RETURN_NOT_OK(CheckForGetNextSegment(batch, offset, {key_type_}));
+    if (offset == batch.length) {
+      return MakeSegment(batch.length, offset, 0, kEmptyExtends);
+    }
+    const auto& value = batch.values[0];
+    if (value.is_scalar()) {
+      return GetNextSegment(*value.scalar(), offset, batch.length);
+    }
+    if (value.is_array()) {
+      auto array = value.array();
+      if (array->GetNullCount() > 0) {
+        return Status::NotImplemented("segmenting a nullable array");
+      }
+      return GetNextSegment(*array->type, GetValuesAsBytes(*array), offset, batch.length);
+    }
+    if (value.is_chunked_array()) {
+      auto array = value.chunked_array();
+      if (array->null_count() > 0) {
+        return Status::NotImplemented("segmenting a nullable array");
+      }
+      return GetNextSegmentChunked(array, offset, bound_extend_);
+    }
+    return Status::Invalid("segmenting unsupported value kind ", value.kind());
+  }
+
+ private:
+  TypeHolder key_type_;
+  std::vector<uint8_t> save_key_data_;
+  ExtendFunc bound_extend_ = [this](const void* data) { return Extend(data); };
+};
+
+struct AnyKeysGroupingSegmenter : public BaseGroupingSegmenter {
+  static Result<std::unique_ptr<GroupingSegmenter>> Make(
+      const std::vector<TypeHolder>& key_types, ExecContext* ctx) {
+    ARROW_RETURN_NOT_OK(Grouper::Make(key_types, ctx));  // check types
+    return std::make_unique<AnyKeysGroupingSegmenter>(key_types, ctx);
+  }
+
+  AnyKeysGroupingSegmenter(const std::vector<TypeHolder>& key_types, ExecContext* ctx)
+      : BaseGroupingSegmenter(key_types),
+        ctx_(ctx),
+        grouper_(nullptr),
+        save_group_id_(kNoGroupId) {}
+
+  Status Reset() override {
+    grouper_ = nullptr;
+    save_group_id_ = kNoGroupId;
+    return Status::OK();
+  }
+
+  bool Extend(const void* data) {
+    auto group_id = *static_cast<const group_id_t*>(data);
+    bool extends =
+        save_group_id_ == kNoGroupId ? kDefaultExtends : save_group_id_ == group_id;
+    save_group_id_ = group_id;
+    return extends;
+  }
+
+  template <typename Batch>
+  Result<group_id_t> MapGroupIdAt(const Batch& batch, int64_t offset) {
+    if (offset < 0 || offset >= batch.length) {
+      return Status::Invalid("requesting group id out of bounds");
+    }
+    if (!grouper_) return kNoGroupId;
+    ARROW_ASSIGN_OR_RAISE(auto datum, grouper_->Consume(batch, offset,
+                                                        /*consume_length=*/1));
+    if (!(datum.is_array() || datum.is_chunked_array())) {
+      return Status::Invalid("accessing unsupported datum kind ", datum.kind());
+    }
+    const std::shared_ptr<ArrayData>& data =
+        datum.is_array() ? datum.array() : datum.chunked_array()->chunk(0)->data();
+    ARROW_DCHECK(data->GetNullCount() == 0);
+    DCHECK_EQ(data->type->id(), GroupIdType::type_id);
+    DCHECK_EQ(1, data->length);
+    const group_id_t* values = data->GetValues<group_id_t>(1);
+    return values[0];
+  }
+
+  template <typename Batch>
+  Result<GroupingSegment> GetNextSegmentImpl(const Batch& batch, int64_t offset) {
+    ARROW_RETURN_NOT_OK(CheckForGetNextSegment(batch, offset, key_types_));
+    if (offset == batch.length) {
+      return MakeSegment(batch.length, offset, 0, kEmptyExtends);
+    }
+    // ARROW-18311: make Grouper support Reset()
+    // so it can be cached instead of recreated below
+    //
+    // the group id must be computed prior to resetting the grouper, since it is compared
+    // to save_group_id_, and after resetting the grouper produces incomparable group ids
+    ARROW_ASSIGN_OR_RAISE(auto group_id, MapGroupIdAt(batch, offset));
+    ExtendFunc bound_extend = [this, group_id](const void* data) {
+      bool extends = Extend(&group_id);
+      save_group_id_ = *static_cast<const group_id_t*>(data);
+      return extends;
+    };
+    ARROW_ASSIGN_OR_RAISE(grouper_, Grouper::Make(key_types_, ctx_));  // TODO: reset it

Review Comment:
   Do you expect a reset to be cheaper than just creating a new grouper?  I suppose it might be able to reuse some allocations.
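   
   For the record, what I would picture the cached version looking like (hypothetical -- `Grouper::Reset` does not exist yet, per ARROW-18311):
   
   ```cpp
   // Hypothetical sketch: reuse one cached grouper per segmenter instead of
   // recreating it for every segment.
   if (!grouper_) {
     ARROW_ASSIGN_OR_RAISE(grouper_, Grouper::Make(key_types_, ctx_));
   } else {
     // Would clear the accumulated group-id mapping while keeping the
     // grouper's internal allocations around for reuse.
     ARROW_RETURN_NOT_OK(grouper_->Reset());
   }
   ```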



##########
cpp/src/arrow/compute/row/grouper.cc:
##########
@@ -39,12 +43,521 @@
 namespace arrow {
 
 using internal::checked_cast;
+using internal::PrimitiveScalarBase;
 
 namespace compute {
 
 namespace {
 
-struct GrouperImpl : Grouper {
+constexpr uint32_t kNoGroupId = std::numeric_limits<uint32_t>::max();
+
+using group_id_t = std::remove_const<decltype(kNoGroupId)>::type;
+using GroupIdType = CTypeTraits<group_id_t>::ArrowType;
+auto group_id_type = std::make_shared<GroupIdType>();
+
+inline const uint8_t* GetValuesAsBytes(const ArrayData& data, int64_t offset = 0) {
+  DCHECK_GT(data.type->byte_width(), 0);
+  int64_t absolute_byte_offset = (data.offset + offset) * data.type->byte_width();
+  return data.GetValues<uint8_t>(1, absolute_byte_offset);
+}
+
+inline const uint8_t* GetValuesAsBytes(const ArraySpan& data, int64_t offset = 0) {
+  DCHECK_GT(data.type->byte_width(), 0);
+  int64_t absolute_byte_offset = (data.offset + offset) * data.type->byte_width();
+  return data.GetValues<uint8_t>(1, absolute_byte_offset);
+}
+
+template <typename Value>
+Status CheckForGetNextSegment(const std::vector<Value>& values, int64_t length,
+                              int64_t offset, const std::vector<TypeHolder>& key_types) {
+  if (offset < 0 || offset > length) {
+    return Status::Invalid("invalid grouping segmenter offset: ", offset);
+  }
+  if (values.size() != key_types.size()) {
+    return Status::Invalid("expected batch size ", key_types.size(), " but got ",
+                           values.size());
+  }
+  for (size_t i = 0; i < key_types.size(); i++) {
+    const auto& value = values[i];
+    const auto& key_type = key_types[i];
+    if (*value.type() != *key_type.type) {
+      return Status::Invalid("expected batch value ", i, " of type ", *key_type.type,
+                             " but got ", *value.type());
+    }
+  }
+  return Status::OK();
+}
+
+template <typename Batch>
+enable_if_t<std::is_same<Batch, ExecSpan>::value || std::is_same<Batch, ExecBatch>::value,
+            Status>
+CheckForGetNextSegment(const Batch& batch, int64_t offset,
+                       const std::vector<TypeHolder>& key_types) {
+  return CheckForGetNextSegment(batch.values, batch.length, offset, key_types);
+}
+
+struct BaseGroupingSegmenter : public GroupingSegmenter {
+  explicit BaseGroupingSegmenter(const std::vector<TypeHolder>& key_types)
+      : key_types_(key_types) {}
+
+  const std::vector<TypeHolder>& key_types() const override { return key_types_; }
+
+  std::vector<TypeHolder> key_types_;
+};
+
+GroupingSegment MakeSegment(int64_t batch_length, int64_t offset, int64_t length,
+                            bool extends) {
+  return GroupingSegment{offset, length, offset + length >= batch_length, extends};
+}
+
+int64_t GetMatchLength(const uint8_t* match_bytes, int64_t match_width,
+                       const uint8_t* array_bytes, int64_t offset, int64_t length) {
+  int64_t cursor, byte_cursor;
+  for (cursor = offset, byte_cursor = match_width * cursor; cursor < length;
+       cursor++, byte_cursor += match_width) {
+    if (memcmp(match_bytes, array_bytes + byte_cursor,
+               static_cast<size_t>(match_width)) != 0) {
+      break;
+    }
+  }
+  return std::min(cursor, length - offset);
+}
+
+using ExtendFunc = std::function<bool(const void*)>;
+constexpr bool kDefaultExtends = true;
+constexpr bool kEmptyExtends = true;
+
+Result<GroupingSegment> GetNextSegmentChunked(
+    const std::shared_ptr<ChunkedArray>& chunked_array, int64_t offset,
+    ExtendFunc extend) {
+  if (offset >= chunked_array->length()) {
+    return MakeSegment(chunked_array->length(), chunked_array->length(), 0,
+                       kEmptyExtends);
+  }
+  int64_t remaining_offset = offset;
+  const auto& arrays = chunked_array->chunks();
+  for (size_t i = 0; remaining_offset >= 0 && i < arrays.size(); i++) {
+    // look up chunk containing offset
+    int64_t array_length = arrays[i]->length();
+    if (remaining_offset < array_length) {
+      // found - switch to matching
+      int64_t match_width = arrays[i]->type()->byte_width();
+      const uint8_t* match_bytes = GetValuesAsBytes(*arrays[i]->data(), remaining_offset);
+      int64_t total_match_length = 0;
+      for (; i < arrays.size(); i++) {
+        int64_t array_length = arrays[i]->length();
+        if (array_length <= 0) continue;
+        const uint8_t* array_bytes = GetValuesAsBytes(*arrays[i]->data());
+        int64_t match_length = GetMatchLength(match_bytes, match_width, array_bytes,
+                                              remaining_offset, array_length);
+        total_match_length += match_length;
+        remaining_offset = 0;
+        if (match_length < array_length - remaining_offset) break;
+      }
+      bool extends = extend(match_bytes);
+      return MakeSegment(chunked_array->length(), offset, total_match_length, extends);
+    }
+    remaining_offset -= array_length;
+  }
+  return Status::Invalid("segmenting invalid chunked array value");
+}
+
+struct NoKeysGroupingSegmenter : public BaseGroupingSegmenter {
+  static std::unique_ptr<GroupingSegmenter> Make() {
+    return std::make_unique<NoKeysGroupingSegmenter>();
+  }
+
+  NoKeysGroupingSegmenter() : BaseGroupingSegmenter({}) {}
+
+  Status Reset() override { return Status::OK(); }
+
+  template <typename Batch>
+  Result<GroupingSegment> GetNextSegmentImpl(const Batch& batch, int64_t offset) {
+    ARROW_RETURN_NOT_OK(CheckForGetNextSegment(batch, offset, {}));
+    return MakeSegment(batch.length, offset, batch.length - offset, kDefaultExtends);
+  }
+
+  Result<GroupingSegment> GetNextSegment(const ExecSpan& batch, int64_t offset) override {
+    return GetNextSegmentImpl(batch, offset);
+  }
+
+  Result<GroupingSegment> GetNextSegment(const ExecBatch& batch,
+                                         int64_t offset) override {
+    return GetNextSegmentImpl(batch, offset);
+  }
+};
+
+struct SimpleKeyGroupingSegmenter : public BaseGroupingSegmenter {
+  static Result<std::unique_ptr<GroupingSegmenter>> Make(TypeHolder key_type) {
+    return std::make_unique<SimpleKeyGroupingSegmenter>(key_type);
+  }
+
+  explicit SimpleKeyGroupingSegmenter(TypeHolder key_type)
+      : BaseGroupingSegmenter({key_type}), key_type_(key_types_[0]), save_key_data_() {}
+
+  Status CheckType(const DataType& type) {
+    if (!is_fixed_width(type)) {
+      return Status::Invalid("SimpleKeyGroupingSegmenter does not support type ", type);
+    }
+    return Status::OK();
+  }
+
+  Status Reset() override {
+    save_key_data_.resize(0);
+    return Status::OK();
+  }
+
+  bool Extend(const void* data) {
+    size_t byte_width = static_cast<size_t>(key_type_.type->byte_width());
+    bool extends = save_key_data_.size() != byte_width
+                       ? kDefaultExtends
+                       : 0 == memcmp(save_key_data_.data(), data, byte_width);
+    save_key_data_.resize(byte_width);
+    memcpy(save_key_data_.data(), data, byte_width);
+    return extends;
+  }
+
+  Result<GroupingSegment> GetNextSegment(const Scalar& scalar, int64_t offset,
+                                         int64_t length) {
+    ARROW_RETURN_NOT_OK(CheckType(*scalar.type));
+    if (!scalar.is_valid) {
+      return Status::Invalid("segmenting an invalid scalar");
+    }
+    auto data = checked_cast<const PrimitiveScalarBase&>(scalar).data();
+    bool extends = length > 0 ? Extend(data) : kEmptyExtends;
+    return MakeSegment(length, 0, length, extends);
+  }
+
+  Result<GroupingSegment> GetNextSegment(const DataType& array_type,
+                                         const uint8_t* array_bytes, int64_t offset,
+                                         int64_t length) {
+    RETURN_NOT_OK(CheckType(array_type));
+    int64_t byte_width = array_type.byte_width();
+    int64_t match_length = GetMatchLength(array_bytes + offset * byte_width, byte_width,
+                                          array_bytes, offset, length);
+    bool extends = length > 0 ? Extend(array_bytes + offset * byte_width) : kEmptyExtends;
+    return MakeSegment(length, offset, match_length, extends);
+  }
+
+  Result<GroupingSegment> GetNextSegment(const ExecSpan& batch, int64_t offset) override {
+    ARROW_RETURN_NOT_OK(CheckForGetNextSegment(batch, offset, {key_type_}));
+    if (offset == batch.length) {
+      return MakeSegment(batch.length, offset, 0, kEmptyExtends);
+    }
+    const auto& value = batch.values[0];
+    if (value.is_scalar()) {
+      return GetNextSegment(*value.scalar, offset, batch.length);
+    }
+    ARROW_DCHECK(value.is_array());
+    const auto& array = value.array;
+    if (array.GetNullCount() > 0) {
+      return Status::NotImplemented("segmenting a nullable array");
+    }
+    return GetNextSegment(*array.type, GetValuesAsBytes(array), offset, batch.length);
+  }
+
+  Result<GroupingSegment> GetNextSegment(const ExecBatch& batch,
+                                         int64_t offset) override {
+    ARROW_RETURN_NOT_OK(CheckForGetNextSegment(batch, offset, {key_type_}));
+    if (offset == batch.length) {
+      return MakeSegment(batch.length, offset, 0, kEmptyExtends);
+    }
+    const auto& value = batch.values[0];
+    if (value.is_scalar()) {
+      return GetNextSegment(*value.scalar(), offset, batch.length);
+    }
+    if (value.is_array()) {
+      auto array = value.array();
+      if (array->GetNullCount() > 0) {
+        return Status::NotImplemented("segmenting a nullable array");
+      }
+      return GetNextSegment(*array->type, GetValuesAsBytes(*array), offset, batch.length);
+    }
+    if (value.is_chunked_array()) {
+      auto array = value.chunked_array();
+      if (array->null_count() > 0) {
+        return Status::NotImplemented("segmenting a nullable array");
+      }
+      return GetNextSegmentChunked(array, offset, bound_extend_);
+    }
+    return Status::Invalid("segmenting unsupported value kind ", value.kind());
+  }
+
+ private:
+  TypeHolder key_type_;
+  std::vector<uint8_t> save_key_data_;
+  ExtendFunc bound_extend_ = [this](const void* data) { return Extend(data); };
+};
+
+struct AnyKeysGroupingSegmenter : public BaseGroupingSegmenter {
+  static Result<std::unique_ptr<GroupingSegmenter>> Make(
+      const std::vector<TypeHolder>& key_types, ExecContext* ctx) {
+    ARROW_RETURN_NOT_OK(Grouper::Make(key_types, ctx));  // check types
+    return std::make_unique<AnyKeysGroupingSegmenter>(key_types, ctx);
+  }
+
+  AnyKeysGroupingSegmenter(const std::vector<TypeHolder>& key_types, ExecContext* ctx)
+      : BaseGroupingSegmenter(key_types),
+        ctx_(ctx),
+        grouper_(nullptr),
+        save_group_id_(kNoGroupId) {}
+
+  Status Reset() override {
+    grouper_ = nullptr;
+    save_group_id_ = kNoGroupId;
+    return Status::OK();
+  }
+
+  bool Extend(const void* data) {
+    auto group_id = *static_cast<const group_id_t*>(data);
+    bool extends =
+        save_group_id_ == kNoGroupId ? kDefaultExtends : save_group_id_ == group_id;
+    save_group_id_ = group_id;
+    return extends;
+  }
+
+  template <typename Batch>

Review Comment:
   ```suggestion
     // Runs the grouper on a single row.  This is used to determine
     // the group id of the first row of a new segment to see if it extends
     // the previous segment.
     template <typename Batch>
   ```
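   
   To spell out the cross-batch case this enables (illustrative trace only, not code from the PR):
   
   ```cpp
   // Batch 1 segment keys: ... x x y  -> the last open segment covers the trailing "y"
   //                                     rows and its group id is kept in save_group_id_.
   // Batch 2 segment keys: y y z ...  -> MapGroupIdAt(batch2, 0) maps row 0 ("y") through
   //                                     the same grouper that produced save_group_id_;
   //                                     an equal group id means batch 2's first segment
   //                                     extends the open "y" segment (extends == true).
   ```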



##########
cpp/src/arrow/compute/row/grouper.cc:
##########
@@ -39,12 +43,521 @@
 namespace arrow {
 
 using internal::checked_cast;
+using internal::PrimitiveScalarBase;
 
 namespace compute {
 
 namespace {
 
-struct GrouperImpl : Grouper {
+constexpr uint32_t kNoGroupId = std::numeric_limits<uint32_t>::max();
+
+using group_id_t = std::remove_const<decltype(kNoGroupId)>::type;
+using GroupIdType = CTypeTraits<group_id_t>::ArrowType;
+auto group_id_type = std::make_shared<GroupIdType>();
+
+inline const uint8_t* GetValuesAsBytes(const ArrayData& data, int64_t offset = 0) {
+  DCHECK_GT(data.type->byte_width(), 0);
+  int64_t absolute_byte_offset = (data.offset + offset) * data.type->byte_width();
+  return data.GetValues<uint8_t>(1, absolute_byte_offset);
+}
+
+inline const uint8_t* GetValuesAsBytes(const ArraySpan& data, int64_t offset = 0) {
+  DCHECK_GT(data.type->byte_width(), 0);
+  int64_t absolute_byte_offset = (data.offset + offset) * data.type->byte_width();
+  return data.GetValues<uint8_t>(1, absolute_byte_offset);
+}
+
+template <typename Value>
+Status CheckForGetNextSegment(const std::vector<Value>& values, int64_t length,
+                              int64_t offset, const std::vector<TypeHolder>& key_types) {
+  if (offset < 0 || offset > length) {
+    return Status::Invalid("invalid grouping segmenter offset: ", offset);
+  }
+  if (values.size() != key_types.size()) {
+    return Status::Invalid("expected batch size ", key_types.size(), " but got ",
+                           values.size());
+  }
+  for (size_t i = 0; i < key_types.size(); i++) {
+    const auto& value = values[i];
+    const auto& key_type = key_types[i];
+    if (*value.type() != *key_type.type) {
+      return Status::Invalid("expected batch value ", i, " of type ", *key_type.type,
+                             " but got ", *value.type());
+    }
+  }
+  return Status::OK();
+}
+
+template <typename Batch>
+enable_if_t<std::is_same<Batch, ExecSpan>::value || std::is_same<Batch, ExecBatch>::value,
+            Status>
+CheckForGetNextSegment(const Batch& batch, int64_t offset,
+                       const std::vector<TypeHolder>& key_types) {
+  return CheckForGetNextSegment(batch.values, batch.length, offset, key_types);
+}
+
+struct BaseGroupingSegmenter : public GroupingSegmenter {
+  explicit BaseGroupingSegmenter(const std::vector<TypeHolder>& key_types)
+      : key_types_(key_types) {}
+
+  const std::vector<TypeHolder>& key_types() const override { return key_types_; }
+
+  std::vector<TypeHolder> key_types_;
+};
+
+GroupingSegment MakeSegment(int64_t batch_length, int64_t offset, int64_t length,
+                            bool extends) {
+  return GroupingSegment{offset, length, offset + length >= batch_length, extends};
+}
+
+int64_t GetMatchLength(const uint8_t* match_bytes, int64_t match_width,
+                       const uint8_t* array_bytes, int64_t offset, int64_t length) {
+  int64_t cursor, byte_cursor;
+  for (cursor = offset, byte_cursor = match_width * cursor; cursor < length;
+       cursor++, byte_cursor += match_width) {
+    if (memcmp(match_bytes, array_bytes + byte_cursor,
+               static_cast<size_t>(match_width)) != 0) {
+      break;
+    }
+  }
+  return std::min(cursor, length - offset);
+}
+
+using ExtendFunc = std::function<bool(const void*)>;
+constexpr bool kDefaultExtends = true;
+constexpr bool kEmptyExtends = true;
+
+Result<GroupingSegment> GetNextSegmentChunked(
+    const std::shared_ptr<ChunkedArray>& chunked_array, int64_t offset,
+    ExtendFunc extend) {
+  if (offset >= chunked_array->length()) {
+    return MakeSegment(chunked_array->length(), chunked_array->length(), 0,
+                       kEmptyExtends);
+  }
+  int64_t remaining_offset = offset;
+  const auto& arrays = chunked_array->chunks();
+  for (size_t i = 0; remaining_offset >= 0 && i < arrays.size(); i++) {
+    // look up chunk containing offset
+    int64_t array_length = arrays[i]->length();
+    if (remaining_offset < array_length) {
+      // found - switch to matching
+      int64_t match_width = arrays[i]->type()->byte_width();
+      const uint8_t* match_bytes = GetValuesAsBytes(*arrays[i]->data(), remaining_offset);
+      int64_t total_match_length = 0;
+      for (; i < arrays.size(); i++) {
+        int64_t array_length = arrays[i]->length();
+        if (array_length <= 0) continue;
+        const uint8_t* array_bytes = GetValuesAsBytes(*arrays[i]->data());
+        int64_t match_length = GetMatchLength(match_bytes, match_width, array_bytes,
+                                              remaining_offset, array_length);
+        total_match_length += match_length;
+        remaining_offset = 0;
+        if (match_length < array_length - remaining_offset) break;
+      }
+      bool extends = extend(match_bytes);
+      return MakeSegment(chunked_array->length(), offset, total_match_length, extends);
+    }
+    remaining_offset -= array_length;
+  }
+  return Status::Invalid("segmenting invalid chunked array value");
+}
+
+struct NoKeysGroupingSegmenter : public BaseGroupingSegmenter {
+  static std::unique_ptr<GroupingSegmenter> Make() {
+    return std::make_unique<NoKeysGroupingSegmenter>();
+  }
+
+  NoKeysGroupingSegmenter() : BaseGroupingSegmenter({}) {}
+
+  Status Reset() override { return Status::OK(); }
+
+  template <typename Batch>
+  Result<GroupingSegment> GetNextSegmentImpl(const Batch& batch, int64_t offset) {
+    ARROW_RETURN_NOT_OK(CheckForGetNextSegment(batch, offset, {}));
+    return MakeSegment(batch.length, offset, batch.length - offset, kDefaultExtends);
+  }
+
+  Result<GroupingSegment> GetNextSegment(const ExecSpan& batch, int64_t offset) override {
+    return GetNextSegmentImpl(batch, offset);
+  }
+
+  Result<GroupingSegment> GetNextSegment(const ExecBatch& batch,
+                                         int64_t offset) override {
+    return GetNextSegmentImpl(batch, offset);
+  }
+};
+
+struct SimpleKeyGroupingSegmenter : public BaseGroupingSegmenter {
+  static Result<std::unique_ptr<GroupingSegmenter>> Make(TypeHolder key_type) {
+    return std::make_unique<SimpleKeyGroupingSegmenter>(key_type);
+  }
+
+  explicit SimpleKeyGroupingSegmenter(TypeHolder key_type)
+      : BaseGroupingSegmenter({key_type}), key_type_(key_types_[0]), save_key_data_() {}
+
+  Status CheckType(const DataType& type) {
+    if (!is_fixed_width(type)) {
+      return Status::Invalid("SimpleKeyGroupingSegmenter does not support type ", type);
+    }
+    return Status::OK();
+  }
+
+  Status Reset() override {
+    save_key_data_.resize(0);
+    return Status::OK();
+  }
+
+  bool Extend(const void* data) {
+    size_t byte_width = static_cast<size_t>(key_type_.type->byte_width());
+    bool extends = save_key_data_.size() != byte_width
+                       ? kDefaultExtends
+                       : 0 == memcmp(save_key_data_.data(), data, byte_width);
+    save_key_data_.resize(byte_width);
+    memcpy(save_key_data_.data(), data, byte_width);
+    return extends;
+  }
+
+  Result<GroupingSegment> GetNextSegment(const Scalar& scalar, int64_t offset,
+                                         int64_t length) {
+    ARROW_RETURN_NOT_OK(CheckType(*scalar.type));
+    if (!scalar.is_valid) {
+      return Status::Invalid("segmenting an invalid scalar");
+    }
+    auto data = checked_cast<const PrimitiveScalarBase&>(scalar).data();
+    bool extends = length > 0 ? Extend(data) : kEmptyExtends;
+    return MakeSegment(length, 0, length, extends);
+  }
+
+  Result<GroupingSegment> GetNextSegment(const DataType& array_type,
+                                         const uint8_t* array_bytes, int64_t offset,
+                                         int64_t length) {
+    RETURN_NOT_OK(CheckType(array_type));
+    int64_t byte_width = array_type.byte_width();
+    int64_t match_length = GetMatchLength(array_bytes + offset * byte_width, byte_width,
+                                          array_bytes, offset, length);
+    bool extends = length > 0 ? Extend(array_bytes + offset * byte_width) : kEmptyExtends;
+    return MakeSegment(length, offset, match_length, extends);
+  }
+
+  Result<GroupingSegment> GetNextSegment(const ExecSpan& batch, int64_t offset) override {
+    ARROW_RETURN_NOT_OK(CheckForGetNextSegment(batch, offset, {key_type_}));
+    if (offset == batch.length) {
+      return MakeSegment(batch.length, offset, 0, kEmptyExtends);
+    }
+    const auto& value = batch.values[0];
+    if (value.is_scalar()) {
+      return GetNextSegment(*value.scalar, offset, batch.length);
+    }
+    ARROW_DCHECK(value.is_array());
+    const auto& array = value.array;
+    if (array.GetNullCount() > 0) {
+      return Status::NotImplemented("segmenting a nullable array");
+    }
+    return GetNextSegment(*array.type, GetValuesAsBytes(array), offset, batch.length);
+  }
+
+  Result<GroupingSegment> GetNextSegment(const ExecBatch& batch,
+                                         int64_t offset) override {
+    ARROW_RETURN_NOT_OK(CheckForGetNextSegment(batch, offset, {key_type_}));
+    if (offset == batch.length) {
+      return MakeSegment(batch.length, offset, 0, kEmptyExtends);
+    }
+    const auto& value = batch.values[0];
+    if (value.is_scalar()) {
+      return GetNextSegment(*value.scalar(), offset, batch.length);
+    }
+    if (value.is_array()) {
+      auto array = value.array();
+      if (array->GetNullCount() > 0) {
+        return Status::NotImplemented("segmenting a nullable array");
+      }
+      return GetNextSegment(*array->type, GetValuesAsBytes(*array), offset, batch.length);
+    }
+    if (value.is_chunked_array()) {
+      auto array = value.chunked_array();
+      if (array->null_count() > 0) {
+        return Status::NotImplemented("segmenting a nullable array");
+      }
+      return GetNextSegmentChunked(array, offset, bound_extend_);
+    }
+    return Status::Invalid("segmenting unsupported value kind ", value.kind());
+  }
+
+ private:
+  TypeHolder key_type_;
+  std::vector<uint8_t> save_key_data_;
+  ExtendFunc bound_extend_ = [this](const void* data) { return Extend(data); };
+};
+
+struct AnyKeysGroupingSegmenter : public BaseGroupingSegmenter {
+  static Result<std::unique_ptr<GroupingSegmenter>> Make(
+      const std::vector<TypeHolder>& key_types, ExecContext* ctx) {
+    ARROW_RETURN_NOT_OK(Grouper::Make(key_types, ctx));  // check types
+    return std::make_unique<AnyKeysGroupingSegmenter>(key_types, ctx);
+  }
+
+  AnyKeysGroupingSegmenter(const std::vector<TypeHolder>& key_types, ExecContext* ctx)
+      : BaseGroupingSegmenter(key_types),
+        ctx_(ctx),
+        grouper_(nullptr),
+        save_group_id_(kNoGroupId) {}
+
+  Status Reset() override {
+    grouper_ = nullptr;
+    save_group_id_ = kNoGroupId;
+    return Status::OK();
+  }
+
+  bool Extend(const void* data) {
+    auto group_id = *static_cast<const group_id_t*>(data);
+    bool extends =
+        save_group_id_ == kNoGroupId ? kDefaultExtends : save_group_id_ == group_id;
+    save_group_id_ = group_id;
+    return extends;
+  }
+
+  template <typename Batch>
+  Result<group_id_t> MapGroupIdAt(const Batch& batch, int64_t offset) {
+    if (offset < 0 || offset >= batch.length) {
+      return Status::Invalid("requesting group id out of bounds");
+    }
+    if (!grouper_) return kNoGroupId;
+    ARROW_ASSIGN_OR_RAISE(auto datum, grouper_->Consume(batch, offset,
+                                                        /*consume_length=*/1));
+    if (!(datum.is_array() || datum.is_chunked_array())) {

Review Comment:
   Can datum be a chunked array here?



##########
cpp/src/arrow/compute/row/grouper.cc:
##########
@@ -39,12 +43,521 @@
 namespace arrow {
 
 using internal::checked_cast;
+using internal::PrimitiveScalarBase;
 
 namespace compute {
 
 namespace {
 
-struct GrouperImpl : Grouper {
+constexpr uint32_t kNoGroupId = std::numeric_limits<uint32_t>::max();
+
+using group_id_t = std::remove_const<decltype(kNoGroupId)>::type;
+using GroupIdType = CTypeTraits<group_id_t>::ArrowType;
+auto group_id_type = std::make_shared<GroupIdType>();
+
+inline const uint8_t* GetValuesAsBytes(const ArrayData& data, int64_t offset = 0) {
+  DCHECK_GT(data.type->byte_width(), 0);
+  int64_t absolute_byte_offset = (data.offset + offset) * data.type->byte_width();
+  return data.GetValues<uint8_t>(1, absolute_byte_offset);
+}
+
+inline const uint8_t* GetValuesAsBytes(const ArraySpan& data, int64_t offset = 0) {
+  DCHECK_GT(data.type->byte_width(), 0);
+  int64_t absolute_byte_offset = (data.offset + offset) * data.type->byte_width();
+  return data.GetValues<uint8_t>(1, absolute_byte_offset);
+}
+
+template <typename Value>
+Status CheckForGetNextSegment(const std::vector<Value>& values, int64_t length,
+                              int64_t offset, const std::vector<TypeHolder>& key_types) {
+  if (offset < 0 || offset > length) {
+    return Status::Invalid("invalid grouping segmenter offset: ", offset);
+  }
+  if (values.size() != key_types.size()) {
+    return Status::Invalid("expected batch size ", key_types.size(), " but got ",
+                           values.size());
+  }
+  for (size_t i = 0; i < key_types.size(); i++) {
+    const auto& value = values[i];
+    const auto& key_type = key_types[i];
+    if (*value.type() != *key_type.type) {
+      return Status::Invalid("expected batch value ", i, " of type ", *key_type.type,
+                             " but got ", *value.type());
+    }
+  }
+  return Status::OK();
+}
+
+template <typename Batch>
+enable_if_t<std::is_same<Batch, ExecSpan>::value || std::is_same<Batch, ExecBatch>::value,
+            Status>
+CheckForGetNextSegment(const Batch& batch, int64_t offset,
+                       const std::vector<TypeHolder>& key_types) {
+  return CheckForGetNextSegment(batch.values, batch.length, offset, key_types);
+}
+
+struct BaseGroupingSegmenter : public GroupingSegmenter {
+  explicit BaseGroupingSegmenter(const std::vector<TypeHolder>& key_types)
+      : key_types_(key_types) {}
+
+  const std::vector<TypeHolder>& key_types() const override { return key_types_; }
+
+  std::vector<TypeHolder> key_types_;
+};
+
+GroupingSegment MakeSegment(int64_t batch_length, int64_t offset, int64_t length,
+                            bool extends) {
+  return GroupingSegment{offset, length, offset + length >= batch_length, extends};
+}
+
+int64_t GetMatchLength(const uint8_t* match_bytes, int64_t match_width,
+                       const uint8_t* array_bytes, int64_t offset, int64_t length) {
+  int64_t cursor, byte_cursor;
+  for (cursor = offset, byte_cursor = match_width * cursor; cursor < length;
+       cursor++, byte_cursor += match_width) {
+    if (memcmp(match_bytes, array_bytes + byte_cursor,
+               static_cast<size_t>(match_width)) != 0) {
+      break;
+    }
+  }
+  return std::min(cursor, length - offset);
+}
+
+using ExtendFunc = std::function<bool(const void*)>;
+constexpr bool kDefaultExtends = true;
+constexpr bool kEmptyExtends = true;
+
+Result<GroupingSegment> GetNextSegmentChunked(
+    const std::shared_ptr<ChunkedArray>& chunked_array, int64_t offset,
+    ExtendFunc extend) {
+  if (offset >= chunked_array->length()) {
+    return MakeSegment(chunked_array->length(), chunked_array->length(), 0,
+                       kEmptyExtends);
+  }
+  int64_t remaining_offset = offset;
+  const auto& arrays = chunked_array->chunks();
+  for (size_t i = 0; remaining_offset >= 0 && i < arrays.size(); i++) {
+    // look up chunk containing offset
+    int64_t array_length = arrays[i]->length();
+    if (remaining_offset < array_length) {
+      // found - switch to matching
+      int64_t match_width = arrays[i]->type()->byte_width();
+      const uint8_t* match_bytes = GetValuesAsBytes(*arrays[i]->data(), remaining_offset);
+      int64_t total_match_length = 0;
+      for (; i < arrays.size(); i++) {
+        int64_t array_length = arrays[i]->length();
+        if (array_length <= 0) continue;
+        const uint8_t* array_bytes = GetValuesAsBytes(*arrays[i]->data());
+        int64_t match_length = GetMatchLength(match_bytes, match_width, array_bytes,
+                                              remaining_offset, array_length);
+        total_match_length += match_length;
+        remaining_offset = 0;
+        if (match_length < array_length - remaining_offset) break;
+      }
+      bool extends = extend(match_bytes);
+      return MakeSegment(chunked_array->length(), offset, total_match_length, extends);
+    }
+    remaining_offset -= array_length;
+  }
+  return Status::Invalid("segmenting invalid chunked array value");
+}
+
+struct NoKeysGroupingSegmenter : public BaseGroupingSegmenter {
+  static std::unique_ptr<GroupingSegmenter> Make() {
+    return std::make_unique<NoKeysGroupingSegmenter>();
+  }
+
+  NoKeysGroupingSegmenter() : BaseGroupingSegmenter({}) {}
+
+  Status Reset() override { return Status::OK(); }
+
+  template <typename Batch>
+  Result<GroupingSegment> GetNextSegmentImpl(const Batch& batch, int64_t offset) {
+    ARROW_RETURN_NOT_OK(CheckForGetNextSegment(batch, offset, {}));
+    return MakeSegment(batch.length, offset, batch.length - offset, kDefaultExtends);
+  }
+
+  Result<GroupingSegment> GetNextSegment(const ExecSpan& batch, int64_t offset) override {
+    return GetNextSegmentImpl(batch, offset);
+  }
+
+  Result<GroupingSegment> GetNextSegment(const ExecBatch& batch,
+                                         int64_t offset) override {
+    return GetNextSegmentImpl(batch, offset);
+  }
+};
+
+struct SimpleKeyGroupingSegmenter : public BaseGroupingSegmenter {
+  static Result<std::unique_ptr<GroupingSegmenter>> Make(TypeHolder key_type) {
+    return std::make_unique<SimpleKeyGroupingSegmenter>(key_type);
+  }
+
+  explicit SimpleKeyGroupingSegmenter(TypeHolder key_type)
+      : BaseGroupingSegmenter({key_type}), key_type_(key_types_[0]), save_key_data_() {}
+
+  Status CheckType(const DataType& type) {
+    if (!is_fixed_width(type)) {
+      return Status::Invalid("SimpleKeyGroupingSegmenter does not support type ", type);
+    }
+    return Status::OK();
+  }
+
+  Status Reset() override {
+    save_key_data_.resize(0);
+    return Status::OK();
+  }
+
+  bool Extend(const void* data) {
+    size_t byte_width = static_cast<size_t>(key_type_.type->byte_width());
+    bool extends = save_key_data_.size() != byte_width
+                       ? kDefaultExtends
+                       : 0 == memcmp(save_key_data_.data(), data, byte_width);
+    save_key_data_.resize(byte_width);
+    memcpy(save_key_data_.data(), data, byte_width);
+    return extends;
+  }
+
+  Result<GroupingSegment> GetNextSegment(const Scalar& scalar, int64_t offset,
+                                         int64_t length) {
+    ARROW_RETURN_NOT_OK(CheckType(*scalar.type));
+    if (!scalar.is_valid) {
+      return Status::Invalid("segmenting an invalid scalar");
+    }
+    auto data = checked_cast<const PrimitiveScalarBase&>(scalar).data();
+    bool extends = length > 0 ? Extend(data) : kEmptyExtends;
+    return MakeSegment(length, 0, length, extends);
+  }
+
+  Result<GroupingSegment> GetNextSegment(const DataType& array_type,
+                                         const uint8_t* array_bytes, int64_t offset,
+                                         int64_t length) {
+    RETURN_NOT_OK(CheckType(array_type));
+    int64_t byte_width = array_type.byte_width();
+    int64_t match_length = GetMatchLength(array_bytes + offset * byte_width, byte_width,
+                                          array_bytes, offset, length);
+    bool extends = length > 0 ? Extend(array_bytes + offset * byte_width) : kEmptyExtends;
+    return MakeSegment(length, offset, match_length, extends);
+  }
+
+  Result<GroupingSegment> GetNextSegment(const ExecSpan& batch, int64_t offset) override {
+    ARROW_RETURN_NOT_OK(CheckForGetNextSegment(batch, offset, {key_type_}));
+    if (offset == batch.length) {
+      return MakeSegment(batch.length, offset, 0, kEmptyExtends);
+    }
+    const auto& value = batch.values[0];
+    if (value.is_scalar()) {
+      return GetNextSegment(*value.scalar, offset, batch.length);
+    }
+    ARROW_DCHECK(value.is_array());
+    const auto& array = value.array;
+    if (array.GetNullCount() > 0) {
+      return Status::NotImplemented("segmenting a nullable array");
+    }
+    return GetNextSegment(*array.type, GetValuesAsBytes(array), offset, batch.length);
+  }
+
+  Result<GroupingSegment> GetNextSegment(const ExecBatch& batch,
+                                         int64_t offset) override {
+    ARROW_RETURN_NOT_OK(CheckForGetNextSegment(batch, offset, {key_type_}));
+    if (offset == batch.length) {
+      return MakeSegment(batch.length, offset, 0, kEmptyExtends);
+    }
+    const auto& value = batch.values[0];
+    if (value.is_scalar()) {
+      return GetNextSegment(*value.scalar(), offset, batch.length);
+    }
+    if (value.is_array()) {
+      auto array = value.array();
+      if (array->GetNullCount() > 0) {
+        return Status::NotImplemented("segmenting a nullable array");
+      }
+      return GetNextSegment(*array->type, GetValuesAsBytes(*array), offset, batch.length);
+    }
+    if (value.is_chunked_array()) {
+      auto array = value.chunked_array();
+      if (array->null_count() > 0) {
+        return Status::NotImplemented("segmenting a nullable array");
+      }
+      return GetNextSegmentChunked(array, offset, bound_extend_);
+    }
+    return Status::Invalid("segmenting unsupported value kind ", value.kind());
+  }
+
+ private:
+  TypeHolder key_type_;
+  std::vector<uint8_t> save_key_data_;
+  ExtendFunc bound_extend_ = [this](const void* data) { return Extend(data); };
+};
+
+struct AnyKeysGroupingSegmenter : public BaseGroupingSegmenter {
+  static Result<std::unique_ptr<GroupingSegmenter>> Make(
+      const std::vector<TypeHolder>& key_types, ExecContext* ctx) {
+    ARROW_RETURN_NOT_OK(Grouper::Make(key_types, ctx));  // check types
+    return std::make_unique<AnyKeysGroupingSegmenter>(key_types, ctx);
+  }
+
+  AnyKeysGroupingSegmenter(const std::vector<TypeHolder>& key_types, ExecContext* ctx)
+      : BaseGroupingSegmenter(key_types),
+        ctx_(ctx),
+        grouper_(nullptr),
+        save_group_id_(kNoGroupId) {}
+
+  Status Reset() override {
+    grouper_ = nullptr;
+    save_group_id_ = kNoGroupId;
+    return Status::OK();
+  }
+
+  bool Extend(const void* data) {
+    auto group_id = *static_cast<const group_id_t*>(data);
+    bool extends =
+        save_group_id_ == kNoGroupId ? kDefaultExtends : save_group_id_ == group_id;
+    save_group_id_ = group_id;
+    return extends;
+  }
+
+  template <typename Batch>
+  Result<group_id_t> MapGroupIdAt(const Batch& batch, int64_t offset) {
+    if (offset < 0 || offset >= batch.length) {
+      return Status::Invalid("requesting group id out of bounds");
+    }
+    if (!grouper_) return kNoGroupId;
+    ARROW_ASSIGN_OR_RAISE(auto datum, grouper_->Consume(batch, offset,
+                                                        /*consume_length=*/1));
+    if (!(datum.is_array() || datum.is_chunked_array())) {
+      return Status::Invalid("accessing unsupported datum kind ", datum.kind());
+    }
+    const std::shared_ptr<ArrayData>& data =
+        datum.is_array() ? datum.array() : datum.chunked_array()->chunk(0)->data();
+    ARROW_DCHECK(data->GetNullCount() == 0);
+    DCHECK_EQ(data->type->id(), GroupIdType::type_id);
+    DCHECK_EQ(1, data->length);
+    const group_id_t* values = data->GetValues<group_id_t>(1);
+    return values[0];
+  }
+
+  template <typename Batch>
+  Result<GroupingSegment> GetNextSegmentImpl(const Batch& batch, int64_t offset) {
+    ARROW_RETURN_NOT_OK(CheckForGetNextSegment(batch, offset, key_types_));
+    if (offset == batch.length) {
+      return MakeSegment(batch.length, offset, 0, kEmptyExtends);
+    }
+    // ARROW-18311: make Grouper support Reset()
+    // so it can be cached instead of recreated below
+    //
+    // the group id must be computed prior to resetting the grouper, since it is compared
+    // to save_group_id_, and after a reset the grouper produces incomparable group ids
+    ARROW_ASSIGN_OR_RAISE(auto group_id, MapGroupIdAt(batch, offset));
+    ExtendFunc bound_extend = [this, group_id](const void* data) {
+      bool extends = Extend(&group_id);
+      save_group_id_ = *static_cast<const group_id_t*>(data);
+      return extends;
+    };
+    ARROW_ASSIGN_OR_RAISE(grouper_, Grouper::Make(key_types_, ctx_));  // TODO: reset it
+    ARROW_ASSIGN_OR_RAISE(auto datum, grouper_->Consume(batch, offset));
+    if (datum.is_array()) {
+      const std::shared_ptr<ArrayData>& data = datum.array();
+      ARROW_DCHECK(data->GetNullCount() == 0);
+      DCHECK_EQ(data->type->id(), GroupIdType::type_id);
+      const group_id_t* values = data->GetValues<group_id_t>(1);
+      int64_t cursor;
+      for (cursor = 1; cursor < data->length; cursor++) {
+        if (values[0] != values[cursor]) break;
+      }
+      int64_t length = std::min(cursor, batch.length - offset);
+      bool extends = length > 0 ? bound_extend(values) : kEmptyExtends;
+      return MakeSegment(batch.length, offset, length, extends);
+    } else if (datum.is_chunked_array()) {
+      ARROW_ASSIGN_OR_RAISE(
+          auto segment, GetNextSegmentChunked(datum.chunked_array(), 0, bound_extend));
+      segment.offset += offset;
+      return segment;

Review Comment:
   I think we can remove this case
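   For illustration, a minimal sketch of what the tail of `GetNextSegmentImpl` could collapse to if the chunked-array branch were dropped, assuming `grouper_->Consume` only ever yields an array datum here; it reuses the names defined in the hunk above and is not the PR's code:
   ```cpp
   // Sketch only: assumes the grouper returns an array datum for this batch.
   ARROW_ASSIGN_OR_RAISE(auto datum, grouper_->Consume(batch, offset));
   if (!datum.is_array()) {
     return Status::Invalid("segmenting unsupported datum kind ", datum.kind());
   }
   const std::shared_ptr<ArrayData>& data = datum.array();
   const group_id_t* values = data->GetValues<group_id_t>(1);
   // scan for the first group-id change; everything before it is one segment
   int64_t cursor = 1;
   while (cursor < data->length && values[cursor] == values[0]) ++cursor;
   bool extends = bound_extend(values);
   return MakeSegment(batch.length, offset, cursor, extends);
   ```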



##########
cpp/src/arrow/compute/row/grouper.cc:
##########
@@ -39,12 +43,521 @@
 namespace arrow {
 
 using internal::checked_cast;
+using internal::PrimitiveScalarBase;
 
 namespace compute {
 
 namespace {
 
-struct GrouperImpl : Grouper {
+constexpr uint32_t kNoGroupId = std::numeric_limits<uint32_t>::max();
+
+using group_id_t = std::remove_const<decltype(kNoGroupId)>::type;
+using GroupIdType = CTypeTraits<group_id_t>::ArrowType;
+auto group_id_type = std::make_shared<GroupIdType>();
+
+inline const uint8_t* GetValuesAsBytes(const ArrayData& data, int64_t offset = 0) {
+  DCHECK_GT(data.type->byte_width(), 0);
+  int64_t absolute_byte_offset = (data.offset + offset) * data.type->byte_width();
+  return data.GetValues<uint8_t>(1, absolute_byte_offset);
+}
+
+inline const uint8_t* GetValuesAsBytes(const ArraySpan& data, int64_t offset = 0) {
+  DCHECK_GT(data.type->byte_width(), 0);
+  int64_t absolute_byte_offset = (data.offset + offset) * data.type->byte_width();
+  return data.GetValues<uint8_t>(1, absolute_byte_offset);
+}
+
+template <typename Value>
+Status CheckForGetNextSegment(const std::vector<Value>& values, int64_t length,
+                              int64_t offset, const std::vector<TypeHolder>& key_types) {
+  if (offset < 0 || offset > length) {
+    return Status::Invalid("invalid grouping segmenter offset: ", offset);
+  }
+  if (values.size() != key_types.size()) {
+    return Status::Invalid("expected batch size ", key_types.size(), " but got ",
+                           values.size());
+  }
+  for (size_t i = 0; i < key_types.size(); i++) {
+    const auto& value = values[i];
+    const auto& key_type = key_types[i];
+    if (*value.type() != *key_type.type) {
+      return Status::Invalid("expected batch value ", i, " of type ", *key_type.type,
+                             " but got ", *value.type());
+    }
+  }
+  return Status::OK();
+}
+
+template <typename Batch>
+enable_if_t<std::is_same<Batch, ExecSpan>::value || std::is_same<Batch, ExecBatch>::value,
+            Status>
+CheckForGetNextSegment(const Batch& batch, int64_t offset,
+                       const std::vector<TypeHolder>& key_types) {
+  return CheckForGetNextSegment(batch.values, batch.length, offset, key_types);
+}
+
+struct BaseGroupingSegmenter : public GroupingSegmenter {
+  explicit BaseGroupingSegmenter(const std::vector<TypeHolder>& key_types)
+      : key_types_(key_types) {}
+
+  const std::vector<TypeHolder>& key_types() const override { return key_types_; }
+
+  std::vector<TypeHolder> key_types_;
+};
+
+GroupingSegment MakeSegment(int64_t batch_length, int64_t offset, int64_t length,
+                            bool extends) {
+  return GroupingSegment{offset, length, offset + length >= batch_length, extends};
+}
+
+int64_t GetMatchLength(const uint8_t* match_bytes, int64_t match_width,
+                       const uint8_t* array_bytes, int64_t offset, int64_t length) {
+  int64_t cursor, byte_cursor;
+  for (cursor = offset, byte_cursor = match_width * cursor; cursor < length;
+       cursor++, byte_cursor += match_width) {
+    if (memcmp(match_bytes, array_bytes + byte_cursor,
+               static_cast<size_t>(match_width)) != 0) {
+      break;
+    }
+  }
+  return cursor - offset;
+}
+
+using ExtendFunc = std::function<bool(const void*)>;
+constexpr bool kDefaultExtends = true;
+constexpr bool kEmptyExtends = true;
+
+Result<GroupingSegment> GetNextSegmentChunked(
+    const std::shared_ptr<ChunkedArray>& chunked_array, int64_t offset,
+    ExtendFunc extend) {
+  if (offset >= chunked_array->length()) {
+    return MakeSegment(chunked_array->length(), chunked_array->length(), 0,
+                       kEmptyExtends);
+  }
+  int64_t remaining_offset = offset;
+  const auto& arrays = chunked_array->chunks();
+  for (size_t i = 0; remaining_offset >= 0 && i < arrays.size(); i++) {
+    // look up chunk containing offset
+    int64_t array_length = arrays[i]->length();
+    if (remaining_offset < array_length) {
+      // found - switch to matching
+      int64_t match_width = arrays[i]->type()->byte_width();
+      const uint8_t* match_bytes = GetValuesAsBytes(*arrays[i]->data(), remaining_offset);
+      int64_t total_match_length = 0;
+      for (; i < arrays.size(); i++) {
+        int64_t array_length = arrays[i]->length();
+        if (array_length <= 0) continue;
+        const uint8_t* array_bytes = GetValuesAsBytes(*arrays[i]->data());
+        int64_t match_length = GetMatchLength(match_bytes, match_width, array_bytes,
+                                              remaining_offset, array_length);
+        total_match_length += match_length;
+        if (match_length < array_length - remaining_offset) break;
+        remaining_offset = 0;
+      }
+      bool extends = extend(match_bytes);
+      return MakeSegment(chunked_array->length(), offset, total_match_length, extends);
+    }
+    remaining_offset -= array_length;
+  }
+  return Status::Invalid("segmenting invalid chunked array value");
+}
+
+struct NoKeysGroupingSegmenter : public BaseGroupingSegmenter {
+  static std::unique_ptr<GroupingSegmenter> Make() {
+    return std::make_unique<NoKeysGroupingSegmenter>();
+  }
+
+  NoKeysGroupingSegmenter() : BaseGroupingSegmenter({}) {}
+
+  Status Reset() override { return Status::OK(); }
+
+  template <typename Batch>
+  Result<GroupingSegment> GetNextSegmentImpl(const Batch& batch, int64_t offset) {
+    ARROW_RETURN_NOT_OK(CheckForGetNextSegment(batch, offset, {}));
+    return MakeSegment(batch.length, offset, batch.length - offset, kDefaultExtends);
+  }
+
+  Result<GroupingSegment> GetNextSegment(const ExecSpan& batch, int64_t offset) override {
+    return GetNextSegmentImpl(batch, offset);
+  }
+
+  Result<GroupingSegment> GetNextSegment(const ExecBatch& batch,
+                                         int64_t offset) override {
+    return GetNextSegmentImpl(batch, offset);
+  }
+};
+
+struct SimpleKeyGroupingSegmenter : public BaseGroupingSegmenter {
+  static Result<std::unique_ptr<GroupingSegmenter>> Make(TypeHolder key_type) {
+    return std::make_unique<SimpleKeyGroupingSegmenter>(key_type);
+  }
+
+  explicit SimpleKeyGroupingSegmenter(TypeHolder key_type)
+      : BaseGroupingSegmenter({key_type}), key_type_(key_types_[0]), save_key_data_() {}
+
+  Status CheckType(const DataType& type) {
+    if (!is_fixed_width(type)) {
+      return Status::Invalid("SimpleKeyGroupingSegmenter does not support type ", type);
+    }
+    return Status::OK();
+  }
+
+  Status Reset() override {
+    save_key_data_.resize(0);
+    return Status::OK();
+  }
+
+  bool Extend(const void* data) {
+    size_t byte_width = static_cast<size_t>(key_type_.type->byte_width());
+    bool extends = save_key_data_.size() != byte_width
+                       ? kDefaultExtends
+                       : 0 == memcmp(save_key_data_.data(), data, byte_width);
+    save_key_data_.resize(byte_width);
+    memcpy(save_key_data_.data(), data, byte_width);
+    return extends;
+  }
+
+  Result<GroupingSegment> GetNextSegment(const Scalar& scalar, int64_t offset,
+                                         int64_t length) {
+    ARROW_RETURN_NOT_OK(CheckType(*scalar.type));
+    if (!scalar.is_valid) {
+      return Status::Invalid("segmenting an invalid scalar");
+    }
+    auto data = checked_cast<const PrimitiveScalarBase&>(scalar).data();
+    bool extends = length > 0 ? Extend(data) : kEmptyExtends;
+    return MakeSegment(length, 0, length, extends);
+  }
+
+  Result<GroupingSegment> GetNextSegment(const DataType& array_type,
+                                         const uint8_t* array_bytes, int64_t offset,
+                                         int64_t length) {
+    RETURN_NOT_OK(CheckType(array_type));
+    int64_t byte_width = array_type.byte_width();
+    int64_t match_length = GetMatchLength(array_bytes + offset * byte_width, byte_width,
+                                          array_bytes, offset, length);
+    bool extends = length > 0 ? Extend(array_bytes + offset * byte_width) : kEmptyExtends;
+    return MakeSegment(length, offset, match_length, extends);
+  }
+
+  Result<GroupingSegment> GetNextSegment(const ExecSpan& batch, int64_t offset) override {
+    ARROW_RETURN_NOT_OK(CheckForGetNextSegment(batch, offset, {key_type_}));
+    if (offset == batch.length) {
+      return MakeSegment(batch.length, offset, 0, kEmptyExtends);
+    }
+    const auto& value = batch.values[0];
+    if (value.is_scalar()) {
+      return GetNextSegment(*value.scalar, offset, batch.length);
+    }
+    ARROW_DCHECK(value.is_array());
+    const auto& array = value.array;
+    if (array.GetNullCount() > 0) {
+      return Status::NotImplemented("segmenting a nullable array");
+    }
+    return GetNextSegment(*array.type, GetValuesAsBytes(array), offset, batch.length);
+  }
+
+  Result<GroupingSegment> GetNextSegment(const ExecBatch& batch,
+                                         int64_t offset) override {
+    ARROW_RETURN_NOT_OK(CheckForGetNextSegment(batch, offset, {key_type_}));
+    if (offset == batch.length) {
+      return MakeSegment(batch.length, offset, 0, kEmptyExtends);
+    }
+    const auto& value = batch.values[0];
+    if (value.is_scalar()) {
+      return GetNextSegment(*value.scalar(), offset, batch.length);
+    }
+    if (value.is_array()) {
+      auto array = value.array();
+      if (array->GetNullCount() > 0) {
+        return Status::NotImplemented("segmenting a nullable array");
+      }
+      return GetNextSegment(*array->type, GetValuesAsBytes(*array), offset, batch.length);
+    }
+    if (value.is_chunked_array()) {
+      auto array = value.chunked_array();
+      if (array->null_count() > 0) {
+        return Status::NotImplemented("segmenting a nullable array");
+      }
+      return GetNextSegmentChunked(array, offset, bound_extend_);
+    }
+    return Status::Invalid("segmenting unsupported value kind ", value.kind());
+  }
+
+ private:
+  TypeHolder key_type_;
+  std::vector<uint8_t> save_key_data_;
+  ExtendFunc bound_extend_ = [this](const void* data) { return Extend(data); };
+};
+
+struct AnyKeysGroupingSegmenter : public BaseGroupingSegmenter {
+  static Result<std::unique_ptr<GroupingSegmenter>> Make(
+      const std::vector<TypeHolder>& key_types, ExecContext* ctx) {
+    ARROW_RETURN_NOT_OK(Grouper::Make(key_types, ctx));  // check types
+    return std::make_unique<AnyKeysGroupingSegmenter>(key_types, ctx);
+  }
+
+  AnyKeysGroupingSegmenter(const std::vector<TypeHolder>& key_types, ExecContext* ctx)
+      : BaseGroupingSegmenter(key_types),
+        ctx_(ctx),
+        grouper_(nullptr),
+        save_group_id_(kNoGroupId) {}
+
+  Status Reset() override {
+    grouper_ = nullptr;
+    save_group_id_ = kNoGroupId;
+    return Status::OK();
+  }
+
+  bool Extend(const void* data) {
+    auto group_id = *static_cast<const group_id_t*>(data);
+    bool extends =
+        save_group_id_ == kNoGroupId ? kDefaultExtends : save_group_id_ == group_id;
+    save_group_id_ = group_id;
+    return extends;
+  }
+
+  template <typename Batch>
+  Result<group_id_t> MapGroupIdAt(const Batch& batch, int64_t offset) {
+    if (offset < 0 || offset >= batch.length) {
+      return Status::Invalid("requesting group id out of bounds");
+    }
+    if (!grouper_) return kNoGroupId;
+    ARROW_ASSIGN_OR_RAISE(auto datum, grouper_->Consume(batch, offset,
+                                                        /*consume_length=*/1));
+    if (!(datum.is_array() || datum.is_chunked_array())) {
+      return Status::Invalid("accessing unsupported datum kind ", datum.kind());
+    }
+    const std::shared_ptr<ArrayData>& data =
+        datum.is_array() ? datum.array() : datum.chunked_array()->chunk(0)->data();
+    ARROW_DCHECK(data->GetNullCount() == 0);
+    DCHECK_EQ(data->type->id(), GroupIdType::type_id);
+    DCHECK_EQ(1, data->length);
+    const group_id_t* values = data->GetValues<group_id_t>(1);
+    return values[0];
+  }
+
+  template <typename Batch>
+  Result<GroupingSegment> GetNextSegmentImpl(const Batch& batch, int64_t offset) {
+    ARROW_RETURN_NOT_OK(CheckForGetNextSegment(batch, offset, key_types_));
+    if (offset == batch.length) {
+      return MakeSegment(batch.length, offset, 0, kEmptyExtends);
+    }
+    // ARROW-18311: make Grouper support Reset()
+    // so it can be cached instead of recreated below
+    //
+    // the group id must be computed prior to resetting the grouper, since it is compared
+    // to save_group_id_, and after a reset the grouper produces incomparable group ids
+    ARROW_ASSIGN_OR_RAISE(auto group_id, MapGroupIdAt(batch, offset));
+    ExtendFunc bound_extend = [this, group_id](const void* data) {
+      bool extends = Extend(&group_id);
+      save_group_id_ = *static_cast<const group_id_t*>(data);
+      return extends;
+    };
+    ARROW_ASSIGN_OR_RAISE(grouper_, Grouper::Make(key_types_, ctx_));  // TODO: reset it
+    ARROW_ASSIGN_OR_RAISE(auto datum, grouper_->Consume(batch, offset));
+    if (datum.is_array()) {
+      const std::shared_ptr<ArrayData>& data = datum.array();
+      ARROW_DCHECK(data->GetNullCount() == 0);

Review Comment:
   Why?



##########
cpp/src/arrow/compute/exec/aggregate_node.cc:
##########
@@ -169,17 +206,63 @@ void AggregatesToString(std::stringstream* ss, const Schema& input_schema,
   *ss << ']';
 }
 
+template <typename BatchHandler>
+Status HandleSegments(std::unique_ptr<GroupingSegmenter>& segmenter,
+                      const ExecBatch& batch, const std::vector<int>& ids,
+                      const BatchHandler& handle_batch) {
+  int64_t offset = 0;
+  ARROW_ASSIGN_OR_RAISE(auto segment_batch, batch.SelectValues(ids));
+  while (true) {
+    ARROW_ASSIGN_OR_RAISE(auto segment, segmenter->GetNextSegment(segment_batch, offset));
+    if (segment.offset >= segment_batch.length) break;
+    ARROW_RETURN_NOT_OK(handle_batch(batch, segment));
+    offset = segment.offset + segment.length;
+  }
+  return Status::OK();
+}
+
+Status GetScalarFields(std::vector<Datum>& values, const ExecBatch& input_batch,

Review Comment:
   ```suggestion
   Status GetScalarFields(std::vector<Datum>* values, const ExecBatch& input_batch,
   ```
   Prefer a pointer over a mutable reference: at the call site, `GetScalarFields(&values)` makes it clearer that `values` is being modified than `GetScalarFields(values)` does.
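   As a rough sketch of the pointer-style convention this suggestion points at (the body below is a placeholder to show the shape of the call, not the PR's implementation, and `segment_key_field_ids` is an assumed name):
   ```cpp
   // Illustrative signature: the out-parameter is a pointer, not a mutable reference.
   Status GetScalarFields(std::vector<Datum>* values, const ExecBatch& input_batch,
                          const std::vector<int>& field_ids) {
     values->clear();
     for (int id : field_ids) {
       values->push_back(input_batch[id]);  // placeholder body
     }
     return Status::OK();
   }

   // At the call site, taking the address makes the mutation explicit:
   std::vector<Datum> values;
   ARROW_RETURN_NOT_OK(GetScalarFields(&values, batch, segment_key_field_ids));
   ```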



##########
cpp/src/arrow/compute/exec/aggregate_node.cc:
##########
@@ -440,15 +623,33 @@ class GroupByNode : public ExecNode, public TracedNode {
       int key_field_id = key_field_ids[i];
       output_fields[base + i] = input_schema->field(key_field_id);
     }
+    base += keys.size();
+    for (size_t i = 0; i < segment_keys.size(); ++i) {
+      int segment_key_field_id = segment_key_field_ids[i];
+      output_fields[base + i] = input_schema->field(segment_key_field_id);

Review Comment:
   If a column was used both as a key field and a segment field, does this mean we would output that column twice?
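   One way the overlap could be handled, sketched purely as an illustration (the validation placement, the `options` name, and the error wording are assumptions, not part of the PR), is to reject a field that appears both as a key and as a segment key when the node options are validated:
   ```cpp
   // Sketch: reject a FieldRef that appears in both keys and segment_keys.
   for (const auto& segment_key : options.segment_keys) {
     for (const auto& key : options.keys) {
       if (segment_key.Equals(key)) {
         return Status::Invalid("field ", segment_key.ToString(),
                                " is both a key and a segment key");
       }
     }
   }
   ```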



##########
cpp/src/arrow/compute/row/grouper.cc:
##########
@@ -39,12 +43,521 @@
 namespace arrow {
 
 using internal::checked_cast;
+using internal::PrimitiveScalarBase;
 
 namespace compute {
 
 namespace {
 
-struct GrouperImpl : Grouper {
+constexpr uint32_t kNoGroupId = std::numeric_limits<uint32_t>::max();
+
+using group_id_t = std::remove_const<decltype(kNoGroupId)>::type;
+using GroupIdType = CTypeTraits<group_id_t>::ArrowType;
+auto group_id_type = std::make_shared<GroupIdType>();
+
+inline const uint8_t* GetValuesAsBytes(const ArrayData& data, int64_t offset = 0) {
+  DCHECK_GT(data.type->byte_width(), 0);
+  int64_t absolute_byte_offset = (data.offset + offset) * data.type->byte_width();
+  return data.GetValues<uint8_t>(1, absolute_byte_offset);
+}
+
+inline const uint8_t* GetValuesAsBytes(const ArraySpan& data, int64_t offset = 0) {
+  DCHECK_GT(data.type->byte_width(), 0);
+  int64_t absolute_byte_offset = (data.offset + offset) * data.type->byte_width();
+  return data.GetValues<uint8_t>(1, absolute_byte_offset);
+}
+
+template <typename Value>
+Status CheckForGetNextSegment(const std::vector<Value>& values, int64_t length,
+                              int64_t offset, const std::vector<TypeHolder>& key_types) {
+  if (offset < 0 || offset > length) {
+    return Status::Invalid("invalid grouping segmenter offset: ", offset);
+  }
+  if (values.size() != key_types.size()) {
+    return Status::Invalid("expected batch size ", key_types.size(), " but got ",
+                           values.size());
+  }
+  for (size_t i = 0; i < key_types.size(); i++) {
+    const auto& value = values[i];
+    const auto& key_type = key_types[i];
+    if (*value.type() != *key_type.type) {
+      return Status::Invalid("expected batch value ", i, " of type ", *key_type.type,
+                             " but got ", *value.type());
+    }
+  }
+  return Status::OK();
+}
+
+template <typename Batch>
+enable_if_t<std::is_same<Batch, ExecSpan>::value || std::is_same<Batch, ExecBatch>::value,
+            Status>
+CheckForGetNextSegment(const Batch& batch, int64_t offset,
+                       const std::vector<TypeHolder>& key_types) {
+  return CheckForGetNextSegment(batch.values, batch.length, offset, key_types);
+}
+
+struct BaseGroupingSegmenter : public GroupingSegmenter {
+  explicit BaseGroupingSegmenter(const std::vector<TypeHolder>& key_types)
+      : key_types_(key_types) {}
+
+  const std::vector<TypeHolder>& key_types() const override { return key_types_; }
+
+  std::vector<TypeHolder> key_types_;
+};
+
+GroupingSegment MakeSegment(int64_t batch_length, int64_t offset, int64_t length,
+                            bool extends) {
+  return GroupingSegment{offset, length, offset + length >= batch_length, extends};
+}
+
+int64_t GetMatchLength(const uint8_t* match_bytes, int64_t match_width,
+                       const uint8_t* array_bytes, int64_t offset, int64_t length) {
+  int64_t cursor, byte_cursor;
+  for (cursor = offset, byte_cursor = match_width * cursor; cursor < length;
+       cursor++, byte_cursor += match_width) {
+    if (memcmp(match_bytes, array_bytes + byte_cursor,
+               static_cast<size_t>(match_width)) != 0) {
+      break;
+    }
+  }
+  return cursor - offset;
+}
+
+using ExtendFunc = std::function<bool(const void*)>;
+constexpr bool kDefaultExtends = true;
+constexpr bool kEmptyExtends = true;
+
+Result<GroupingSegment> GetNextSegmentChunked(
+    const std::shared_ptr<ChunkedArray>& chunked_array, int64_t offset,
+    ExtendFunc extend) {
+  if (offset >= chunked_array->length()) {
+    return MakeSegment(chunked_array->length(), chunked_array->length(), 0,
+                       kEmptyExtends);
+  }
+  int64_t remaining_offset = offset;
+  const auto& arrays = chunked_array->chunks();
+  for (size_t i = 0; remaining_offset >= 0 && i < arrays.size(); i++) {
+    // look up chunk containing offset
+    int64_t array_length = arrays[i]->length();
+    if (remaining_offset < array_length) {
+      // found - switch to matching
+      int64_t match_width = arrays[i]->type()->byte_width();
+      const uint8_t* match_bytes = GetValuesAsBytes(*arrays[i]->data(), remaining_offset);
+      int64_t total_match_length = 0;
+      for (; i < arrays.size(); i++) {
+        int64_t array_length = arrays[i]->length();
+        if (array_length <= 0) continue;
+        const uint8_t* array_bytes = GetValuesAsBytes(*arrays[i]->data());
+        int64_t match_length = GetMatchLength(match_bytes, match_width, array_bytes,
+                                              remaining_offset, array_length);
+        total_match_length += match_length;
+        if (match_length < array_length - remaining_offset) break;
+        remaining_offset = 0;
+      }
+      bool extends = extend(match_bytes);
+      return MakeSegment(chunked_array->length(), offset, total_match_length, extends);
+    }
+    remaining_offset -= array_length;
+  }
+  return Status::Invalid("segmenting invalid chunked array value");
+}
+
+struct NoKeysGroupingSegmenter : public BaseGroupingSegmenter {
+  static std::unique_ptr<GroupingSegmenter> Make() {
+    return std::make_unique<NoKeysGroupingSegmenter>();
+  }
+
+  NoKeysGroupingSegmenter() : BaseGroupingSegmenter({}) {}
+
+  Status Reset() override { return Status::OK(); }
+
+  template <typename Batch>
+  Result<GroupingSegment> GetNextSegmentImpl(const Batch& batch, int64_t offset) {
+    ARROW_RETURN_NOT_OK(CheckForGetNextSegment(batch, offset, {}));
+    return MakeSegment(batch.length, offset, batch.length - offset, kDefaultExtends);
+  }
+
+  Result<GroupingSegment> GetNextSegment(const ExecSpan& batch, int64_t offset) override {
+    return GetNextSegmentImpl(batch, offset);
+  }
+
+  Result<GroupingSegment> GetNextSegment(const ExecBatch& batch,
+                                         int64_t offset) override {
+    return GetNextSegmentImpl(batch, offset);
+  }
+};
+
+struct SimpleKeyGroupingSegmenter : public BaseGroupingSegmenter {
+  static Result<std::unique_ptr<GroupingSegmenter>> Make(TypeHolder key_type) {
+    return std::make_unique<SimpleKeyGroupingSegmenter>(key_type);
+  }
+
+  explicit SimpleKeyGroupingSegmenter(TypeHolder key_type)
+      : BaseGroupingSegmenter({key_type}), key_type_(key_types_[0]), save_key_data_() {}
+
+  Status CheckType(const DataType& type) {
+    if (!is_fixed_width(type)) {
+      return Status::Invalid("SimpleKeyGroupingSegmenter does not support type ", type);
+    }
+    return Status::OK();
+  }
+
+  Status Reset() override {
+    save_key_data_.resize(0);
+    return Status::OK();
+  }
+
+  bool Extend(const void* data) {
+    size_t byte_width = static_cast<size_t>(key_type_.type->byte_width());
+    bool extends = save_key_data_.size() != byte_width
+                       ? kDefaultExtends
+                       : 0 == memcmp(save_key_data_.data(), data, byte_width);
+    save_key_data_.resize(byte_width);
+    memcpy(save_key_data_.data(), data, byte_width);
+    return extends;
+  }
+
+  Result<GroupingSegment> GetNextSegment(const Scalar& scalar, int64_t offset,
+                                         int64_t length) {
+    ARROW_RETURN_NOT_OK(CheckType(*scalar.type));
+    if (!scalar.is_valid) {
+      return Status::Invalid("segmenting an invalid scalar");
+    }
+    auto data = checked_cast<const PrimitiveScalarBase&>(scalar).data();
+    bool extends = length > 0 ? Extend(data) : kEmptyExtends;
+    return MakeSegment(length, 0, length, extends);
+  }
+
+  Result<GroupingSegment> GetNextSegment(const DataType& array_type,
+                                         const uint8_t* array_bytes, int64_t offset,
+                                         int64_t length) {
+    RETURN_NOT_OK(CheckType(array_type));
+    int64_t byte_width = array_type.byte_width();
+    int64_t match_length = GetMatchLength(array_bytes + offset * byte_width, byte_width,
+                                          array_bytes, offset, length);
+    bool extends = length > 0 ? Extend(array_bytes + offset * byte_width) : kEmptyExtends;
+    return MakeSegment(length, offset, match_length, extends);
+  }
+
+  Result<GroupingSegment> GetNextSegment(const ExecSpan& batch, int64_t offset) override {
+    ARROW_RETURN_NOT_OK(CheckForGetNextSegment(batch, offset, {key_type_}));
+    if (offset == batch.length) {
+      return MakeSegment(batch.length, offset, 0, kEmptyExtends);
+    }
+    const auto& value = batch.values[0];
+    if (value.is_scalar()) {
+      return GetNextSegment(*value.scalar, offset, batch.length);
+    }
+    ARROW_DCHECK(value.is_array());
+    const auto& array = value.array;
+    if (array.GetNullCount() > 0) {
+      return Status::NotImplemented("segmenting a nullable array");
+    }
+    return GetNextSegment(*array.type, GetValuesAsBytes(array), offset, batch.length);
+  }
+
+  Result<GroupingSegment> GetNextSegment(const ExecBatch& batch,
+                                         int64_t offset) override {
+    ARROW_RETURN_NOT_OK(CheckForGetNextSegment(batch, offset, {key_type_}));
+    if (offset == batch.length) {
+      return MakeSegment(batch.length, offset, 0, kEmptyExtends);
+    }
+    const auto& value = batch.values[0];
+    if (value.is_scalar()) {
+      return GetNextSegment(*value.scalar(), offset, batch.length);
+    }
+    if (value.is_array()) {
+      auto array = value.array();
+      if (array->GetNullCount() > 0) {
+        return Status::NotImplemented("segmenting a nullable array");
+      }
+      return GetNextSegment(*array->type, GetValuesAsBytes(*array), offset, batch.length);
+    }
+    if (value.is_chunked_array()) {
+      auto array = value.chunked_array();
+      if (array->null_count() > 0) {
+        return Status::NotImplemented("segmenting a nullable array");
+      }
+      return GetNextSegmentChunked(array, offset, bound_extend_);
+    }
+    return Status::Invalid("segmenting unsupported value kind ", value.kind());
+  }
+
+ private:
+  TypeHolder key_type_;
+  std::vector<uint8_t> save_key_data_;
+  ExtendFunc bound_extend_ = [this](const void* data) { return Extend(data); };
+};
+
+struct AnyKeysGroupingSegmenter : public BaseGroupingSegmenter {
+  static Result<std::unique_ptr<GroupingSegmenter>> Make(
+      const std::vector<TypeHolder>& key_types, ExecContext* ctx) {
+    ARROW_RETURN_NOT_OK(Grouper::Make(key_types, ctx));  // check types
+    return std::make_unique<AnyKeysGroupingSegmenter>(key_types, ctx);
+  }
+
+  AnyKeysGroupingSegmenter(const std::vector<TypeHolder>& key_types, ExecContext* ctx)
+      : BaseGroupingSegmenter(key_types),
+        ctx_(ctx),
+        grouper_(nullptr),
+        save_group_id_(kNoGroupId) {}
+
+  Status Reset() override {
+    grouper_ = nullptr;
+    save_group_id_ = kNoGroupId;
+    return Status::OK();
+  }
+
+  bool Extend(const void* data) {
+    auto group_id = *static_cast<const group_id_t*>(data);
+    bool extends =
+        save_group_id_ == kNoGroupId ? kDefaultExtends : save_group_id_ == group_id;
+    save_group_id_ = group_id;
+    return extends;
+  }
+
+  template <typename Batch>
+  Result<group_id_t> MapGroupIdAt(const Batch& batch, int64_t offset) {
+    if (offset < 0 || offset >= batch.length) {
+      return Status::Invalid("requesting group id out of bounds");
+    }
+    if (!grouper_) return kNoGroupId;
+    ARROW_ASSIGN_OR_RAISE(auto datum, grouper_->Consume(batch, offset,
+                                                        /*consume_length=*/1));
+    if (!(datum.is_array() || datum.is_chunked_array())) {
+      return Status::Invalid("accessing unsupported datum kind ", datum.kind());
+    }
+    const std::shared_ptr<ArrayData>& data =
+        datum.is_array() ? datum.array() : datum.chunked_array()->chunk(0)->data();
+    ARROW_DCHECK(data->GetNullCount() == 0);
+    DCHECK_EQ(data->type->id(), GroupIdType::type_id);
+    DCHECK_EQ(1, data->length);
+    const group_id_t* values = data->GetValues<group_id_t>(1);
+    return values[0];
+  }
+
+  template <typename Batch>
+  Result<GroupingSegment> GetNextSegmentImpl(const Batch& batch, int64_t offset) {
+    ARROW_RETURN_NOT_OK(CheckForGetNextSegment(batch, offset, key_types_));
+    if (offset == batch.length) {
+      return MakeSegment(batch.length, offset, 0, kEmptyExtends);
+    }
+    // ARROW-18311: make Grouper support Reset()
+    // so it can be cached instead of recreated below
+    //
+    // the group id must be computed prior to resetting the grouper, since it is compared
+    // to save_group_id_, and after a reset the grouper produces incomparable group ids
+    ARROW_ASSIGN_OR_RAISE(auto group_id, MapGroupIdAt(batch, offset));
+    ExtendFunc bound_extend = [this, group_id](const void* data) {
+      bool extends = Extend(&group_id);
+      save_group_id_ = *static_cast<const group_id_t*>(data);
+      return extends;
+    };
+    ARROW_ASSIGN_OR_RAISE(grouper_, Grouper::Make(key_types_, ctx_));  // TODO: reset it
+    ARROW_ASSIGN_OR_RAISE(auto datum, grouper_->Consume(batch, offset));
+    if (datum.is_array()) {
+      const std::shared_ptr<ArrayData>& data = datum.array();
+      ARROW_DCHECK(data->GetNullCount() == 0);
+      DCHECK_EQ(data->type->id(), GroupIdType::type_id);
+      const group_id_t* values = data->GetValues<group_id_t>(1);
+      int64_t cursor;
+      for (cursor = 1; cursor < data->length; cursor++) {
+        if (values[0] != values[cursor]) break;
+      }
+      int64_t length = std::min(cursor, batch.length - offset);
+      bool extends = length > 0 ? bound_extend(values) : kEmptyExtends;
+      return MakeSegment(batch.length, offset, length, extends);
+    } else if (datum.is_chunked_array()) {
+      ARROW_ASSIGN_OR_RAISE(
+          auto segment, GetNextSegmentChunked(datum.chunked_array(), 0, bound_extend));
+      segment.offset += offset;
+      return segment;
+    } else {
+      return Status::Invalid("segmenting unsupported datum kind ", datum.kind());
+    }
+  }
+
+  Result<GroupingSegment> GetNextSegment(const ExecSpan& batch, int64_t offset) override {
+    return GetNextSegmentImpl(batch, offset);
+  }
+
+  Result<GroupingSegment> GetNextSegment(const ExecBatch& batch,
+                                         int64_t offset) override {
+    return GetNextSegmentImpl(batch, offset);
+  }
+
+ private:
+  ExecContext* const ctx_;
+  std::unique_ptr<Grouper> grouper_;
+  group_id_t save_group_id_;
+};
+
+Status CheckForConsume(int64_t batch_length, int64_t& consume_offset,
+                       int64_t* consume_length) {
+  if (consume_offset < 0) {
+    return Status::Invalid("invalid grouper consume offset: ", consume_offset);
+  }
+  if (*consume_length < 0) {
+    *consume_length = batch_length - consume_offset;
+  }
+  return Status::OK();
+}
+
+}  // namespace
+
+Result<std::unique_ptr<GroupingSegmenter>> GroupingSegmenter::Make(
+    const std::vector<TypeHolder>& key_types, bool nullable_keys, ExecContext* ctx) {
+  if (key_types.size() == 0) {
+    return NoKeysGroupingSegmenter::Make();
+  } else if (!nullable_keys && key_types.size() == 1) {
+    const DataType* type = key_types[0].type;
+    if (type != NULLPTR && is_fixed_width(*type)) {
+      return SimpleKeyGroupingSegmenter::Make(key_types[0]);
+    }
+  }
+  return AnyKeysGroupingSegmenter::Make(key_types, ctx);
+}
+
+namespace {
+
+struct BaseGrouper : public Grouper {
+  int IndexOfChunk(const ExecBatch& batch) {
+    int i = 0;
+    for (const auto& value : batch.values) {
+      if (value.is_chunked_array()) {
+        return i;
+      }
+      ++i;
+    }
+    return -1;
+  }
+
+  bool HasConsistentChunks(const ExecBatch& batch, int index_of_chunk) {
+    if (index_of_chunk < 0) {
+      // having no chunks is considered consistent
+      return true;
+    }
+    auto first_chunked_array = batch.values[index_of_chunk].chunked_array();
+    int num_chunks = first_chunked_array->num_chunks();
+    int64_t length = first_chunked_array->length();
+    for (const auto& value : batch.values) {
+      if (!value.is_chunked_array()) {
+        continue;
+      }
+      auto curr_chunk = value.chunked_array();
+      if (num_chunks != curr_chunk->num_chunks() || length != curr_chunk->length()) {
+        return false;
+      }
+    }
+    if (num_chunks > 0) {
+      for (int i = 0; i < num_chunks; i++) {
+        int64_t chunk_length = first_chunked_array->chunk(i)->length();
+        for (const auto& value : batch.values) {
+          if (!value.is_chunked_array()) {
+            continue;
+          }
+          auto curr_chunk = value.chunked_array();
+          if (chunk_length != curr_chunk->chunk(i)->length()) {
+            return false;
+          }
+        }
+      }
+    }
+    return true;
+  }
+
+  using Grouper::Consume;
+
+  Result<Datum> Consume(const ExecBatch& batch, int64_t consume_offset,

Review Comment:
   This method can be considerably simplified since we don't have to worry about chunked arrays.
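   For illustration, a minimal sketch of the simplification this comment has in mind, assuming the batch contains only arrays and scalars (no chunked arrays) and that a `Consume(const ExecSpan&, int64_t, int64_t)` overload exists on the same class; this is not the PR's code:
   ```cpp
   // Sketch only: without chunked arrays, the ExecBatch overload can simply
   // view the batch as an ExecSpan and delegate.
   Result<Datum> Consume(const ExecBatch& batch, int64_t consume_offset,
                         int64_t consume_length) {
     ARROW_RETURN_NOT_OK(CheckForConsume(batch.length, consume_offset, &consume_length));
     ExecSpan span(batch);
     return Consume(span, consume_offset, consume_length);
   }
   ```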



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org