You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2022/06/14 03:06:06 UTC

[arrow] branch master updated: ARROW-16590: [C++] Consolidate files dealing with row-major storage (#13218)

This is an automated email from the ASF dual-hosted git repository.

wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 5b859fda1a ARROW-16590: [C++] Consolidate files dealing with row-major storage (#13218)
5b859fda1a is described below

commit 5b859fda1ae5baec20a3ef54c279d9915e66dd98
Author: Weston Pace <we...@gmail.com>
AuthorDate: Mon Jun 13 17:05:59 2022 -1000

    ARROW-16590: [C++] Consolidate files dealing with row-major storage (#13218)
    
    The primary goal of this refactor of old code was to improve the readability and clarity of the code base. I did not make any functional changes to the code and if any functional changes are suggested which modify existing code I will happily discuss them here but defer the changes themselves to follow-up PRs. I would very much appreciate any feedback on naming, making sure we have sufficient test coverage, and overall layout of the code.
    
    * KeyRowArray -> RowTableImpl KeyEncoder -> RowTableEncoder: The old name made sense because this data is currently represented physically as an array of rows. However, the data is conceptually tabular. We are storing rows & columns. In particular, I found it confusing that `KeyColumnArray` was a 1D data structure while `KeyRowArray` was a 2D table structure.
    * KeyEncoder::Context -> LightContext: There's nothing particular to the key encoder here and I worry keeping it there may lead to fracturing into many different "context" objects.
    * Overall structure: I created a new folder arrow/compute/row and put all row-based utilities in here. Most of the files are now marked as _internal and the content in these files is not used outside of arrow/compute/row. The grouper had previously been alongside the kernel code and it didn't really belong there as it relies very heavily on the internal structure of the row encoding.
    * Row structure: I documented the file arrow/compute/row/row_internal.h
    
    Lead-authored-by: Weston Pace <we...@gmail.com>
    Co-authored-by: Wes McKinney <we...@apache.org>
    Signed-off-by: Wes McKinney <we...@apache.org>
---
 cpp/src/arrow/CMakeLists.txt                       |   13 +-
 cpp/src/arrow/compute/CMakeLists.txt               |    1 +
 cpp/src/arrow/compute/api.h                        |    6 +
 cpp/src/arrow/compute/api_aggregate.h              |   85 --
 cpp/src/arrow/compute/exec/aggregate.cc            |  239 ++++
 cpp/src/arrow/compute/exec/aggregate.h             |   60 +
 cpp/src/arrow/compute/exec/aggregate_node.cc       |   25 +-
 cpp/src/arrow/compute/exec/hash_join.cc            |    3 +-
 cpp/src/arrow/compute/exec/key_encode.h            |  502 ---------
 cpp/src/arrow/compute/exec/key_hash.cc             |   10 +-
 cpp/src/arrow/compute/exec/key_hash.h              |   10 +-
 cpp/src/arrow/compute/exec/schema_util.h           |    4 +-
 .../arrow/compute/kernels/aggregate_benchmark.cc   |    1 +
 cpp/src/arrow/compute/kernels/hash_aggregate.cc    |  748 +------------
 .../arrow/compute/kernels/hash_aggregate_test.cc   |   57 +-
 cpp/src/arrow/compute/light_array.h                |   14 +
 cpp/src/arrow/compute/row/CMakeLists.txt           |   21 +
 .../key_compare.cc => row/compare_internal.cc}     |   50 +-
 .../{exec/key_compare.h => row/compare_internal.h} |   94 +-
 .../compare_internal_avx2.cc}                      |   33 +-
 .../{exec/key_encode.cc => row/encode_internal.cc} | 1151 +++++++-------------
 cpp/src/arrow/compute/row/encode_internal.h        |  323 ++++++
 .../encode_internal_avx2.cc}                       |   45 +-
 cpp/src/arrow/compute/row/grouper.cc               |  590 ++++++++++
 cpp/src/arrow/compute/row/grouper.h                |  112 ++
 cpp/src/arrow/compute/row/row_internal.cc          |  409 +++++++
 cpp/src/arrow/compute/row/row_internal.h           |  250 +++++
 cpp/src/arrow/dataset/partition.cc                 |    9 +-
 python/pyarrow/includes/libarrow.pxd               |    2 +-
 29 files changed, 2574 insertions(+), 2293 deletions(-)

diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt
index fd2f10db2f..6e348aaf43 100644
--- a/cpp/src/arrow/CMakeLists.txt
+++ b/cpp/src/arrow/CMakeLists.txt
@@ -380,6 +380,7 @@ if(ARROW_COMPUTE)
        compute/api_vector.cc
        compute/cast.cc
        compute/exec.cc
+       compute/exec/aggregate.cc
        compute/exec/aggregate_node.cc
        compute/exec/bloom_filter.cc
        compute/exec/exec_plan.cc
@@ -389,8 +390,6 @@ if(ARROW_COMPUTE)
        compute/exec/hash_join_dict.cc
        compute/exec/hash_join_node.cc
        compute/exec/ir_consumer.cc
-       compute/exec/key_compare.cc
-       compute/exec/key_encode.cc
        compute/exec/key_hash.cc
        compute/exec/key_map.cc
        compute/exec/order_by_impl.cc
@@ -442,17 +441,21 @@ if(ARROW_COMPUTE)
        compute/kernels/vector_nested.cc
        compute/kernels/vector_replace.cc
        compute/kernels/vector_selection.cc
-       compute/kernels/vector_sort.cc)
+       compute/kernels/vector_sort.cc
+       compute/row/encode_internal.cc
+       compute/row/compare_internal.cc
+       compute/row/grouper.cc
+       compute/row/row_internal.cc)
 
   append_avx2_src(compute/kernels/aggregate_basic_avx2.cc)
   append_avx512_src(compute/kernels/aggregate_basic_avx512.cc)
 
   append_avx2_src(compute/exec/bloom_filter_avx2.cc)
-  append_avx2_src(compute/exec/key_compare_avx2.cc)
-  append_avx2_src(compute/exec/key_encode_avx2.cc)
   append_avx2_src(compute/exec/key_hash_avx2.cc)
   append_avx2_src(compute/exec/key_map_avx2.cc)
   append_avx2_src(compute/exec/util_avx2.cc)
+  append_avx2_src(compute/row/compare_internal_avx2.cc)
+  append_avx2_src(compute/row/encode_internal_avx2.cc)
 
   list(APPEND ARROW_TESTING_SRCS compute/exec/test_util.cc)
 endif()
diff --git a/cpp/src/arrow/compute/CMakeLists.txt b/cpp/src/arrow/compute/CMakeLists.txt
index 27e693d9a3..91fa796f6d 100644
--- a/cpp/src/arrow/compute/CMakeLists.txt
+++ b/cpp/src/arrow/compute/CMakeLists.txt
@@ -71,3 +71,4 @@ add_arrow_benchmark(function_benchmark PREFIX "arrow-compute")
 add_subdirectory(kernels)
 
 add_subdirectory(exec)
+add_subdirectory(row)
diff --git a/cpp/src/arrow/compute/api.h b/cpp/src/arrow/compute/api.h
index c8e0c2ee23..80582e47b7 100644
--- a/cpp/src/arrow/compute/api.h
+++ b/cpp/src/arrow/compute/api.h
@@ -46,3 +46,9 @@
 /// @}
 
 #include "arrow/compute/exec/options.h"  // IWYU pragma: export
+
+/// \defgroup execnode-row Utilities for working with data in a row-major format
+/// @{
+/// @}
+
+#include "arrow/compute/row/grouper.h"  // IWYU pragma: export
diff --git a/cpp/src/arrow/compute/api_aggregate.h b/cpp/src/arrow/compute/api_aggregate.h
index 1255a57ff1..7977889bf7 100644
--- a/cpp/src/arrow/compute/api_aggregate.h
+++ b/cpp/src/arrow/compute/api_aggregate.h
@@ -395,84 +395,6 @@ Result<Datum> Index(const Datum& value, const IndexOptions& options,
 
 namespace internal {
 
-/// Internal use only: streaming group identifier.
-/// Consumes batches of keys and yields batches of the group ids.
-class ARROW_EXPORT Grouper {
- public:
-  virtual ~Grouper() = default;
-
-  /// Construct a Grouper which receives the specified key types
-  static Result<std::unique_ptr<Grouper>> Make(const std::vector<ValueDescr>& descrs,
-                                               ExecContext* ctx = default_exec_context());
-
-  /// Consume a batch of keys, producing the corresponding group ids as an integer array.
-  /// Currently only uint32 indices will be produced, eventually the bit width will only
-  /// be as wide as necessary.
-  virtual Result<Datum> Consume(const ExecBatch& batch) = 0;
-
-  /// Get current unique keys. May be called multiple times.
-  virtual Result<ExecBatch> GetUniques() = 0;
-
-  /// Get the current number of groups.
-  virtual uint32_t num_groups() const = 0;
-
-  /// \brief Assemble lists of indices of identical elements.
-  ///
-  /// \param[in] ids An unsigned, all-valid integral array which will be
-  ///                used as grouping criteria.
-  /// \param[in] num_groups An upper bound for the elements of ids
-  /// \return A num_groups-long ListArray where the slot at i contains a
-  ///         list of indices where i appears in ids.
-  ///
-  ///   MakeGroupings([
-  ///       2,
-  ///       2,
-  ///       5,
-  ///       5,
-  ///       2,
-  ///       3
-  ///   ], 8) == [
-  ///       [],
-  ///       [],
-  ///       [0, 1, 4],
-  ///       [5],
-  ///       [],
-  ///       [2, 3],
-  ///       [],
-  ///       []
-  ///   ]
-  static Result<std::shared_ptr<ListArray>> MakeGroupings(
-      const UInt32Array& ids, uint32_t num_groups,
-      ExecContext* ctx = default_exec_context());
-
-  /// \brief Produce a ListArray whose slots are selections of `array` which correspond to
-  /// the provided groupings.
-  ///
-  /// For example,
-  ///   ApplyGroupings([
-  ///       [],
-  ///       [],
-  ///       [0, 1, 4],
-  ///       [5],
-  ///       [],
-  ///       [2, 3],
-  ///       [],
-  ///       []
-  ///   ], [2, 2, 5, 5, 2, 3]) == [
-  ///       [],
-  ///       [],
-  ///       [2, 2, 2],
-  ///       [3],
-  ///       [],
-  ///       [5, 5],
-  ///       [],
-  ///       []
-  ///   ]
-  static Result<std::shared_ptr<ListArray>> ApplyGroupings(
-      const ListArray& groupings, const Array& array,
-      ExecContext* ctx = default_exec_context());
-};
-
 /// \brief Configure a grouped aggregation
 struct ARROW_EXPORT Aggregate {
   /// the name of the aggregation function
@@ -482,13 +404,6 @@ struct ARROW_EXPORT Aggregate {
   const FunctionOptions* options;
 };
 
-/// Internal use only: helper function for testing HashAggregateKernels.
-/// This will be replaced by streaming execution operators.
-ARROW_EXPORT
-Result<Datum> GroupBy(const std::vector<Datum>& arguments, const std::vector<Datum>& keys,
-                      const std::vector<Aggregate>& aggregates, bool use_threads = false,
-                      ExecContext* ctx = default_exec_context());
-
 }  // namespace internal
 }  // namespace compute
 }  // namespace arrow
diff --git a/cpp/src/arrow/compute/exec/aggregate.cc b/cpp/src/arrow/compute/exec/aggregate.cc
new file mode 100644
index 0000000000..1a81fdac3d
--- /dev/null
+++ b/cpp/src/arrow/compute/exec/aggregate.cc
@@ -0,0 +1,239 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/compute/exec/aggregate.h"
+
+#include <mutex>
+
+#include "arrow/compute/exec_internal.h"
+#include "arrow/compute/registry.h"
+#include "arrow/compute/row/grouper.h"
+#include "arrow/util/task_group.h"
+
+namespace arrow {
+namespace compute {
+namespace internal {
+
+Result<std::vector<const HashAggregateKernel*>> GetKernels(
+    ExecContext* ctx, const std::vector<Aggregate>& aggregates,
+    const std::vector<ValueDescr>& in_descrs) {
+  if (aggregates.size() != in_descrs.size()) {
+    return Status::Invalid(aggregates.size(), " aggregate functions were specified but ",
+                           in_descrs.size(), " arguments were provided.");
+  }
+
+  std::vector<const HashAggregateKernel*> kernels(in_descrs.size());
+
+  for (size_t i = 0; i < aggregates.size(); ++i) {
+    ARROW_ASSIGN_OR_RAISE(auto function,
+                          ctx->func_registry()->GetFunction(aggregates[i].function));
+    ARROW_ASSIGN_OR_RAISE(
+        const Kernel* kernel,
+        function->DispatchExact({in_descrs[i], ValueDescr::Array(uint32())}));
+    kernels[i] = static_cast<const HashAggregateKernel*>(kernel);
+  }
+  return kernels;
+}
+
+Result<std::vector<std::unique_ptr<KernelState>>> InitKernels(
+    const std::vector<const HashAggregateKernel*>& kernels, ExecContext* ctx,
+    const std::vector<Aggregate>& aggregates, const std::vector<ValueDescr>& in_descrs) {
+  std::vector<std::unique_ptr<KernelState>> states(kernels.size());
+
+  for (size_t i = 0; i < aggregates.size(); ++i) {
+    auto options = aggregates[i].options;
+
+    if (options == nullptr) {
+      // use known default options for the named function if possible
+      auto maybe_function = ctx->func_registry()->GetFunction(aggregates[i].function);
+      if (maybe_function.ok()) {
+        options = maybe_function.ValueOrDie()->default_options();
+      }
+    }
+
+    KernelContext kernel_ctx{ctx};
+    ARROW_ASSIGN_OR_RAISE(
+        states[i],
+        kernels[i]->init(&kernel_ctx, KernelInitArgs{kernels[i],
+                                                     {
+                                                         in_descrs[i],
+                                                         ValueDescr::Array(uint32()),
+                                                     },
+                                                     options}));
+  }
+
+  return std::move(states);
+}
+
+Result<FieldVector> ResolveKernels(
+    const std::vector<Aggregate>& aggregates,
+    const std::vector<const HashAggregateKernel*>& kernels,
+    const std::vector<std::unique_ptr<KernelState>>& states, ExecContext* ctx,
+    const std::vector<ValueDescr>& descrs) {
+  FieldVector fields(descrs.size());
+
+  for (size_t i = 0; i < kernels.size(); ++i) {
+    KernelContext kernel_ctx{ctx};
+    kernel_ctx.SetState(states[i].get());
+
+    ARROW_ASSIGN_OR_RAISE(auto descr, kernels[i]->signature->out_type().Resolve(
+                                          &kernel_ctx, {
+                                                           descrs[i],
+                                                           ValueDescr::Array(uint32()),
+                                                       }));
+    fields[i] = field(aggregates[i].function, std::move(descr.type));
+  }
+  return fields;
+}
+
+Result<Datum> GroupBy(const std::vector<Datum>& arguments, const std::vector<Datum>& keys,
+                      const std::vector<Aggregate>& aggregates, bool use_threads,
+                      ExecContext* ctx) {
+  auto task_group =
+      use_threads
+          ? arrow::internal::TaskGroup::MakeThreaded(arrow::internal::GetCpuThreadPool())
+          : arrow::internal::TaskGroup::MakeSerial();
+
+  std::vector<const HashAggregateKernel*> kernels;
+  std::vector<std::vector<std::unique_ptr<KernelState>>> states;
+  FieldVector out_fields;
+
+  using arrow::compute::detail::ExecBatchIterator;
+  std::unique_ptr<ExecBatchIterator> argument_batch_iterator;
+
+  if (!arguments.empty()) {
+    ARROW_ASSIGN_OR_RAISE(ExecBatch args_batch, ExecBatch::Make(arguments));
+
+    // Construct and initialize HashAggregateKernels
+    auto argument_descrs = args_batch.GetDescriptors();
+
+    ARROW_ASSIGN_OR_RAISE(kernels, GetKernels(ctx, aggregates, argument_descrs));
+
+    states.resize(task_group->parallelism());
+    for (auto& state : states) {
+      ARROW_ASSIGN_OR_RAISE(state,
+                            InitKernels(kernels, ctx, aggregates, argument_descrs));
+    }
+
+    ARROW_ASSIGN_OR_RAISE(
+        out_fields, ResolveKernels(aggregates, kernels, states[0], ctx, argument_descrs));
+
+    ARROW_ASSIGN_OR_RAISE(
+        argument_batch_iterator,
+        ExecBatchIterator::Make(args_batch.values, ctx->exec_chunksize()));
+  }
+
+  // Construct Groupers
+  ARROW_ASSIGN_OR_RAISE(ExecBatch keys_batch, ExecBatch::Make(keys));
+  auto key_descrs = keys_batch.GetDescriptors();
+
+  std::vector<std::unique_ptr<Grouper>> groupers(task_group->parallelism());
+  for (auto& grouper : groupers) {
+    ARROW_ASSIGN_OR_RAISE(grouper, Grouper::Make(key_descrs, ctx));
+  }
+
+  std::mutex mutex;
+  std::unordered_map<std::thread::id, size_t> thread_ids;
+
+  int i = 0;
+  for (ValueDescr& key_descr : key_descrs) {
+    out_fields.push_back(field("key_" + std::to_string(i++), std::move(key_descr.type)));
+  }
+
+  ARROW_ASSIGN_OR_RAISE(
+      auto key_batch_iterator,
+      ExecBatchIterator::Make(keys_batch.values, ctx->exec_chunksize()));
+
+  // start "streaming" execution
+  ExecBatch key_batch, argument_batch;
+  while ((argument_batch_iterator == NULLPTR ||
+          argument_batch_iterator->Next(&argument_batch)) &&
+         key_batch_iterator->Next(&key_batch)) {
+    if (key_batch.length == 0) continue;
+
+    task_group->Append([&, key_batch, argument_batch] {
+      size_t thread_index;
+      {
+        std::unique_lock<std::mutex> lock(mutex);
+        auto it = thread_ids.emplace(std::this_thread::get_id(), thread_ids.size()).first;
+        thread_index = it->second;
+        DCHECK_LT(static_cast<int>(thread_index), task_group->parallelism());
+      }
+
+      auto grouper = groupers[thread_index].get();
+
+      // compute a batch of group ids
+      ARROW_ASSIGN_OR_RAISE(Datum id_batch, grouper->Consume(key_batch));
+
+      // consume group ids with HashAggregateKernels
+      for (size_t i = 0; i < kernels.size(); ++i) {
+        KernelContext batch_ctx{ctx};
+        batch_ctx.SetState(states[thread_index][i].get());
+        ARROW_ASSIGN_OR_RAISE(auto batch, ExecBatch::Make({argument_batch[i], id_batch}));
+        RETURN_NOT_OK(kernels[i]->resize(&batch_ctx, grouper->num_groups()));
+        RETURN_NOT_OK(kernels[i]->consume(&batch_ctx, batch));
+      }
+
+      return Status::OK();
+    });
+  }
+
+  RETURN_NOT_OK(task_group->Finish());
+
+  // Merge if necessary
+  for (size_t thread_index = 1; thread_index < thread_ids.size(); ++thread_index) {
+    ARROW_ASSIGN_OR_RAISE(ExecBatch other_keys, groupers[thread_index]->GetUniques());
+    ARROW_ASSIGN_OR_RAISE(Datum transposition, groupers[0]->Consume(other_keys));
+    groupers[thread_index].reset();
+
+    for (size_t idx = 0; idx < kernels.size(); ++idx) {
+      KernelContext batch_ctx{ctx};
+      batch_ctx.SetState(states[0][idx].get());
+
+      RETURN_NOT_OK(kernels[idx]->resize(&batch_ctx, groupers[0]->num_groups()));
+      RETURN_NOT_OK(kernels[idx]->merge(&batch_ctx, std::move(*states[thread_index][idx]),
+                                        *transposition.array()));
+      states[thread_index][idx].reset();
+    }
+  }
+
+  // Finalize output
+  ArrayDataVector out_data(arguments.size() + keys.size());
+  auto it = out_data.begin();
+
+  for (size_t idx = 0; idx < kernels.size(); ++idx) {
+    KernelContext batch_ctx{ctx};
+    batch_ctx.SetState(states[0][idx].get());
+    Datum out;
+    RETURN_NOT_OK(kernels[idx]->finalize(&batch_ctx, &out));
+    *it++ = out.array();
+  }
+
+  ARROW_ASSIGN_OR_RAISE(ExecBatch out_keys, groupers[0]->GetUniques());
+  for (const auto& key : out_keys.values) {
+    *it++ = key.array();
+  }
+
+  int64_t length = out_data[0]->length;
+  return ArrayData::Make(struct_(std::move(out_fields)), length,
+                         {/*null_bitmap=*/nullptr}, std::move(out_data),
+                         /*null_count=*/0);
+}
+
+}  // namespace internal
+}  // namespace compute
+}  // namespace arrow
diff --git a/cpp/src/arrow/compute/exec/aggregate.h b/cpp/src/arrow/compute/exec/aggregate.h
new file mode 100644
index 0000000000..2c62acf231
--- /dev/null
+++ b/cpp/src/arrow/compute/exec/aggregate.h
@@ -0,0 +1,60 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <memory>
+#include <vector>
+
+#include "arrow/compute/api_aggregate.h"
+#include "arrow/compute/exec.h"
+#include "arrow/compute/exec/exec_plan.h"
+#include "arrow/compute/kernel.h"
+#include "arrow/datum.h"
+#include "arrow/result.h"
+#include "arrow/util/visibility.h"
+
+namespace arrow {
+namespace compute {
+namespace internal {
+
+/// Internal use only: helper function for testing HashAggregateKernels.
+/// For public use see arrow::compute::Grouper or create an execution plan
+/// and use an aggregate node.
+ARROW_EXPORT
+Result<Datum> GroupBy(const std::vector<Datum>& arguments, const std::vector<Datum>& keys,
+                      const std::vector<Aggregate>& aggregates, bool use_threads = false,
+                      ExecContext* ctx = default_exec_context());
+
+Result<std::vector<const HashAggregateKernel*>> GetKernels(
+    ExecContext* ctx, const std::vector<internal::Aggregate>& aggregates,
+    const std::vector<ValueDescr>& in_descrs);
+
+Result<std::vector<std::unique_ptr<KernelState>>> InitKernels(
+    const std::vector<const HashAggregateKernel*>& kernels, ExecContext* ctx,
+    const std::vector<internal::Aggregate>& aggregates,
+    const std::vector<ValueDescr>& in_descrs);
+
+Result<FieldVector> ResolveKernels(
+    const std::vector<internal::Aggregate>& aggregates,
+    const std::vector<const HashAggregateKernel*>& kernels,
+    const std::vector<std::unique_ptr<KernelState>>& states, ExecContext* ctx,
+    const std::vector<ValueDescr>& descrs);
+
+}  // namespace internal
+}  // namespace compute
+}  // namespace arrow
diff --git a/cpp/src/arrow/compute/exec/aggregate_node.cc b/cpp/src/arrow/compute/exec/aggregate_node.cc
index f6813ecb68..712515bd58 100644
--- a/cpp/src/arrow/compute/exec/aggregate_node.cc
+++ b/cpp/src/arrow/compute/exec/aggregate_node.cc
@@ -21,11 +21,13 @@
 #include <unordered_map>
 
 #include "arrow/compute/exec.h"
+#include "arrow/compute/exec/aggregate.h"
 #include "arrow/compute/exec/exec_plan.h"
 #include "arrow/compute/exec/options.h"
 #include "arrow/compute/exec/util.h"
 #include "arrow/compute/exec_internal.h"
 #include "arrow/compute/registry.h"
+#include "arrow/compute/row/grouper.h"
 #include "arrow/datum.h"
 #include "arrow/result.h"
 #include "arrow/util/checked_cast.h"
@@ -39,25 +41,6 @@ using internal::checked_cast;
 
 namespace compute {
 
-namespace internal {
-
-Result<std::vector<const HashAggregateKernel*>> GetKernels(
-    ExecContext* ctx, const std::vector<internal::Aggregate>& aggregates,
-    const std::vector<ValueDescr>& in_descrs);
-
-Result<std::vector<std::unique_ptr<KernelState>>> InitKernels(
-    const std::vector<const HashAggregateKernel*>& kernels, ExecContext* ctx,
-    const std::vector<internal::Aggregate>& aggregates,
-    const std::vector<ValueDescr>& in_descrs);
-
-Result<FieldVector> ResolveKernels(
-    const std::vector<internal::Aggregate>& aggregates,
-    const std::vector<const HashAggregateKernel*>& kernels,
-    const std::vector<std::unique_ptr<KernelState>>& states, ExecContext* ctx,
-    const std::vector<ValueDescr>& descrs);
-
-}  // namespace internal
-
 namespace {
 
 void AggregatesToString(
@@ -647,7 +630,7 @@ class GroupByNode : public ExecNode {
 
  private:
   struct ThreadLocalState {
-    std::unique_ptr<internal::Grouper> grouper;
+    std::unique_ptr<Grouper> grouper;
     std::vector<std::unique_ptr<KernelState>> agg_states;
   };
 
@@ -670,7 +653,7 @@ class GroupByNode : public ExecNode {
     }
 
     // Construct grouper
-    ARROW_ASSIGN_OR_RAISE(state->grouper, internal::Grouper::Make(key_descrs, ctx_));
+    ARROW_ASSIGN_OR_RAISE(state->grouper, Grouper::Make(key_descrs, ctx_));
 
     // Build vector of aggregate source field data types
     std::vector<ValueDescr> agg_src_descrs(agg_kernels_.size());
diff --git a/cpp/src/arrow/compute/exec/hash_join.cc b/cpp/src/arrow/compute/exec/hash_join.cc
index 391c8b2cd4..9c124674d7 100644
--- a/cpp/src/arrow/compute/exec/hash_join.cc
+++ b/cpp/src/arrow/compute/exec/hash_join.cc
@@ -29,13 +29,12 @@
 #include "arrow/compute/exec/key_hash.h"
 #include "arrow/compute/exec/task_util.h"
 #include "arrow/compute/kernels/row_encoder.h"
+#include "arrow/compute/row/encode_internal.h"
 #include "arrow/util/tracing_internal.h"
 
 namespace arrow {
 namespace compute {
 
-using internal::RowEncoder;
-
 class HashJoinBasicImpl : public HashJoinImpl {
  private:
   struct ThreadLocalState;
diff --git a/cpp/src/arrow/compute/exec/key_encode.h b/cpp/src/arrow/compute/exec/key_encode.h
deleted file mode 100644
index 58d8fb233f..0000000000
--- a/cpp/src/arrow/compute/exec/key_encode.h
+++ /dev/null
@@ -1,502 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#pragma once
-
-#include <cstdint>
-#include <memory>
-#include <vector>
-
-#include "arrow/array/data.h"
-#include "arrow/compute/exec.h"
-#include "arrow/compute/exec/util.h"
-#include "arrow/compute/light_array.h"
-#include "arrow/memory_pool.h"
-#include "arrow/result.h"
-#include "arrow/status.h"
-#include "arrow/util/bit_util.h"
-
-namespace arrow {
-namespace compute {
-
-/// Converts between key representation as a collection of arrays for
-/// individual columns and another representation as a single array of rows
-/// combining data from all columns into one value.
-/// This conversion is reversible.
-/// Row-oriented storage is beneficial when there is a need for random access
-/// of individual rows and at the same time all included columns are likely to
-/// be accessed together, as in the case of hash table key.
-class KeyEncoder {
- public:
-  struct KeyEncoderContext {
-    bool has_avx2() const {
-      return (hardware_flags & arrow::internal::CpuInfo::AVX2) > 0;
-    }
-    int64_t hardware_flags;
-    util::TempVectorStack* stack;
-  };
-
-  /// Description of a storage format for rows produced by encoder.
-  struct KeyRowMetadata {
-    /// Is row a varying-length binary, using offsets array to find a beginning of a row,
-    /// or is it a fixed-length binary.
-    bool is_fixed_length;
-
-    /// For a fixed-length binary row, common size of rows in bytes,
-    /// rounded up to the multiple of alignment.
-    ///
-    /// For a varying-length binary, size of all encoded fixed-length key columns,
-    /// including lengths of varying-length columns, rounded up to the multiple of string
-    /// alignment.
-    uint32_t fixed_length;
-
-    /// Offset within a row to the array of 32-bit offsets within a row of
-    /// ends of varbinary fields.
-    /// Used only when the row is not fixed-length, zero for fixed-length row.
-    /// There are N elements for N varbinary fields.
-    /// Each element is the offset within a row of the first byte after
-    /// the corresponding varbinary field bytes in that row.
-    /// If varbinary fields begin at aligned addresses, than the end of the previous
-    /// varbinary field needs to be rounded up according to the specified alignment
-    /// to obtain the beginning of the next varbinary field.
-    /// The first varbinary field starts at offset specified by fixed_length,
-    /// which should already be aligned.
-    uint32_t varbinary_end_array_offset;
-
-    /// Fixed number of bytes per row that are used to encode null masks.
-    /// Null masks indicate for a single row which of its key columns are null.
-    /// Nth bit in the sequence of bytes assigned to a row represents null
-    /// information for Nth field according to the order in which they are encoded.
-    int null_masks_bytes_per_row;
-
-    /// Power of 2. Every row will start at the offset aligned to that number of bytes.
-    int row_alignment;
-
-    /// Power of 2. Must be no greater than row alignment.
-    /// Every non-power-of-2 binary field and every varbinary field bytes
-    /// will start aligned to that number of bytes.
-    int string_alignment;
-
-    /// Metadata of encoded columns in their original order.
-    std::vector<KeyColumnMetadata> column_metadatas;
-
-    /// Order in which fields are encoded.
-    std::vector<uint32_t> column_order;
-
-    /// Offsets within a row to fields in their encoding order.
-    std::vector<uint32_t> column_offsets;
-
-    /// Rounding up offset to the nearest multiple of alignment value.
-    /// Alignment must be a power of 2.
-    static inline uint32_t padding_for_alignment(uint32_t offset,
-                                                 int required_alignment) {
-      ARROW_DCHECK(ARROW_POPCOUNT64(required_alignment) == 1);
-      return static_cast<uint32_t>((-static_cast<int32_t>(offset)) &
-                                   (required_alignment - 1));
-    }
-
-    /// Rounding up offset to the beginning of next column,
-    /// chosing required alignment based on the data type of that column.
-    static inline uint32_t padding_for_alignment(uint32_t offset, int string_alignment,
-                                                 const KeyColumnMetadata& col_metadata) {
-      if (!col_metadata.is_fixed_length ||
-          ARROW_POPCOUNT64(col_metadata.fixed_length) <= 1) {
-        return 0;
-      } else {
-        return padding_for_alignment(offset, string_alignment);
-      }
-    }
-
-    /// Returns an array of offsets within a row of ends of varbinary fields.
-    inline const uint32_t* varbinary_end_array(const uint8_t* row) const {
-      ARROW_DCHECK(!is_fixed_length);
-      return reinterpret_cast<const uint32_t*>(row + varbinary_end_array_offset);
-    }
-    inline uint32_t* varbinary_end_array(uint8_t* row) const {
-      ARROW_DCHECK(!is_fixed_length);
-      return reinterpret_cast<uint32_t*>(row + varbinary_end_array_offset);
-    }
-
-    /// Returns the offset within the row and length of the first varbinary field.
-    inline void first_varbinary_offset_and_length(const uint8_t* row, uint32_t* offset,
-                                                  uint32_t* length) const {
-      ARROW_DCHECK(!is_fixed_length);
-      *offset = fixed_length;
-      *length = varbinary_end_array(row)[0] - fixed_length;
-    }
-
-    /// Returns the offset within the row and length of the second and further varbinary
-    /// fields.
-    inline void nth_varbinary_offset_and_length(const uint8_t* row, int varbinary_id,
-                                                uint32_t* out_offset,
-                                                uint32_t* out_length) const {
-      ARROW_DCHECK(!is_fixed_length);
-      ARROW_DCHECK(varbinary_id > 0);
-      const uint32_t* varbinary_end = varbinary_end_array(row);
-      uint32_t offset = varbinary_end[varbinary_id - 1];
-      offset += padding_for_alignment(offset, string_alignment);
-      *out_offset = offset;
-      *out_length = varbinary_end[varbinary_id] - offset;
-    }
-
-    uint32_t encoded_field_order(uint32_t icol) const { return column_order[icol]; }
-
-    uint32_t encoded_field_offset(uint32_t icol) const { return column_offsets[icol]; }
-
-    uint32_t num_cols() const { return static_cast<uint32_t>(column_metadatas.size()); }
-
-    uint32_t num_varbinary_cols() const;
-
-    void FromColumnMetadataVector(const std::vector<KeyColumnMetadata>& cols,
-                                  int in_row_alignment, int in_string_alignment);
-
-    bool is_compatible(const KeyRowMetadata& other) const;
-  };
-
-  class KeyRowArray {
-   public:
-    KeyRowArray();
-    Status Init(MemoryPool* pool, const KeyRowMetadata& metadata);
-    void Clean();
-    Status AppendEmpty(uint32_t num_rows_to_append, uint32_t num_extra_bytes_to_append);
-    Status AppendSelectionFrom(const KeyRowArray& from, uint32_t num_rows_to_append,
-                               const uint16_t* source_row_ids);
-    const KeyRowMetadata& metadata() const { return metadata_; }
-    int64_t length() const { return num_rows_; }
-    const uint8_t* data(int i) const {
-      ARROW_DCHECK(i >= 0 && i <= max_buffers_);
-      return buffers_[i];
-    }
-    uint8_t* mutable_data(int i) {
-      ARROW_DCHECK(i >= 0 && i <= max_buffers_);
-      return mutable_buffers_[i];
-    }
-    const uint32_t* offsets() const { return reinterpret_cast<const uint32_t*>(data(1)); }
-    uint32_t* mutable_offsets() { return reinterpret_cast<uint32_t*>(mutable_data(1)); }
-    const uint8_t* null_masks() const { return null_masks_->data(); }
-    uint8_t* null_masks() { return null_masks_->mutable_data(); }
-
-    bool has_any_nulls(const KeyEncoderContext* ctx) const;
-
-   private:
-    Status ResizeFixedLengthBuffers(int64_t num_extra_rows);
-    Status ResizeOptionalVaryingLengthBuffer(int64_t num_extra_bytes);
-
-    int64_t size_null_masks(int64_t num_rows);
-    int64_t size_offsets(int64_t num_rows);
-    int64_t size_rows_fixed_length(int64_t num_rows);
-    int64_t size_rows_varying_length(int64_t num_bytes);
-    void update_buffer_pointers();
-
-    static constexpr int64_t padding_for_vectors = 64;
-    MemoryPool* pool_;
-    KeyRowMetadata metadata_;
-    /// Buffers can only expand during lifetime and never shrink.
-    std::unique_ptr<ResizableBuffer> null_masks_;
-    std::unique_ptr<ResizableBuffer> offsets_;
-    std::unique_ptr<ResizableBuffer> rows_;
-    static constexpr int max_buffers_ = 3;
-    const uint8_t* buffers_[max_buffers_];
-    uint8_t* mutable_buffers_[max_buffers_];
-    int64_t num_rows_;
-    int64_t rows_capacity_;
-    int64_t bytes_capacity_;
-
-    // Mutable to allow lazy evaluation
-    mutable int64_t num_rows_for_has_any_nulls_;
-    mutable bool has_any_nulls_;
-  };
-
-  void Init(const std::vector<KeyColumnMetadata>& cols, KeyEncoderContext* ctx,
-            int row_alignment, int string_alignment);
-
-  const KeyRowMetadata& row_metadata() { return row_metadata_; }
-
-  void PrepareEncodeSelected(int64_t start_row, int64_t num_rows,
-                             const std::vector<KeyColumnArray>& cols);
-  Status EncodeSelected(KeyRowArray* rows, uint32_t num_selected,
-                        const uint16_t* selection);
-
-  /// Decode a window of row oriented data into a corresponding
-  /// window of column oriented storage.
-  /// The output buffers need to be correctly allocated and sized before
-  /// calling each method.
-  /// For that reason decoding is split into two functions.
-  /// The output of the first one, that processes everything except for
-  /// varying length buffers, can be used to find out required varying
-  /// length buffers sizes.
-  void DecodeFixedLengthBuffers(int64_t start_row_input, int64_t start_row_output,
-                                int64_t num_rows, const KeyRowArray& rows,
-                                std::vector<KeyColumnArray>* cols);
-
-  void DecodeVaryingLengthBuffers(int64_t start_row_input, int64_t start_row_output,
-                                  int64_t num_rows, const KeyRowArray& rows,
-                                  std::vector<KeyColumnArray>* cols);
-
-  const std::vector<KeyColumnArray>& GetBatchColumns() const { return batch_all_cols_; }
-
- private:
-  /// Prepare column array vectors.
-  /// Output column arrays represent a range of input column arrays
-  /// specified by starting row and number of rows.
-  /// Three vectors are generated:
-  /// - all columns
-  /// - fixed-length columns only
-  /// - varying-length columns only
-  void PrepareKeyColumnArrays(int64_t start_row, int64_t num_rows,
-                              const std::vector<KeyColumnArray>& cols_in);
-
-  class TransformBoolean {
-   public:
-    static KeyColumnArray ArrayReplace(const KeyColumnArray& column,
-                                       const KeyColumnArray& temp);
-    static void PostDecode(const KeyColumnArray& input, KeyColumnArray* output,
-                           KeyEncoderContext* ctx);
-  };
-
-  class EncoderInteger {
-   public:
-    static void Decode(uint32_t start_row, uint32_t num_rows, uint32_t offset_within_row,
-                       const KeyRowArray& rows, KeyColumnArray* col,
-                       KeyEncoderContext* ctx, KeyColumnArray* temp);
-    static bool UsesTransform(const KeyColumnArray& column);
-    static KeyColumnArray ArrayReplace(const KeyColumnArray& column,
-                                       const KeyColumnArray& temp);
-    static void PostDecode(const KeyColumnArray& input, KeyColumnArray* output,
-                           KeyEncoderContext* ctx);
-
-   private:
-    static bool IsBoolean(const KeyColumnMetadata& metadata);
-  };
-
-  class EncoderBinary {
-   public:
-    static void EncodeSelected(uint32_t offset_within_row, KeyRowArray* rows,
-                               const KeyColumnArray& col, uint32_t num_selected,
-                               const uint16_t* selection);
-    static void Decode(uint32_t start_row, uint32_t num_rows, uint32_t offset_within_row,
-                       const KeyRowArray& rows, KeyColumnArray* col,
-                       KeyEncoderContext* ctx, KeyColumnArray* temp);
-    static bool IsInteger(const KeyColumnMetadata& metadata);
-
-   private:
-    template <class COPY_FN, class SET_NULL_FN>
-    static void EncodeSelectedImp(uint32_t offset_within_row, KeyRowArray* rows,
-                                  const KeyColumnArray& col, uint32_t num_selected,
-                                  const uint16_t* selection, COPY_FN copy_fn,
-                                  SET_NULL_FN set_null_fn);
-
-    template <bool is_row_fixed_length, class COPY_FN>
-    static inline void DecodeHelper(uint32_t start_row, uint32_t num_rows,
-                                    uint32_t offset_within_row,
-                                    const KeyRowArray* rows_const,
-                                    KeyRowArray* rows_mutable_maybe_null,
-                                    const KeyColumnArray* col_const,
-                                    KeyColumnArray* col_mutable_maybe_null,
-                                    COPY_FN copy_fn);
-    template <bool is_row_fixed_length>
-    static void DecodeImp(uint32_t start_row, uint32_t num_rows,
-                          uint32_t offset_within_row, const KeyRowArray& rows,
-                          KeyColumnArray* col);
-#if defined(ARROW_HAVE_AVX2)
-    static void DecodeHelper_avx2(bool is_row_fixed_length, uint32_t start_row,
-                                  uint32_t num_rows, uint32_t offset_within_row,
-                                  const KeyRowArray& rows, KeyColumnArray* col);
-    template <bool is_row_fixed_length>
-    static void DecodeImp_avx2(uint32_t start_row, uint32_t num_rows,
-                               uint32_t offset_within_row, const KeyRowArray& rows,
-                               KeyColumnArray* col);
-#endif
-  };
-
-  class EncoderBinaryPair {
-   public:
-    static bool CanProcessPair(const KeyColumnMetadata& col1,
-                               const KeyColumnMetadata& col2) {
-      return EncoderBinary::IsInteger(col1) && EncoderBinary::IsInteger(col2);
-    }
-    static void Decode(uint32_t start_row, uint32_t num_rows, uint32_t offset_within_row,
-                       const KeyRowArray& rows, KeyColumnArray* col1,
-                       KeyColumnArray* col2, KeyEncoderContext* ctx,
-                       KeyColumnArray* temp1, KeyColumnArray* temp2);
-
-   private:
-    template <bool is_row_fixed_length, typename col1_type, typename col2_type>
-    static void DecodeImp(uint32_t num_rows_to_skip, uint32_t start_row,
-                          uint32_t num_rows, uint32_t offset_within_row,
-                          const KeyRowArray& rows, KeyColumnArray* col1,
-                          KeyColumnArray* col2);
-#if defined(ARROW_HAVE_AVX2)
-    static uint32_t DecodeHelper_avx2(bool is_row_fixed_length, uint32_t col_width,
-                                      uint32_t start_row, uint32_t num_rows,
-                                      uint32_t offset_within_row, const KeyRowArray& rows,
-                                      KeyColumnArray* col1, KeyColumnArray* col2);
-    template <bool is_row_fixed_length, uint32_t col_width>
-    static uint32_t DecodeImp_avx2(uint32_t start_row, uint32_t num_rows,
-                                   uint32_t offset_within_row, const KeyRowArray& rows,
-                                   KeyColumnArray* col1, KeyColumnArray* col2);
-#endif
-  };
-
-  class EncoderOffsets {
-   public:
-    static void GetRowOffsetsSelected(KeyRowArray* rows,
-                                      const std::vector<KeyColumnArray>& cols,
-                                      uint32_t num_selected, const uint16_t* selection);
-    static void EncodeSelected(KeyRowArray* rows, const std::vector<KeyColumnArray>& cols,
-                               uint32_t num_selected, const uint16_t* selection);
-
-    static void Decode(uint32_t start_row, uint32_t num_rows, const KeyRowArray& rows,
-                       std::vector<KeyColumnArray>* varbinary_cols,
-                       const std::vector<uint32_t>& varbinary_cols_base_offset,
-                       KeyEncoderContext* ctx);
-
-   private:
-    template <bool has_nulls, bool is_first_varbinary>
-    static void EncodeSelectedImp(uint32_t ivarbinary, KeyRowArray* rows,
-                                  const std::vector<KeyColumnArray>& cols,
-                                  uint32_t num_selected, const uint16_t* selection);
-  };
-
-  class EncoderVarBinary {
-   public:
-    static void EncodeSelected(uint32_t ivarbinary, KeyRowArray* rows,
-                               const KeyColumnArray& cols, uint32_t num_selected,
-                               const uint16_t* selection);
-
-    static void Decode(uint32_t start_row, uint32_t num_rows, uint32_t varbinary_col_id,
-                       const KeyRowArray& rows, KeyColumnArray* col,
-                       KeyEncoderContext* ctx);
-
-   private:
-    template <bool first_varbinary_col, class COPY_FN>
-    static inline void DecodeHelper(uint32_t start_row, uint32_t num_rows,
-                                    uint32_t varbinary_col_id,
-                                    const KeyRowArray* rows_const,
-                                    KeyRowArray* rows_mutable_maybe_null,
-                                    const KeyColumnArray* col_const,
-                                    KeyColumnArray* col_mutable_maybe_null,
-                                    COPY_FN copy_fn);
-    template <bool first_varbinary_col>
-    static void DecodeImp(uint32_t start_row, uint32_t num_rows,
-                          uint32_t varbinary_col_id, const KeyRowArray& rows,
-                          KeyColumnArray* col);
-#if defined(ARROW_HAVE_AVX2)
-    static void DecodeHelper_avx2(uint32_t start_row, uint32_t num_rows,
-                                  uint32_t varbinary_col_id, const KeyRowArray& rows,
-                                  KeyColumnArray* col);
-    template <bool first_varbinary_col>
-    static void DecodeImp_avx2(uint32_t start_row, uint32_t num_rows,
-                               uint32_t varbinary_col_id, const KeyRowArray& rows,
-                               KeyColumnArray* col);
-#endif
-  };
-
-  class EncoderNulls {
-   public:
-    static void EncodeSelected(KeyRowArray* rows, const std::vector<KeyColumnArray>& cols,
-                               uint32_t num_selected, const uint16_t* selection);
-
-    static void Decode(uint32_t start_row, uint32_t num_rows, const KeyRowArray& rows,
-                       std::vector<KeyColumnArray>* cols);
-  };
-
-  KeyEncoderContext* ctx_;
-
-  // Data initialized once, based on data types of key columns
-  KeyRowMetadata row_metadata_;
-
-  // Data initialized for each input batch.
-  // All elements are ordered according to the order of encoded fields in a row.
-  std::vector<KeyColumnArray> batch_all_cols_;
-  std::vector<KeyColumnArray> batch_varbinary_cols_;
-  std::vector<uint32_t> batch_varbinary_cols_base_offsets_;
-};
-
-template <bool is_row_fixed_length, class COPY_FN>
-inline void KeyEncoder::EncoderBinary::DecodeHelper(
-    uint32_t start_row, uint32_t num_rows, uint32_t offset_within_row,
-    const KeyRowArray* rows_const, KeyRowArray* rows_mutable_maybe_null,
-    const KeyColumnArray* col_const, KeyColumnArray* col_mutable_maybe_null,
-    COPY_FN copy_fn) {
-  ARROW_DCHECK(col_const && col_const->metadata().is_fixed_length);
-  uint32_t col_width = col_const->metadata().fixed_length;
-
-  if (is_row_fixed_length) {
-    uint32_t row_width = rows_const->metadata().fixed_length;
-    for (uint32_t i = 0; i < num_rows; ++i) {
-      const uint8_t* src;
-      uint8_t* dst;
-      src = rows_const->data(1) + row_width * (start_row + i) + offset_within_row;
-      dst = col_mutable_maybe_null->mutable_data(1) + col_width * i;
-      copy_fn(dst, src, col_width);
-    }
-  } else {
-    const uint32_t* row_offsets = rows_const->offsets();
-    for (uint32_t i = 0; i < num_rows; ++i) {
-      const uint8_t* src;
-      uint8_t* dst;
-      src = rows_const->data(2) + row_offsets[start_row + i] + offset_within_row;
-      dst = col_mutable_maybe_null->mutable_data(1) + col_width * i;
-      copy_fn(dst, src, col_width);
-    }
-  }
-}
-
-template <bool first_varbinary_col, class COPY_FN>
-inline void KeyEncoder::EncoderVarBinary::DecodeHelper(
-    uint32_t start_row, uint32_t num_rows, uint32_t varbinary_col_id,
-    const KeyRowArray* rows_const, KeyRowArray* rows_mutable_maybe_null,
-    const KeyColumnArray* col_const, KeyColumnArray* col_mutable_maybe_null,
-    COPY_FN copy_fn) {
-  // Column and rows need to be varying length
-  ARROW_DCHECK(!rows_const->metadata().is_fixed_length &&
-               !col_const->metadata().is_fixed_length);
-
-  const uint32_t* row_offsets_for_batch = rows_const->offsets() + start_row;
-  const uint32_t* col_offsets = col_const->offsets();
-
-  uint32_t col_offset_next = col_offsets[0];
-  for (uint32_t i = 0; i < num_rows; ++i) {
-    uint32_t col_offset = col_offset_next;
-    col_offset_next = col_offsets[i + 1];
-
-    uint32_t row_offset = row_offsets_for_batch[i];
-    const uint8_t* row = rows_const->data(2) + row_offset;
-
-    uint32_t offset_within_row;
-    uint32_t length;
-    if (first_varbinary_col) {
-      rows_const->metadata().first_varbinary_offset_and_length(row, &offset_within_row,
-                                                               &length);
-    } else {
-      rows_const->metadata().nth_varbinary_offset_and_length(row, varbinary_col_id,
-                                                             &offset_within_row, &length);
-    }
-
-    row_offset += offset_within_row;
-
-    const uint8_t* src;
-    uint8_t* dst;
-    src = rows_const->data(2) + row_offset;
-    dst = col_mutable_maybe_null->mutable_data(2) + col_offset;
-    copy_fn(dst, src, length);
-  }
-}
-
-}  // namespace compute
-}  // namespace arrow
diff --git a/cpp/src/arrow/compute/exec/key_hash.cc b/cpp/src/arrow/compute/exec/key_hash.cc
index e81ed64b6f..5a5d524c40 100644
--- a/cpp/src/arrow/compute/exec/key_hash.cc
+++ b/cpp/src/arrow/compute/exec/key_hash.cc
@@ -22,7 +22,7 @@
 #include <algorithm>
 #include <cstdint>
 
-#include "arrow/compute/exec/key_encode.h"
+#include "arrow/compute/light_array.h"
 #include "arrow/util/bit_util.h"
 #include "arrow/util/ubsan.h"
 
@@ -377,7 +377,7 @@ void Hashing32::HashFixed(int64_t hardware_flags, bool combine_hashes, uint32_t
 }
 
 void Hashing32::HashMultiColumn(const std::vector<KeyColumnArray>& cols,
-                                KeyEncoder::KeyEncoderContext* ctx, uint32_t* hashes) {
+                                LightContext* ctx, uint32_t* hashes) {
   uint32_t num_rows = static_cast<uint32_t>(cols[0].length());
 
   constexpr uint32_t max_batch_size = util::MiniBatch::kMiniBatchLength;
@@ -463,7 +463,7 @@ Status Hashing32::HashBatch(const ExecBatch& key_batch, uint32_t* hashes,
   std::vector<KeyColumnArray> column_arrays;
   RETURN_NOT_OK(ColumnArraysFromExecBatch(key_batch, offset, length, &column_arrays));
 
-  KeyEncoder::KeyEncoderContext ctx;
+  LightContext ctx;
   ctx.hardware_flags = hardware_flags;
   ctx.stack = temp_stack;
   HashMultiColumn(column_arrays, &ctx, hashes);
@@ -814,7 +814,7 @@ void Hashing64::HashFixed(bool combine_hashes, uint32_t num_rows, uint64_t lengt
 }
 
 void Hashing64::HashMultiColumn(const std::vector<KeyColumnArray>& cols,
-                                KeyEncoder::KeyEncoderContext* ctx, uint64_t* hashes) {
+                                LightContext* ctx, uint64_t* hashes) {
   uint32_t num_rows = static_cast<uint32_t>(cols[0].length());
 
   constexpr uint32_t max_batch_size = util::MiniBatch::kMiniBatchLength;
@@ -895,7 +895,7 @@ Status Hashing64::HashBatch(const ExecBatch& key_batch, uint64_t* hashes,
   std::vector<KeyColumnArray> column_arrays;
   RETURN_NOT_OK(ColumnArraysFromExecBatch(key_batch, offset, length, &column_arrays));
 
-  KeyEncoder::KeyEncoderContext ctx;
+  LightContext ctx;
   ctx.hardware_flags = hardware_flags;
   ctx.stack = temp_stack;
   HashMultiColumn(column_arrays, &ctx, hashes);
diff --git a/cpp/src/arrow/compute/exec/key_hash.h b/cpp/src/arrow/compute/exec/key_hash.h
index 05f39bb729..f8af798838 100644
--- a/cpp/src/arrow/compute/exec/key_hash.h
+++ b/cpp/src/arrow/compute/exec/key_hash.h
@@ -23,8 +23,8 @@
 
 #include <cstdint>
 
-#include "arrow/compute/exec/key_encode.h"
 #include "arrow/compute/exec/util.h"
+#include "arrow/compute/light_array.h"
 
 namespace arrow {
 namespace compute {
@@ -45,8 +45,8 @@ class ARROW_EXPORT Hashing32 {
   friend void TestBloomSmall(BloomFilterBuildStrategy, int64_t, int, bool, bool);
 
  public:
-  static void HashMultiColumn(const std::vector<KeyColumnArray>& cols,
-                              KeyEncoder::KeyEncoderContext* ctx, uint32_t* out_hash);
+  static void HashMultiColumn(const std::vector<KeyColumnArray>& cols, LightContext* ctx,
+                              uint32_t* out_hash);
 
   static Status HashBatch(const ExecBatch& key_batch, uint32_t* hashes,
                           int64_t hardware_flags, util::TempVectorStack* temp_stack,
@@ -157,8 +157,8 @@ class ARROW_EXPORT Hashing64 {
   friend void TestBloomSmall(BloomFilterBuildStrategy, int64_t, int, bool, bool);
 
  public:
-  static void HashMultiColumn(const std::vector<KeyColumnArray>& cols,
-                              KeyEncoder::KeyEncoderContext* ctx, uint64_t* hashes);
+  static void HashMultiColumn(const std::vector<KeyColumnArray>& cols, LightContext* ctx,
+                              uint64_t* hashes);
 
   static Status HashBatch(const ExecBatch& key_batch, uint64_t* hashes,
                           int64_t hardware_flags, util::TempVectorStack* temp_stack,
diff --git a/cpp/src/arrow/compute/exec/schema_util.h b/cpp/src/arrow/compute/exec/schema_util.h
index 4e307e2380..91b7e6cfc6 100644
--- a/cpp/src/arrow/compute/exec/schema_util.h
+++ b/cpp/src/arrow/compute/exec/schema_util.h
@@ -22,8 +22,8 @@
 #include <string>
 #include <vector>
 
-#include "arrow/compute/exec/key_encode.h"  // for KeyColumnMetadata
-#include "arrow/type.h"                     // for DataType, FieldRef, Field and Schema
+#include "arrow/compute/light_array.h"  // for KeyColumnMetadata
+#include "arrow/type.h"                 // for DataType, FieldRef, Field and Schema
 #include "arrow/util/mutex.h"
 
 namespace arrow {
diff --git a/cpp/src/arrow/compute/kernels/aggregate_benchmark.cc b/cpp/src/arrow/compute/kernels/aggregate_benchmark.cc
index 230d9649de..c271285434 100644
--- a/cpp/src/arrow/compute/kernels/aggregate_benchmark.cc
+++ b/cpp/src/arrow/compute/kernels/aggregate_benchmark.cc
@@ -21,6 +21,7 @@
 
 #include "arrow/array/array_primitive.h"
 #include "arrow/compute/api.h"
+#include "arrow/compute/exec/aggregate.h"
 #include "arrow/testing/gtest_util.h"
 #include "arrow/testing/random.h"
 #include "arrow/util/benchmark_util.h"
diff --git a/cpp/src/arrow/compute/kernels/hash_aggregate.cc b/cpp/src/arrow/compute/kernels/hash_aggregate.cc
index fc19ad4b7e..de63293595 100644
--- a/cpp/src/arrow/compute/kernels/hash_aggregate.cc
+++ b/cpp/src/arrow/compute/kernels/hash_aggregate.cc
@@ -29,8 +29,6 @@
 #include "arrow/buffer_builder.h"
 #include "arrow/compute/api_aggregate.h"
 #include "arrow/compute/api_vector.h"
-#include "arrow/compute/exec/key_compare.h"
-#include "arrow/compute/exec/key_encode.h"
 #include "arrow/compute/exec/key_hash.h"
 #include "arrow/compute/exec/key_map.h"
 #include "arrow/compute/exec/util.h"
@@ -41,6 +39,7 @@
 #include "arrow/compute/kernels/common.h"
 #include "arrow/compute/kernels/row_encoder.h"
 #include "arrow/compute/kernels/util_internal.h"
+#include "arrow/compute/row/grouper.h"
 #include "arrow/record_batch.h"
 #include "arrow/stl_allocator.h"
 #include "arrow/util/bit_run_reader.h"
@@ -65,485 +64,6 @@ namespace compute {
 namespace internal {
 namespace {
 
-struct GrouperImpl : Grouper {
-  static Result<std::unique_ptr<GrouperImpl>> Make(const std::vector<ValueDescr>& keys,
-                                                   ExecContext* ctx) {
-    auto impl = ::arrow::internal::make_unique<GrouperImpl>();
-
-    impl->encoders_.resize(keys.size());
-    impl->ctx_ = ctx;
-
-    for (size_t i = 0; i < keys.size(); ++i) {
-      const auto& key = keys[i].type;
-
-      if (key->id() == Type::BOOL) {
-        impl->encoders_[i] = ::arrow::internal::make_unique<BooleanKeyEncoder>();
-        continue;
-      }
-
-      if (key->id() == Type::DICTIONARY) {
-        impl->encoders_[i] =
-            ::arrow::internal::make_unique<DictionaryKeyEncoder>(key, ctx->memory_pool());
-        continue;
-      }
-
-      if (is_fixed_width(key->id())) {
-        impl->encoders_[i] = ::arrow::internal::make_unique<FixedWidthKeyEncoder>(key);
-        continue;
-      }
-
-      if (is_binary_like(key->id())) {
-        impl->encoders_[i] =
-            ::arrow::internal::make_unique<VarLengthKeyEncoder<BinaryType>>(key);
-        continue;
-      }
-
-      if (is_large_binary_like(key->id())) {
-        impl->encoders_[i] =
-            ::arrow::internal::make_unique<VarLengthKeyEncoder<LargeBinaryType>>(key);
-        continue;
-      }
-
-      if (key->id() == Type::NA) {
-        impl->encoders_[i] = ::arrow::internal::make_unique<NullKeyEncoder>();
-        continue;
-      }
-
-      return Status::NotImplemented("Keys of type ", *key);
-    }
-
-    return std::move(impl);
-  }
-
-  Result<Datum> Consume(const ExecBatch& batch) override {
-    std::vector<int32_t> offsets_batch(batch.length + 1);
-    for (int i = 0; i < batch.num_values(); ++i) {
-      encoders_[i]->AddLength(batch[i], batch.length, offsets_batch.data());
-    }
-
-    int32_t total_length = 0;
-    for (int64_t i = 0; i < batch.length; ++i) {
-      auto total_length_before = total_length;
-      total_length += offsets_batch[i];
-      offsets_batch[i] = total_length_before;
-    }
-    offsets_batch[batch.length] = total_length;
-
-    std::vector<uint8_t> key_bytes_batch(total_length);
-    std::vector<uint8_t*> key_buf_ptrs(batch.length);
-    for (int64_t i = 0; i < batch.length; ++i) {
-      key_buf_ptrs[i] = key_bytes_batch.data() + offsets_batch[i];
-    }
-
-    for (int i = 0; i < batch.num_values(); ++i) {
-      RETURN_NOT_OK(encoders_[i]->Encode(batch[i], batch.length, key_buf_ptrs.data()));
-    }
-
-    TypedBufferBuilder<uint32_t> group_ids_batch(ctx_->memory_pool());
-    RETURN_NOT_OK(group_ids_batch.Resize(batch.length));
-
-    for (int64_t i = 0; i < batch.length; ++i) {
-      int32_t key_length = offsets_batch[i + 1] - offsets_batch[i];
-      std::string key(
-          reinterpret_cast<const char*>(key_bytes_batch.data() + offsets_batch[i]),
-          key_length);
-
-      auto it_success = map_.emplace(key, num_groups_);
-      auto group_id = it_success.first->second;
-
-      if (it_success.second) {
-        // new key; update offsets and key_bytes
-        ++num_groups_;
-        // Skip if there are no keys
-        if (key_length > 0) {
-          auto next_key_offset = static_cast<int32_t>(key_bytes_.size());
-          key_bytes_.resize(next_key_offset + key_length);
-          offsets_.push_back(next_key_offset + key_length);
-          memcpy(key_bytes_.data() + next_key_offset, key.c_str(), key_length);
-        }
-      }
-
-      group_ids_batch.UnsafeAppend(group_id);
-    }
-
-    ARROW_ASSIGN_OR_RAISE(auto group_ids, group_ids_batch.Finish());
-    return Datum(UInt32Array(batch.length, std::move(group_ids)));
-  }
-
-  uint32_t num_groups() const override { return num_groups_; }
-
-  Result<ExecBatch> GetUniques() override {
-    ExecBatch out({}, num_groups_);
-
-    std::vector<uint8_t*> key_buf_ptrs(num_groups_);
-    for (int64_t i = 0; i < num_groups_; ++i) {
-      key_buf_ptrs[i] = key_bytes_.data() + offsets_[i];
-    }
-
-    out.values.resize(encoders_.size());
-    for (size_t i = 0; i < encoders_.size(); ++i) {
-      ARROW_ASSIGN_OR_RAISE(
-          out.values[i],
-          encoders_[i]->Decode(key_buf_ptrs.data(), static_cast<int32_t>(num_groups_),
-                               ctx_->memory_pool()));
-    }
-
-    return out;
-  }
-
-  ExecContext* ctx_;
-  std::unordered_map<std::string, uint32_t> map_;
-  std::vector<int32_t> offsets_ = {0};
-  std::vector<uint8_t> key_bytes_;
-  uint32_t num_groups_ = 0;
-  std::vector<std::unique_ptr<KeyEncoder>> encoders_;
-};
-
-struct GrouperFastImpl : Grouper {
-  static constexpr int kBitmapPaddingForSIMD = 64;  // bits
-  static constexpr int kPaddingForSIMD = 32;        // bytes
-
-  static bool CanUse(const std::vector<ValueDescr>& keys) {
-#if ARROW_LITTLE_ENDIAN
-    for (size_t i = 0; i < keys.size(); ++i) {
-      const auto& key = keys[i].type;
-      if (is_large_binary_like(key->id())) {
-        return false;
-      }
-    }
-    return true;
-#else
-    return false;
-#endif
-  }
-
-  static Result<std::unique_ptr<GrouperFastImpl>> Make(
-      const std::vector<ValueDescr>& keys, ExecContext* ctx) {
-    auto impl = ::arrow::internal::make_unique<GrouperFastImpl>();
-    impl->ctx_ = ctx;
-
-    RETURN_NOT_OK(impl->temp_stack_.Init(ctx->memory_pool(), 64 * minibatch_size_max_));
-    impl->encode_ctx_.hardware_flags =
-        arrow::internal::CpuInfo::GetInstance()->hardware_flags();
-    impl->encode_ctx_.stack = &impl->temp_stack_;
-
-    auto num_columns = keys.size();
-    impl->col_metadata_.resize(num_columns);
-    impl->key_types_.resize(num_columns);
-    impl->dictionaries_.resize(num_columns);
-    for (size_t icol = 0; icol < num_columns; ++icol) {
-      const auto& key = keys[icol].type;
-      if (key->id() == Type::DICTIONARY) {
-        auto bit_width = checked_cast<const FixedWidthType&>(*key).bit_width();
-        ARROW_DCHECK(bit_width % 8 == 0);
-        impl->col_metadata_[icol] =
-            arrow::compute::KeyColumnMetadata(true, bit_width / 8);
-      } else if (key->id() == Type::BOOL) {
-        impl->col_metadata_[icol] = arrow::compute::KeyColumnMetadata(true, 0);
-      } else if (is_fixed_width(key->id())) {
-        impl->col_metadata_[icol] = arrow::compute::KeyColumnMetadata(
-            true, checked_cast<const FixedWidthType&>(*key).bit_width() / 8);
-      } else if (is_binary_like(key->id())) {
-        impl->col_metadata_[icol] =
-            arrow::compute::KeyColumnMetadata(false, sizeof(uint32_t));
-      } else if (key->id() == Type::NA) {
-        impl->col_metadata_[icol] =
-            arrow::compute::KeyColumnMetadata(true, 0, /*is_null_type_in=*/true);
-      } else {
-        return Status::NotImplemented("Keys of type ", *key);
-      }
-      impl->key_types_[icol] = key;
-    }
-
-    impl->encoder_.Init(impl->col_metadata_, &impl->encode_ctx_,
-                        /* row_alignment = */ sizeof(uint64_t),
-                        /* string_alignment = */ sizeof(uint64_t));
-    RETURN_NOT_OK(impl->rows_.Init(ctx->memory_pool(), impl->encoder_.row_metadata()));
-    RETURN_NOT_OK(
-        impl->rows_minibatch_.Init(ctx->memory_pool(), impl->encoder_.row_metadata()));
-    impl->minibatch_size_ = impl->minibatch_size_min_;
-    GrouperFastImpl* impl_ptr = impl.get();
-    auto equal_func = [impl_ptr](
-                          int num_keys_to_compare, const uint16_t* selection_may_be_null,
-                          const uint32_t* group_ids, uint32_t* out_num_keys_mismatch,
-                          uint16_t* out_selection_mismatch) {
-      arrow::compute::KeyCompare::CompareColumnsToRows(
-          num_keys_to_compare, selection_may_be_null, group_ids, &impl_ptr->encode_ctx_,
-          out_num_keys_mismatch, out_selection_mismatch,
-          impl_ptr->encoder_.GetBatchColumns(), impl_ptr->rows_);
-    };
-    auto append_func = [impl_ptr](int num_keys, const uint16_t* selection) {
-      RETURN_NOT_OK(impl_ptr->encoder_.EncodeSelected(&impl_ptr->rows_minibatch_,
-                                                      num_keys, selection));
-      return impl_ptr->rows_.AppendSelectionFrom(impl_ptr->rows_minibatch_, num_keys,
-                                                 nullptr);
-    };
-    RETURN_NOT_OK(impl->map_.init(impl->encode_ctx_.hardware_flags, ctx->memory_pool(),
-                                  impl->encode_ctx_.stack, impl->log_minibatch_max_,
-                                  equal_func, append_func));
-    impl->cols_.resize(num_columns);
-    impl->minibatch_hashes_.resize(impl->minibatch_size_max_ +
-                                   kPaddingForSIMD / sizeof(uint32_t));
-
-    return std::move(impl);
-  }
-
-  ~GrouperFastImpl() { map_.cleanup(); }
-
-  Result<Datum> Consume(const ExecBatch& batch) override {
-    // ARROW-14027: broadcast scalar arguments for now
-    for (int i = 0; i < batch.num_values(); i++) {
-      if (batch.values[i].is_scalar()) {
-        ExecBatch expanded = batch;
-        for (int j = i; j < expanded.num_values(); j++) {
-          if (expanded.values[j].is_scalar()) {
-            ARROW_ASSIGN_OR_RAISE(
-                expanded.values[j],
-                MakeArrayFromScalar(*expanded.values[j].scalar(), expanded.length,
-                                    ctx_->memory_pool()));
-          }
-        }
-        return ConsumeImpl(expanded);
-      }
-    }
-    return ConsumeImpl(batch);
-  }
-
-  Result<Datum> ConsumeImpl(const ExecBatch& batch) {
-    int64_t num_rows = batch.length;
-    int num_columns = batch.num_values();
-    // Process dictionaries
-    for (int icol = 0; icol < num_columns; ++icol) {
-      if (key_types_[icol]->id() == Type::DICTIONARY) {
-        auto data = batch[icol].array();
-        auto dict = MakeArray(data->dictionary);
-        if (dictionaries_[icol]) {
-          if (!dictionaries_[icol]->Equals(dict)) {
-            // TODO(bkietz) unify if necessary. For now, just error if any batch's
-            // dictionary differs from the first we saw for this key
-            return Status::NotImplemented("Unifying differing dictionaries");
-          }
-        } else {
-          dictionaries_[icol] = std::move(dict);
-        }
-      }
-    }
-
-    std::shared_ptr<arrow::Buffer> group_ids;
-    ARROW_ASSIGN_OR_RAISE(
-        group_ids, AllocateBuffer(sizeof(uint32_t) * num_rows, ctx_->memory_pool()));
-
-    for (int icol = 0; icol < num_columns; ++icol) {
-      const uint8_t* non_nulls = NULLPTR;
-      const uint8_t* fixedlen = NULLPTR;
-      const uint8_t* varlen = NULLPTR;
-
-      // Skip if the key's type is NULL
-      if (key_types_[icol]->id() != Type::NA) {
-        if (batch[icol].array()->buffers[0] != NULLPTR) {
-          non_nulls = batch[icol].array()->buffers[0]->data();
-        }
-        fixedlen = batch[icol].array()->buffers[1]->data();
-        if (!col_metadata_[icol].is_fixed_length) {
-          varlen = batch[icol].array()->buffers[2]->data();
-        }
-      }
-
-      int64_t offset = batch[icol].array()->offset;
-
-      auto col_base = arrow::compute::KeyColumnArray(
-          col_metadata_[icol], offset + num_rows, non_nulls, fixedlen, varlen);
-
-      cols_[icol] = col_base.Slice(offset, num_rows);
-    }
-
-    // Split into smaller mini-batches
-    //
-    for (uint32_t start_row = 0; start_row < num_rows;) {
-      uint32_t batch_size_next = std::min(static_cast<uint32_t>(minibatch_size_),
-                                          static_cast<uint32_t>(num_rows) - start_row);
-
-      // Encode
-      rows_minibatch_.Clean();
-      encoder_.PrepareEncodeSelected(start_row, batch_size_next, cols_);
-
-      // Compute hash
-      Hashing32::HashMultiColumn(encoder_.GetBatchColumns(), &encode_ctx_,
-                                 minibatch_hashes_.data());
-
-      // Map
-      auto match_bitvector =
-          util::TempVectorHolder<uint8_t>(&temp_stack_, (batch_size_next + 7) / 8);
-      {
-        auto local_slots = util::TempVectorHolder<uint8_t>(&temp_stack_, batch_size_next);
-        map_.early_filter(batch_size_next, minibatch_hashes_.data(),
-                          match_bitvector.mutable_data(), local_slots.mutable_data());
-        map_.find(batch_size_next, minibatch_hashes_.data(),
-                  match_bitvector.mutable_data(), local_slots.mutable_data(),
-                  reinterpret_cast<uint32_t*>(group_ids->mutable_data()) + start_row);
-      }
-      auto ids = util::TempVectorHolder<uint16_t>(&temp_stack_, batch_size_next);
-      int num_ids;
-      util::bit_util::bits_to_indexes(0, encode_ctx_.hardware_flags, batch_size_next,
-                                      match_bitvector.mutable_data(), &num_ids,
-                                      ids.mutable_data());
-
-      RETURN_NOT_OK(map_.map_new_keys(
-          num_ids, ids.mutable_data(), minibatch_hashes_.data(),
-          reinterpret_cast<uint32_t*>(group_ids->mutable_data()) + start_row));
-
-      start_row += batch_size_next;
-
-      if (minibatch_size_ * 2 <= minibatch_size_max_) {
-        minibatch_size_ *= 2;
-      }
-    }
-
-    return Datum(UInt32Array(batch.length, std::move(group_ids)));
-  }
-
-  uint32_t num_groups() const override { return static_cast<uint32_t>(rows_.length()); }
-
-  // Make sure padded buffers end up with the right logical size
-
-  Result<std::shared_ptr<Buffer>> AllocatePaddedBitmap(int64_t length) {
-    ARROW_ASSIGN_OR_RAISE(
-        std::shared_ptr<Buffer> buf,
-        AllocateBitmap(length + kBitmapPaddingForSIMD, ctx_->memory_pool()));
-    return SliceMutableBuffer(buf, 0, bit_util::BytesForBits(length));
-  }
-
-  Result<std::shared_ptr<Buffer>> AllocatePaddedBuffer(int64_t size) {
-    ARROW_ASSIGN_OR_RAISE(
-        std::shared_ptr<Buffer> buf,
-        AllocateBuffer(size + kBitmapPaddingForSIMD, ctx_->memory_pool()));
-    return SliceMutableBuffer(buf, 0, size);
-  }
-
-  Result<ExecBatch> GetUniques() override {
-    auto num_columns = static_cast<uint32_t>(col_metadata_.size());
-    int64_t num_groups = rows_.length();
-
-    std::vector<std::shared_ptr<Buffer>> non_null_bufs(num_columns);
-    std::vector<std::shared_ptr<Buffer>> fixedlen_bufs(num_columns);
-    std::vector<std::shared_ptr<Buffer>> varlen_bufs(num_columns);
-
-    for (size_t i = 0; i < num_columns; ++i) {
-      if (col_metadata_[i].is_null_type) {
-        uint8_t* non_nulls = NULLPTR;
-        uint8_t* fixedlen = NULLPTR;
-        cols_[i] = arrow::compute::KeyColumnArray(col_metadata_[i], num_groups, non_nulls,
-                                                  fixedlen, NULLPTR);
-        continue;
-      }
-      ARROW_ASSIGN_OR_RAISE(non_null_bufs[i], AllocatePaddedBitmap(num_groups));
-      if (col_metadata_[i].is_fixed_length && !col_metadata_[i].is_null_type) {
-        if (col_metadata_[i].fixed_length == 0) {
-          ARROW_ASSIGN_OR_RAISE(fixedlen_bufs[i], AllocatePaddedBitmap(num_groups));
-        } else {
-          ARROW_ASSIGN_OR_RAISE(
-              fixedlen_bufs[i],
-              AllocatePaddedBuffer(num_groups * col_metadata_[i].fixed_length));
-        }
-      } else {
-        ARROW_ASSIGN_OR_RAISE(fixedlen_bufs[i],
-                              AllocatePaddedBuffer((num_groups + 1) * sizeof(uint32_t)));
-      }
-      cols_[i] = arrow::compute::KeyColumnArray(
-          col_metadata_[i], num_groups, non_null_bufs[i]->mutable_data(),
-          fixedlen_bufs[i]->mutable_data(), nullptr);
-    }
-
-    for (int64_t start_row = 0; start_row < num_groups;) {
-      int64_t batch_size_next =
-          std::min(num_groups - start_row, static_cast<int64_t>(minibatch_size_max_));
-      encoder_.DecodeFixedLengthBuffers(start_row, start_row, batch_size_next, rows_,
-                                        &cols_);
-      start_row += batch_size_next;
-    }
-
-    if (!rows_.metadata().is_fixed_length) {
-      for (size_t i = 0; i < num_columns; ++i) {
-        if (!col_metadata_[i].is_fixed_length) {
-          auto varlen_size =
-              reinterpret_cast<const uint32_t*>(fixedlen_bufs[i]->data())[num_groups];
-          ARROW_ASSIGN_OR_RAISE(varlen_bufs[i], AllocatePaddedBuffer(varlen_size));
-          cols_[i] = arrow::compute::KeyColumnArray(
-              col_metadata_[i], num_groups, non_null_bufs[i]->mutable_data(),
-              fixedlen_bufs[i]->mutable_data(), varlen_bufs[i]->mutable_data());
-        }
-      }
-
-      for (int64_t start_row = 0; start_row < num_groups;) {
-        int64_t batch_size_next =
-            std::min(num_groups - start_row, static_cast<int64_t>(minibatch_size_max_));
-        encoder_.DecodeVaryingLengthBuffers(start_row, start_row, batch_size_next, rows_,
-                                            &cols_);
-        start_row += batch_size_next;
-      }
-    }
-
-    ExecBatch out({}, num_groups);
-    out.values.resize(num_columns);
-    for (size_t i = 0; i < num_columns; ++i) {
-      if (col_metadata_[i].is_null_type) {
-        out.values[i] = ArrayData::Make(null(), num_groups, {nullptr}, num_groups);
-        continue;
-      }
-      auto valid_count = arrow::internal::CountSetBits(
-          non_null_bufs[i]->data(), /*offset=*/0, static_cast<int64_t>(num_groups));
-      int null_count = static_cast<int>(num_groups) - static_cast<int>(valid_count);
-
-      if (col_metadata_[i].is_fixed_length) {
-        out.values[i] = ArrayData::Make(
-            key_types_[i], num_groups,
-            {std::move(non_null_bufs[i]), std::move(fixedlen_bufs[i])}, null_count);
-      } else {
-        out.values[i] =
-            ArrayData::Make(key_types_[i], num_groups,
-                            {std::move(non_null_bufs[i]), std::move(fixedlen_bufs[i]),
-                             std::move(varlen_bufs[i])},
-                            null_count);
-      }
-    }
-
-    // Process dictionaries
-    for (size_t icol = 0; icol < num_columns; ++icol) {
-      if (key_types_[icol]->id() == Type::DICTIONARY) {
-        if (dictionaries_[icol]) {
-          out.values[icol].array()->dictionary = dictionaries_[icol]->data();
-        } else {
-          ARROW_ASSIGN_OR_RAISE(auto dict, MakeArrayOfNull(key_types_[icol], 0));
-          out.values[icol].array()->dictionary = dict->data();
-        }
-      }
-    }
-
-    return out;
-  }
-
-  static constexpr int log_minibatch_max_ = 10;
-  static constexpr int minibatch_size_max_ = 1 << log_minibatch_max_;
-  static constexpr int minibatch_size_min_ = 128;
-  int minibatch_size_;
-
-  ExecContext* ctx_;
-  arrow::util::TempVectorStack temp_stack_;
-  arrow::compute::KeyEncoder::KeyEncoderContext encode_ctx_;
-
-  std::vector<std::shared_ptr<arrow::DataType>> key_types_;
-  std::vector<arrow::compute::KeyColumnMetadata> col_metadata_;
-  std::vector<arrow::compute::KeyColumnArray> cols_;
-  std::vector<uint32_t> minibatch_hashes_;
-
-  std::vector<std::shared_ptr<Array>> dictionaries_;
-
-  arrow::compute::KeyEncoder::KeyRowArray rows_;
-  arrow::compute::KeyEncoder::KeyRowArray rows_minibatch_;
-  arrow::compute::KeyEncoder encoder_;
-  arrow::compute::SwissTable map_;
-};
-
 /// C++ abstract base class for the HashAggregateKernel interface.
 /// Implementations should be default constructible and perform initialization in
 /// Init().
@@ -3172,272 +2692,6 @@ struct GroupedListFactory {
 };
 }  // namespace
 
-Result<std::vector<const HashAggregateKernel*>> GetKernels(
-    ExecContext* ctx, const std::vector<Aggregate>& aggregates,
-    const std::vector<ValueDescr>& in_descrs) {
-  if (aggregates.size() != in_descrs.size()) {
-    return Status::Invalid(aggregates.size(), " aggregate functions were specified but ",
-                           in_descrs.size(), " arguments were provided.");
-  }
-
-  std::vector<const HashAggregateKernel*> kernels(in_descrs.size());
-
-  for (size_t i = 0; i < aggregates.size(); ++i) {
-    ARROW_ASSIGN_OR_RAISE(auto function,
-                          ctx->func_registry()->GetFunction(aggregates[i].function));
-    ARROW_ASSIGN_OR_RAISE(
-        const Kernel* kernel,
-        function->DispatchExact({in_descrs[i], ValueDescr::Array(uint32())}));
-    kernels[i] = static_cast<const HashAggregateKernel*>(kernel);
-  }
-  return kernels;
-}
-
-Result<std::vector<std::unique_ptr<KernelState>>> InitKernels(
-    const std::vector<const HashAggregateKernel*>& kernels, ExecContext* ctx,
-    const std::vector<Aggregate>& aggregates, const std::vector<ValueDescr>& in_descrs) {
-  std::vector<std::unique_ptr<KernelState>> states(kernels.size());
-
-  for (size_t i = 0; i < aggregates.size(); ++i) {
-    auto options = aggregates[i].options;
-
-    if (options == nullptr) {
-      // use known default options for the named function if possible
-      auto maybe_function = ctx->func_registry()->GetFunction(aggregates[i].function);
-      if (maybe_function.ok()) {
-        options = maybe_function.ValueOrDie()->default_options();
-      }
-    }
-
-    KernelContext kernel_ctx{ctx};
-    ARROW_ASSIGN_OR_RAISE(
-        states[i],
-        kernels[i]->init(&kernel_ctx, KernelInitArgs{kernels[i],
-                                                     {
-                                                         in_descrs[i],
-                                                         ValueDescr::Array(uint32()),
-                                                     },
-                                                     options}));
-  }
-
-  return std::move(states);
-}
-
-Result<FieldVector> ResolveKernels(
-    const std::vector<Aggregate>& aggregates,
-    const std::vector<const HashAggregateKernel*>& kernels,
-    const std::vector<std::unique_ptr<KernelState>>& states, ExecContext* ctx,
-    const std::vector<ValueDescr>& descrs) {
-  FieldVector fields(descrs.size());
-
-  for (size_t i = 0; i < kernels.size(); ++i) {
-    KernelContext kernel_ctx{ctx};
-    kernel_ctx.SetState(states[i].get());
-
-    ARROW_ASSIGN_OR_RAISE(auto descr, kernels[i]->signature->out_type().Resolve(
-                                          &kernel_ctx, {
-                                                           descrs[i],
-                                                           ValueDescr::Array(uint32()),
-                                                       }));
-    fields[i] = field(aggregates[i].function, std::move(descr.type));
-  }
-  return fields;
-}
-
-Result<std::unique_ptr<Grouper>> Grouper::Make(const std::vector<ValueDescr>& descrs,
-                                               ExecContext* ctx) {
-  if (GrouperFastImpl::CanUse(descrs)) {
-    return GrouperFastImpl::Make(descrs, ctx);
-  }
-  return GrouperImpl::Make(descrs, ctx);
-}
-
-Result<Datum> GroupBy(const std::vector<Datum>& arguments, const std::vector<Datum>& keys,
-                      const std::vector<Aggregate>& aggregates, bool use_threads,
-                      ExecContext* ctx) {
-  auto task_group =
-      use_threads
-          ? arrow::internal::TaskGroup::MakeThreaded(arrow::internal::GetCpuThreadPool())
-          : arrow::internal::TaskGroup::MakeSerial();
-
-  std::vector<const HashAggregateKernel*> kernels;
-  std::vector<std::vector<std::unique_ptr<KernelState>>> states;
-  FieldVector out_fields;
-
-  using arrow::compute::detail::ExecBatchIterator;
-  std::unique_ptr<ExecBatchIterator> argument_batch_iterator;
-
-  if (!arguments.empty()) {
-    ARROW_ASSIGN_OR_RAISE(ExecBatch args_batch, ExecBatch::Make(arguments));
-
-    // Construct and initialize HashAggregateKernels
-    auto argument_descrs = args_batch.GetDescriptors();
-
-    ARROW_ASSIGN_OR_RAISE(kernels, GetKernels(ctx, aggregates, argument_descrs));
-
-    states.resize(task_group->parallelism());
-    for (auto& state : states) {
-      ARROW_ASSIGN_OR_RAISE(state,
-                            InitKernels(kernels, ctx, aggregates, argument_descrs));
-    }
-
-    ARROW_ASSIGN_OR_RAISE(
-        out_fields, ResolveKernels(aggregates, kernels, states[0], ctx, argument_descrs));
-
-    ARROW_ASSIGN_OR_RAISE(
-        argument_batch_iterator,
-        ExecBatchIterator::Make(args_batch.values, ctx->exec_chunksize()));
-  }
-
-  // Construct Groupers
-  ARROW_ASSIGN_OR_RAISE(ExecBatch keys_batch, ExecBatch::Make(keys));
-  auto key_descrs = keys_batch.GetDescriptors();
-
-  std::vector<std::unique_ptr<Grouper>> groupers(task_group->parallelism());
-  for (auto& grouper : groupers) {
-    ARROW_ASSIGN_OR_RAISE(grouper, Grouper::Make(key_descrs, ctx));
-  }
-
-  std::mutex mutex;
-  std::unordered_map<std::thread::id, size_t> thread_ids;
-
-  int i = 0;
-  for (ValueDescr& key_descr : key_descrs) {
-    out_fields.push_back(field("key_" + std::to_string(i++), std::move(key_descr.type)));
-  }
-
-  ARROW_ASSIGN_OR_RAISE(
-      auto key_batch_iterator,
-      ExecBatchIterator::Make(keys_batch.values, ctx->exec_chunksize()));
-
-  // start "streaming" execution
-  ExecBatch key_batch, argument_batch;
-  while ((argument_batch_iterator == NULLPTR ||
-          argument_batch_iterator->Next(&argument_batch)) &&
-         key_batch_iterator->Next(&key_batch)) {
-    if (key_batch.length == 0) continue;
-
-    task_group->Append([&, key_batch, argument_batch] {
-      size_t thread_index;
-      {
-        std::unique_lock<std::mutex> lock(mutex);
-        auto it = thread_ids.emplace(std::this_thread::get_id(), thread_ids.size()).first;
-        thread_index = it->second;
-        DCHECK_LT(static_cast<int>(thread_index), task_group->parallelism());
-      }
-
-      auto grouper = groupers[thread_index].get();
-
-      // compute a batch of group ids
-      ARROW_ASSIGN_OR_RAISE(Datum id_batch, grouper->Consume(key_batch));
-
-      // consume group ids with HashAggregateKernels
-      for (size_t i = 0; i < kernels.size(); ++i) {
-        KernelContext batch_ctx{ctx};
-        batch_ctx.SetState(states[thread_index][i].get());
-        ARROW_ASSIGN_OR_RAISE(auto batch, ExecBatch::Make({argument_batch[i], id_batch}));
-        RETURN_NOT_OK(kernels[i]->resize(&batch_ctx, grouper->num_groups()));
-        RETURN_NOT_OK(kernels[i]->consume(&batch_ctx, batch));
-      }
-
-      return Status::OK();
-    });
-  }
-
-  RETURN_NOT_OK(task_group->Finish());
-
-  // Merge if necessary
-  for (size_t thread_index = 1; thread_index < thread_ids.size(); ++thread_index) {
-    ARROW_ASSIGN_OR_RAISE(ExecBatch other_keys, groupers[thread_index]->GetUniques());
-    ARROW_ASSIGN_OR_RAISE(Datum transposition, groupers[0]->Consume(other_keys));
-    groupers[thread_index].reset();
-
-    for (size_t idx = 0; idx < kernels.size(); ++idx) {
-      KernelContext batch_ctx{ctx};
-      batch_ctx.SetState(states[0][idx].get());
-
-      RETURN_NOT_OK(kernels[idx]->resize(&batch_ctx, groupers[0]->num_groups()));
-      RETURN_NOT_OK(kernels[idx]->merge(&batch_ctx, std::move(*states[thread_index][idx]),
-                                        *transposition.array()));
-      states[thread_index][idx].reset();
-    }
-  }
-
-  // Finalize output
-  ArrayDataVector out_data(arguments.size() + keys.size());
-  auto it = out_data.begin();
-
-  for (size_t idx = 0; idx < kernels.size(); ++idx) {
-    KernelContext batch_ctx{ctx};
-    batch_ctx.SetState(states[0][idx].get());
-    Datum out;
-    RETURN_NOT_OK(kernels[idx]->finalize(&batch_ctx, &out));
-    *it++ = out.array();
-  }
-
-  ARROW_ASSIGN_OR_RAISE(ExecBatch out_keys, groupers[0]->GetUniques());
-  for (const auto& key : out_keys.values) {
-    *it++ = key.array();
-  }
-
-  int64_t length = out_data[0]->length;
-  return ArrayData::Make(struct_(std::move(out_fields)), length,
-                         {/*null_bitmap=*/nullptr}, std::move(out_data),
-                         /*null_count=*/0);
-}
-
-Result<std::shared_ptr<ListArray>> Grouper::ApplyGroupings(const ListArray& groupings,
-                                                           const Array& array,
-                                                           ExecContext* ctx) {
-  ARROW_ASSIGN_OR_RAISE(Datum sorted,
-                        compute::Take(array, groupings.data()->child_data[0],
-                                      TakeOptions::NoBoundsCheck(), ctx));
-
-  return std::make_shared<ListArray>(list(array.type()), groupings.length(),
-                                     groupings.value_offsets(), sorted.make_array());
-}
-
-Result<std::shared_ptr<ListArray>> Grouper::MakeGroupings(const UInt32Array& ids,
-                                                          uint32_t num_groups,
-                                                          ExecContext* ctx) {
-  if (ids.null_count() != 0) {
-    return Status::Invalid("MakeGroupings with null ids");
-  }
-
-  ARROW_ASSIGN_OR_RAISE(auto offsets, AllocateBuffer(sizeof(int32_t) * (num_groups + 1),
-                                                     ctx->memory_pool()));
-  auto raw_offsets = reinterpret_cast<int32_t*>(offsets->mutable_data());
-
-  std::memset(raw_offsets, 0, offsets->size());
-  for (int i = 0; i < ids.length(); ++i) {
-    DCHECK_LT(ids.Value(i), num_groups);
-    raw_offsets[ids.Value(i)] += 1;
-  }
-  int32_t length = 0;
-  for (uint32_t id = 0; id < num_groups; ++id) {
-    auto offset = raw_offsets[id];
-    raw_offsets[id] = length;
-    length += offset;
-  }
-  raw_offsets[num_groups] = length;
-  DCHECK_EQ(ids.length(), length);
-
-  ARROW_ASSIGN_OR_RAISE(auto offsets_copy,
-                        offsets->CopySlice(0, offsets->size(), ctx->memory_pool()));
-  raw_offsets = reinterpret_cast<int32_t*>(offsets_copy->mutable_data());
-
-  ARROW_ASSIGN_OR_RAISE(auto sort_indices, AllocateBuffer(sizeof(int32_t) * ids.length(),
-                                                          ctx->memory_pool()));
-  auto raw_sort_indices = reinterpret_cast<int32_t*>(sort_indices->mutable_data());
-  for (int i = 0; i < ids.length(); ++i) {
-    raw_sort_indices[raw_offsets[ids.Value(i)]++] = i;
-  }
-
-  return std::make_shared<ListArray>(
-      list(int32()), num_groups, std::move(offsets),
-      std::make_shared<Int32Array>(ids.length(), std::move(sort_indices)));
-}
-
 namespace {
 const FunctionDoc hash_count_doc{
     "Count the number of null / non-null values in each group",
diff --git a/cpp/src/arrow/compute/kernels/hash_aggregate_test.cc b/cpp/src/arrow/compute/kernels/hash_aggregate_test.cc
index 932bda4891..2ed10b9b2b 100644
--- a/cpp/src/arrow/compute/kernels/hash_aggregate_test.cc
+++ b/cpp/src/arrow/compute/kernels/hash_aggregate_test.cc
@@ -33,6 +33,7 @@
 #include "arrow/compute/api_vector.h"
 #include "arrow/compute/cast.h"
 #include "arrow/compute/exec.h"
+#include "arrow/compute/exec/aggregate.h"
 #include "arrow/compute/exec/exec_plan.h"
 #include "arrow/compute/exec/options.h"
 #include "arrow/compute/exec/test_util.h"
@@ -41,6 +42,7 @@
 #include "arrow/compute/kernels/codegen_internal.h"
 #include "arrow/compute/kernels/test_util.h"
 #include "arrow/compute/registry.h"
+#include "arrow/compute/row/grouper.h"
 #include "arrow/table.h"
 #include "arrow/testing/generator.h"
 #include "arrow/testing/gtest_util.h"
@@ -73,14 +75,13 @@ Result<Datum> NaiveGroupBy(std::vector<Datum> arguments, std::vector<Datum> keys
                            const std::vector<internal::Aggregate>& aggregates) {
   ARROW_ASSIGN_OR_RAISE(auto key_batch, ExecBatch::Make(std::move(keys)));
 
-  ARROW_ASSIGN_OR_RAISE(auto grouper,
-                        internal::Grouper::Make(key_batch.GetDescriptors()));
+  ARROW_ASSIGN_OR_RAISE(auto grouper, Grouper::Make(key_batch.GetDescriptors()));
 
   ARROW_ASSIGN_OR_RAISE(Datum id_batch, grouper->Consume(key_batch));
 
   ARROW_ASSIGN_OR_RAISE(
-      auto groupings, internal::Grouper::MakeGroupings(*id_batch.array_as<UInt32Array>(),
-                                                       grouper->num_groups()));
+      auto groupings,
+      Grouper::MakeGroupings(*id_batch.array_as<UInt32Array>(), grouper->num_groups()));
 
   ArrayVector out_columns;
   std::vector<std::string> out_names;
@@ -93,7 +94,7 @@ Result<Datum> NaiveGroupBy(std::vector<Datum> arguments, std::vector<Datum> keys
 
     ARROW_ASSIGN_OR_RAISE(
         auto grouped_argument,
-        internal::Grouper::ApplyGroupings(*groupings, *arguments[i].make_array()));
+        Grouper::ApplyGroupings(*groupings, *arguments[i].make_array()));
 
     ScalarVector aggregated_scalars;
 
@@ -261,21 +262,21 @@ Result<Datum> GroupByTest(
 }  // namespace
 
 TEST(Grouper, SupportedKeys) {
-  ASSERT_OK(internal::Grouper::Make({boolean()}));
+  ASSERT_OK(Grouper::Make({boolean()}));
 
-  ASSERT_OK(internal::Grouper::Make({int8(), uint16(), int32(), uint64()}));
+  ASSERT_OK(Grouper::Make({int8(), uint16(), int32(), uint64()}));
 
-  ASSERT_OK(internal::Grouper::Make({dictionary(int64(), utf8())}));
+  ASSERT_OK(Grouper::Make({dictionary(int64(), utf8())}));
 
-  ASSERT_OK(internal::Grouper::Make({float16(), float32(), float64()}));
+  ASSERT_OK(Grouper::Make({float16(), float32(), float64()}));
 
-  ASSERT_OK(internal::Grouper::Make({utf8(), binary(), large_utf8(), large_binary()}));
+  ASSERT_OK(Grouper::Make({utf8(), binary(), large_utf8(), large_binary()}));
 
-  ASSERT_OK(internal::Grouper::Make({fixed_size_binary(16), fixed_size_binary(32)}));
+  ASSERT_OK(Grouper::Make({fixed_size_binary(16), fixed_size_binary(32)}));
 
-  ASSERT_OK(internal::Grouper::Make({decimal128(32, 10), decimal256(76, 20)}));
+  ASSERT_OK(Grouper::Make({decimal128(32, 10), decimal256(76, 20)}));
 
-  ASSERT_OK(internal::Grouper::Make({date32(), date64()}));
+  ASSERT_OK(Grouper::Make({date32(), date64()}));
 
   for (auto unit : {
            TimeUnit::SECOND,
@@ -283,29 +284,28 @@ TEST(Grouper, SupportedKeys) {
            TimeUnit::MICRO,
            TimeUnit::NANO,
        }) {
-    ASSERT_OK(internal::Grouper::Make({timestamp(unit), duration(unit)}));
+    ASSERT_OK(Grouper::Make({timestamp(unit), duration(unit)}));
   }
 
-  ASSERT_OK(internal::Grouper::Make(
-      {day_time_interval(), month_interval(), month_day_nano_interval()}));
+  ASSERT_OK(
+      Grouper::Make({day_time_interval(), month_interval(), month_day_nano_interval()}));
 
-  ASSERT_OK(internal::Grouper::Make({null()}));
+  ASSERT_OK(Grouper::Make({null()}));
 
-  ASSERT_RAISES(NotImplemented, internal::Grouper::Make({struct_({field("", int64())})}));
+  ASSERT_RAISES(NotImplemented, Grouper::Make({struct_({field("", int64())})}));
 
-  ASSERT_RAISES(NotImplemented, internal::Grouper::Make({struct_({})}));
+  ASSERT_RAISES(NotImplemented, Grouper::Make({struct_({})}));
 
-  ASSERT_RAISES(NotImplemented, internal::Grouper::Make({list(int32())}));
+  ASSERT_RAISES(NotImplemented, Grouper::Make({list(int32())}));
 
-  ASSERT_RAISES(NotImplemented, internal::Grouper::Make({fixed_size_list(int32(), 5)}));
+  ASSERT_RAISES(NotImplemented, Grouper::Make({fixed_size_list(int32(), 5)}));
 
-  ASSERT_RAISES(NotImplemented,
-                internal::Grouper::Make({dense_union({field("", int32())})}));
+  ASSERT_RAISES(NotImplemented, Grouper::Make({dense_union({field("", int32())})}));
 }
 
 struct TestGrouper {
   explicit TestGrouper(std::vector<ValueDescr> descrs) : descrs_(std::move(descrs)) {
-    grouper_ = internal::Grouper::Make(descrs_).ValueOrDie();
+    grouper_ = Grouper::Make(descrs_).ValueOrDie();
 
     FieldVector fields;
     for (const auto& descr : descrs_) {
@@ -423,7 +423,7 @@ struct TestGrouper {
 
   std::vector<ValueDescr> descrs_;
   std::shared_ptr<Schema> key_schema_;
-  std::unique_ptr<internal::Grouper> grouper_;
+  std::unique_ptr<Grouper> grouper_;
   ExecBatch uniques_ = ExecBatch({}, -1);
 };
 
@@ -666,12 +666,11 @@ TEST(Grouper, MakeGroupings) {
     auto expected = ArrayFromJSON(list(int32()), expected_json);
 
     auto num_groups = static_cast<uint32_t>(expected->length());
-    ASSERT_OK_AND_ASSIGN(auto actual, internal::Grouper::MakeGroupings(*ids, num_groups));
+    ASSERT_OK_AND_ASSIGN(auto actual, Grouper::MakeGroupings(*ids, num_groups));
     AssertArraysEqual(*expected, *actual, /*verbose=*/true);
 
     // validate ApplyGroupings
-    ASSERT_OK_AND_ASSIGN(auto grouped_ids,
-                         internal::Grouper::ApplyGroupings(*actual, *ids));
+    ASSERT_OK_AND_ASSIGN(auto grouped_ids, Grouper::ApplyGroupings(*actual, *ids));
 
     for (uint32_t group = 0; group < num_groups; ++group) {
       auto ids_slice = checked_pointer_cast<UInt32Array>(grouped_ids->value_slice(group));
@@ -693,7 +692,7 @@ TEST(Grouper, MakeGroupings) {
 
   auto ids = checked_pointer_cast<UInt32Array>(ArrayFromJSON(uint32(), "[0, null, 1]"));
   EXPECT_RAISES_WITH_MESSAGE_THAT(Invalid, HasSubstr("MakeGroupings with null ids"),
-                                  internal::Grouper::MakeGroupings(*ids, 5));
+                                  Grouper::MakeGroupings(*ids, 5));
 }
 
 TEST(Grouper, ScalarValues) {
diff --git a/cpp/src/arrow/compute/light_array.h b/cpp/src/arrow/compute/light_array.h
index dd13aa0647..f0e5c70687 100644
--- a/cpp/src/arrow/compute/light_array.h
+++ b/cpp/src/arrow/compute/light_array.h
@@ -21,7 +21,9 @@
 
 #include "arrow/array.h"
 #include "arrow/compute/exec.h"
+#include "arrow/compute/exec/util.h"
 #include "arrow/type.h"
+#include "arrow/util/cpu_info.h"
 #include "arrow/util/logging.h"
 
 /// This file contains lightweight containers for Arrow buffers.  These containers
@@ -31,6 +33,18 @@
 namespace arrow {
 namespace compute {
 
+/// \brief Context needed by various execution engine operations
+///
+/// In the execution engine this context is provided by either the node or the
+/// plan and the context exists for the lifetime of the plan.  Defining this here
+/// allows us to take advantage of these resources without coupling the logic with
+/// the execution engine.
+struct LightContext {
+  bool has_avx2() const { return (hardware_flags & arrow::internal::CpuInfo::AVX2) > 0; }
+  int64_t hardware_flags;
+  util::TempVectorStack* stack;
+};
+
 /// \brief Description of the layout of a "key" column
 ///
 /// A "key" column is a non-nested, non-union column.
diff --git a/cpp/src/arrow/compute/row/CMakeLists.txt b/cpp/src/arrow/compute/row/CMakeLists.txt
new file mode 100644
index 0000000000..6ae982dbaf
--- /dev/null
+++ b/cpp/src/arrow/compute/row/CMakeLists.txt
@@ -0,0 +1,21 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Contains utilities for working with Arrow data been stored
+# in a row-major order.
+
+arrow_install_all_headers("arrow/compute/row")
diff --git a/cpp/src/arrow/compute/exec/key_compare.cc b/cpp/src/arrow/compute/row/compare_internal.cc
similarity index 90%
rename from cpp/src/arrow/compute/exec/key_compare.cc
rename to cpp/src/arrow/compute/row/compare_internal.cc
index d873aec692..e863c9cd05 100644
--- a/cpp/src/arrow/compute/exec/key_compare.cc
+++ b/cpp/src/arrow/compute/row/compare_internal.cc
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include "arrow/compute/exec/key_compare.h"
+#include "arrow/compute/row/compare_internal.h"
 
 #include <memory.h>
 
@@ -33,9 +33,8 @@ template <bool use_selection>
 void KeyCompare::NullUpdateColumnToRow(uint32_t id_col, uint32_t num_rows_to_compare,
                                        const uint16_t* sel_left_maybe_null,
                                        const uint32_t* left_to_right_map,
-                                       KeyEncoder::KeyEncoderContext* ctx,
-                                       const KeyColumnArray& col,
-                                       const KeyEncoder::KeyRowArray& rows,
+                                       LightContext* ctx, const KeyColumnArray& col,
+                                       const RowTableImpl& rows,
                                        uint8_t* match_bytevector) {
   if (!rows.has_any_nulls(ctx) && !col.data(0)) {
     return;
@@ -90,9 +89,8 @@ template <bool use_selection, class COMPARE_FN>
 void KeyCompare::CompareBinaryColumnToRowHelper(
     uint32_t offset_within_row, uint32_t first_row_to_compare,
     uint32_t num_rows_to_compare, const uint16_t* sel_left_maybe_null,
-    const uint32_t* left_to_right_map, KeyEncoder::KeyEncoderContext* ctx,
-    const KeyColumnArray& col, const KeyEncoder::KeyRowArray& rows,
-    uint8_t* match_bytevector, COMPARE_FN compare_fn) {
+    const uint32_t* left_to_right_map, LightContext* ctx, const KeyColumnArray& col,
+    const RowTableImpl& rows, uint8_t* match_bytevector, COMPARE_FN compare_fn) {
   bool is_fixed_length = rows.metadata().is_fixed_length;
   if (is_fixed_length) {
     uint32_t fixed_length = rows.metadata().fixed_length;
@@ -118,11 +116,13 @@ void KeyCompare::CompareBinaryColumnToRowHelper(
 }
 
 template <bool use_selection>
-void KeyCompare::CompareBinaryColumnToRow(
-    uint32_t offset_within_row, uint32_t num_rows_to_compare,
-    const uint16_t* sel_left_maybe_null, const uint32_t* left_to_right_map,
-    KeyEncoder::KeyEncoderContext* ctx, const KeyColumnArray& col,
-    const KeyEncoder::KeyRowArray& rows, uint8_t* match_bytevector) {
+void KeyCompare::CompareBinaryColumnToRow(uint32_t offset_within_row,
+                                          uint32_t num_rows_to_compare,
+                                          const uint16_t* sel_left_maybe_null,
+                                          const uint32_t* left_to_right_map,
+                                          LightContext* ctx, const KeyColumnArray& col,
+                                          const RowTableImpl& rows,
+                                          uint8_t* match_bytevector) {
   uint32_t num_processed = 0;
 #if defined(ARROW_HAVE_AVX2)
   if (ctx->has_avx2()) {
@@ -228,11 +228,13 @@ void KeyCompare::CompareBinaryColumnToRow(
 
 // Overwrites the match_bytevector instead of updating it
 template <bool use_selection, bool is_first_varbinary_col>
-void KeyCompare::CompareVarBinaryColumnToRow(
-    uint32_t id_varbinary_col, uint32_t num_rows_to_compare,
-    const uint16_t* sel_left_maybe_null, const uint32_t* left_to_right_map,
-    KeyEncoder::KeyEncoderContext* ctx, const KeyColumnArray& col,
-    const KeyEncoder::KeyRowArray& rows, uint8_t* match_bytevector) {
+void KeyCompare::CompareVarBinaryColumnToRow(uint32_t id_varbinary_col,
+                                             uint32_t num_rows_to_compare,
+                                             const uint16_t* sel_left_maybe_null,
+                                             const uint32_t* left_to_right_map,
+                                             LightContext* ctx, const KeyColumnArray& col,
+                                             const RowTableImpl& rows,
+                                             uint8_t* match_bytevector) {
 #if defined(ARROW_HAVE_AVX2)
   if (ctx->has_avx2()) {
     CompareVarBinaryColumnToRow_avx2(
@@ -290,7 +292,7 @@ void KeyCompare::CompareVarBinaryColumnToRow(
   }
 }
 
-void KeyCompare::AndByteVectors(KeyEncoder::KeyEncoderContext* ctx, uint32_t num_elements,
+void KeyCompare::AndByteVectors(LightContext* ctx, uint32_t num_elements,
                                 uint8_t* bytevector_A, const uint8_t* bytevector_B) {
   uint32_t num_processed = 0;
 #if defined(ARROW_HAVE_AVX2)
@@ -306,11 +308,13 @@ void KeyCompare::AndByteVectors(KeyEncoder::KeyEncoderContext* ctx, uint32_t num
   }
 }
 
-void KeyCompare::CompareColumnsToRows(
-    uint32_t num_rows_to_compare, const uint16_t* sel_left_maybe_null,
-    const uint32_t* left_to_right_map, KeyEncoder::KeyEncoderContext* ctx,
-    uint32_t* out_num_rows, uint16_t* out_sel_left_maybe_same,
-    const std::vector<KeyColumnArray>& cols, const KeyEncoder::KeyRowArray& rows) {
+void KeyCompare::CompareColumnsToRows(uint32_t num_rows_to_compare,
+                                      const uint16_t* sel_left_maybe_null,
+                                      const uint32_t* left_to_right_map,
+                                      LightContext* ctx, uint32_t* out_num_rows,
+                                      uint16_t* out_sel_left_maybe_same,
+                                      const std::vector<KeyColumnArray>& cols,
+                                      const RowTableImpl& rows) {
   if (num_rows_to_compare == 0) {
     *out_num_rows = 0;
     return;
diff --git a/cpp/src/arrow/compute/exec/key_compare.h b/cpp/src/arrow/compute/row/compare_internal.h
similarity index 52%
rename from cpp/src/arrow/compute/exec/key_compare.h
rename to cpp/src/arrow/compute/row/compare_internal.h
index 773b32d46c..e3b9057115 100644
--- a/cpp/src/arrow/compute/exec/key_compare.h
+++ b/cpp/src/arrow/compute/row/compare_internal.h
@@ -19,8 +19,10 @@
 
 #include <cstdint>
 
-#include "arrow/compute/exec/key_encode.h"
 #include "arrow/compute/exec/util.h"
+#include "arrow/compute/light_array.h"
+#include "arrow/compute/row/encode_internal.h"
+#include "arrow/compute/row/row_internal.h"
 #include "arrow/memory_pool.h"
 #include "arrow/result.h"
 #include "arrow/status.h"
@@ -33,45 +35,48 @@ class KeyCompare {
   // Returns a single 16-bit selection vector of rows that failed comparison.
   // If there is input selection on the left, the resulting selection is a filtered image
   // of input selection.
-  static void CompareColumnsToRows(
-      uint32_t num_rows_to_compare, const uint16_t* sel_left_maybe_null,
-      const uint32_t* left_to_right_map, KeyEncoder::KeyEncoderContext* ctx,
-      uint32_t* out_num_rows, uint16_t* out_sel_left_maybe_same,
-      const std::vector<KeyColumnArray>& cols, const KeyEncoder::KeyRowArray& rows);
+  static void CompareColumnsToRows(uint32_t num_rows_to_compare,
+                                   const uint16_t* sel_left_maybe_null,
+                                   const uint32_t* left_to_right_map, LightContext* ctx,
+                                   uint32_t* out_num_rows,
+                                   uint16_t* out_sel_left_maybe_same,
+                                   const std::vector<KeyColumnArray>& cols,
+                                   const RowTableImpl& rows);
 
  private:
   template <bool use_selection>
   static void NullUpdateColumnToRow(uint32_t id_col, uint32_t num_rows_to_compare,
                                     const uint16_t* sel_left_maybe_null,
-                                    const uint32_t* left_to_right_map,
-                                    KeyEncoder::KeyEncoderContext* ctx,
-                                    const KeyColumnArray& col,
-                                    const KeyEncoder::KeyRowArray& rows,
+                                    const uint32_t* left_to_right_map, LightContext* ctx,
+                                    const KeyColumnArray& col, const RowTableImpl& rows,
                                     uint8_t* match_bytevector);
 
   template <bool use_selection, class COMPARE_FN>
   static void CompareBinaryColumnToRowHelper(
       uint32_t offset_within_row, uint32_t first_row_to_compare,
       uint32_t num_rows_to_compare, const uint16_t* sel_left_maybe_null,
-      const uint32_t* left_to_right_map, KeyEncoder::KeyEncoderContext* ctx,
-      const KeyColumnArray& col, const KeyEncoder::KeyRowArray& rows,
-      uint8_t* match_bytevector, COMPARE_FN compare_fn);
+      const uint32_t* left_to_right_map, LightContext* ctx, const KeyColumnArray& col,
+      const RowTableImpl& rows, uint8_t* match_bytevector, COMPARE_FN compare_fn);
 
   template <bool use_selection>
-  static void CompareBinaryColumnToRow(
-      uint32_t offset_within_row, uint32_t num_rows_to_compare,
-      const uint16_t* sel_left_maybe_null, const uint32_t* left_to_right_map,
-      KeyEncoder::KeyEncoderContext* ctx, const KeyColumnArray& col,
-      const KeyEncoder::KeyRowArray& rows, uint8_t* match_bytevector);
+  static void CompareBinaryColumnToRow(uint32_t offset_within_row,
+                                       uint32_t num_rows_to_compare,
+                                       const uint16_t* sel_left_maybe_null,
+                                       const uint32_t* left_to_right_map,
+                                       LightContext* ctx, const KeyColumnArray& col,
+                                       const RowTableImpl& rows,
+                                       uint8_t* match_bytevector);
 
   template <bool use_selection, bool is_first_varbinary_col>
-  static void CompareVarBinaryColumnToRow(
-      uint32_t id_varlen_col, uint32_t num_rows_to_compare,
-      const uint16_t* sel_left_maybe_null, const uint32_t* left_to_right_map,
-      KeyEncoder::KeyEncoderContext* ctx, const KeyColumnArray& col,
-      const KeyEncoder::KeyRowArray& rows, uint8_t* match_bytevector);
-
-  static void AndByteVectors(KeyEncoder::KeyEncoderContext* ctx, uint32_t num_elements,
+  static void CompareVarBinaryColumnToRow(uint32_t id_varlen_col,
+                                          uint32_t num_rows_to_compare,
+                                          const uint16_t* sel_left_maybe_null,
+                                          const uint32_t* left_to_right_map,
+                                          LightContext* ctx, const KeyColumnArray& col,
+                                          const RowTableImpl& rows,
+                                          uint8_t* match_bytevector);
+
+  static void AndByteVectors(LightContext* ctx, uint32_t num_elements,
                              uint8_t* bytevector_A, const uint8_t* bytevector_B);
 
 #if defined(ARROW_HAVE_AVX2)
@@ -79,53 +84,52 @@ class KeyCompare {
   template <bool use_selection>
   static uint32_t NullUpdateColumnToRowImp_avx2(
       uint32_t id_col, uint32_t num_rows_to_compare, const uint16_t* sel_left_maybe_null,
-      const uint32_t* left_to_right_map, KeyEncoder::KeyEncoderContext* ctx,
-      const KeyColumnArray& col, const KeyEncoder::KeyRowArray& rows,
-      uint8_t* match_bytevector);
+      const uint32_t* left_to_right_map, LightContext* ctx, const KeyColumnArray& col,
+      const RowTableImpl& rows, uint8_t* match_bytevector);
 
   template <bool use_selection, class COMPARE8_FN>
   static uint32_t CompareBinaryColumnToRowHelper_avx2(
       uint32_t offset_within_row, uint32_t num_rows_to_compare,
       const uint16_t* sel_left_maybe_null, const uint32_t* left_to_right_map,
-      KeyEncoder::KeyEncoderContext* ctx, const KeyColumnArray& col,
-      const KeyEncoder::KeyRowArray& rows, uint8_t* match_bytevector,
-      COMPARE8_FN compare8_fn);
+      LightContext* ctx, const KeyColumnArray& col, const RowTableImpl& rows,
+      uint8_t* match_bytevector, COMPARE8_FN compare8_fn);
 
   template <bool use_selection>
   static uint32_t CompareBinaryColumnToRowImp_avx2(
       uint32_t offset_within_row, uint32_t num_rows_to_compare,
       const uint16_t* sel_left_maybe_null, const uint32_t* left_to_right_map,
-      KeyEncoder::KeyEncoderContext* ctx, const KeyColumnArray& col,
-      const KeyEncoder::KeyRowArray& rows, uint8_t* match_bytevector);
+      LightContext* ctx, const KeyColumnArray& col, const RowTableImpl& rows,
+      uint8_t* match_bytevector);
 
   template <bool use_selection, bool is_first_varbinary_col>
   static void CompareVarBinaryColumnToRowImp_avx2(
       uint32_t id_varlen_col, uint32_t num_rows_to_compare,
       const uint16_t* sel_left_maybe_null, const uint32_t* left_to_right_map,
-      KeyEncoder::KeyEncoderContext* ctx, const KeyColumnArray& col,
-      const KeyEncoder::KeyRowArray& rows, uint8_t* match_bytevector);
+      LightContext* ctx, const KeyColumnArray& col, const RowTableImpl& rows,
+      uint8_t* match_bytevector);
 
   static uint32_t AndByteVectors_avx2(uint32_t num_elements, uint8_t* bytevector_A,
                                       const uint8_t* bytevector_B);
 
-  static uint32_t NullUpdateColumnToRow_avx2(
-      bool use_selection, uint32_t id_col, uint32_t num_rows_to_compare,
-      const uint16_t* sel_left_maybe_null, const uint32_t* left_to_right_map,
-      KeyEncoder::KeyEncoderContext* ctx, const KeyColumnArray& col,
-      const KeyEncoder::KeyRowArray& rows, uint8_t* match_bytevector);
+  static uint32_t NullUpdateColumnToRow_avx2(bool use_selection, uint32_t id_col,
+                                             uint32_t num_rows_to_compare,
+                                             const uint16_t* sel_left_maybe_null,
+                                             const uint32_t* left_to_right_map,
+                                             LightContext* ctx, const KeyColumnArray& col,
+                                             const RowTableImpl& rows,
+                                             uint8_t* match_bytevector);
 
   static uint32_t CompareBinaryColumnToRow_avx2(
       bool use_selection, uint32_t offset_within_row, uint32_t num_rows_to_compare,
       const uint16_t* sel_left_maybe_null, const uint32_t* left_to_right_map,
-      KeyEncoder::KeyEncoderContext* ctx, const KeyColumnArray& col,
-      const KeyEncoder::KeyRowArray& rows, uint8_t* match_bytevector);
+      LightContext* ctx, const KeyColumnArray& col, const RowTableImpl& rows,
+      uint8_t* match_bytevector);
 
   static void CompareVarBinaryColumnToRow_avx2(
       bool use_selection, bool is_first_varbinary_col, uint32_t id_varlen_col,
       uint32_t num_rows_to_compare, const uint16_t* sel_left_maybe_null,
-      const uint32_t* left_to_right_map, KeyEncoder::KeyEncoderContext* ctx,
-      const KeyColumnArray& col, const KeyEncoder::KeyRowArray& rows,
-      uint8_t* match_bytevector);
+      const uint32_t* left_to_right_map, LightContext* ctx, const KeyColumnArray& col,
+      const RowTableImpl& rows, uint8_t* match_bytevector);
 
 #endif
 };
diff --git a/cpp/src/arrow/compute/exec/key_compare_avx2.cc b/cpp/src/arrow/compute/row/compare_internal_avx2.cc
similarity index 96%
rename from cpp/src/arrow/compute/exec/key_compare_avx2.cc
rename to cpp/src/arrow/compute/row/compare_internal_avx2.cc
index e45486b2eb..818f4c4fe7 100644
--- a/cpp/src/arrow/compute/exec/key_compare_avx2.cc
+++ b/cpp/src/arrow/compute/row/compare_internal_avx2.cc
@@ -17,7 +17,7 @@
 
 #include <immintrin.h>
 
-#include "arrow/compute/exec/key_compare.h"
+#include "arrow/compute/row/compare_internal.h"
 #include "arrow/util/bit_util.h"
 
 namespace arrow {
@@ -39,9 +39,8 @@ inline __m256i set_first_n_bytes_avx2(int n) {
 template <bool use_selection>
 uint32_t KeyCompare::NullUpdateColumnToRowImp_avx2(
     uint32_t id_col, uint32_t num_rows_to_compare, const uint16_t* sel_left_maybe_null,
-    const uint32_t* left_to_right_map, KeyEncoder::KeyEncoderContext* ctx,
-    const KeyColumnArray& col, const KeyEncoder::KeyRowArray& rows,
-    uint8_t* match_bytevector) {
+    const uint32_t* left_to_right_map, LightContext* ctx, const KeyColumnArray& col,
+    const RowTableImpl& rows, uint8_t* match_bytevector) {
   if (!rows.has_any_nulls(ctx) && !col.data(0)) {
     return num_rows_to_compare;
   }
@@ -180,9 +179,8 @@ template <bool use_selection, class COMPARE8_FN>
 uint32_t KeyCompare::CompareBinaryColumnToRowHelper_avx2(
     uint32_t offset_within_row, uint32_t num_rows_to_compare,
     const uint16_t* sel_left_maybe_null, const uint32_t* left_to_right_map,
-    KeyEncoder::KeyEncoderContext* ctx, const KeyColumnArray& col,
-    const KeyEncoder::KeyRowArray& rows, uint8_t* match_bytevector,
-    COMPARE8_FN compare8_fn) {
+    LightContext* ctx, const KeyColumnArray& col, const RowTableImpl& rows,
+    uint8_t* match_bytevector, COMPARE8_FN compare8_fn) {
   bool is_fixed_length = rows.metadata().is_fixed_length;
   if (is_fixed_length) {
     uint32_t fixed_length = rows.metadata().fixed_length;
@@ -419,8 +417,8 @@ template <bool use_selection>
 uint32_t KeyCompare::CompareBinaryColumnToRowImp_avx2(
     uint32_t offset_within_row, uint32_t num_rows_to_compare,
     const uint16_t* sel_left_maybe_null, const uint32_t* left_to_right_map,
-    KeyEncoder::KeyEncoderContext* ctx, const KeyColumnArray& col,
-    const KeyEncoder::KeyRowArray& rows, uint8_t* match_bytevector) {
+    LightContext* ctx, const KeyColumnArray& col, const RowTableImpl& rows,
+    uint8_t* match_bytevector) {
   uint32_t col_width = col.metadata().fixed_length;
   if (col_width == 0) {
     int bit_offset = col.bit_offset(1);
@@ -503,8 +501,8 @@ template <bool use_selection, bool is_first_varbinary_col>
 void KeyCompare::CompareVarBinaryColumnToRowImp_avx2(
     uint32_t id_varbinary_col, uint32_t num_rows_to_compare,
     const uint16_t* sel_left_maybe_null, const uint32_t* left_to_right_map,
-    KeyEncoder::KeyEncoderContext* ctx, const KeyColumnArray& col,
-    const KeyEncoder::KeyRowArray& rows, uint8_t* match_bytevector) {
+    LightContext* ctx, const KeyColumnArray& col, const RowTableImpl& rows,
+    uint8_t* match_bytevector) {
   const uint32_t* offsets_left = col.offsets();
   const uint32_t* offsets_right = rows.offsets();
   const uint8_t* rows_left = col.data(2);
@@ -569,8 +567,8 @@ uint32_t KeyCompare::AndByteVectors_avx2(uint32_t num_elements, uint8_t* bytevec
 uint32_t KeyCompare::NullUpdateColumnToRow_avx2(
     bool use_selection, uint32_t id_col, uint32_t num_rows_to_compare,
     const uint16_t* sel_left_maybe_null, const uint32_t* left_to_right_map,
-    KeyEncoder::KeyEncoderContext* ctx, const KeyColumnArray& col,
-    const KeyEncoder::KeyRowArray& rows, uint8_t* match_bytevector) {
+    LightContext* ctx, const KeyColumnArray& col, const RowTableImpl& rows,
+    uint8_t* match_bytevector) {
   if (use_selection) {
     return NullUpdateColumnToRowImp_avx2<true>(id_col, num_rows_to_compare,
                                                sel_left_maybe_null, left_to_right_map,
@@ -585,8 +583,8 @@ uint32_t KeyCompare::NullUpdateColumnToRow_avx2(
 uint32_t KeyCompare::CompareBinaryColumnToRow_avx2(
     bool use_selection, uint32_t offset_within_row, uint32_t num_rows_to_compare,
     const uint16_t* sel_left_maybe_null, const uint32_t* left_to_right_map,
-    KeyEncoder::KeyEncoderContext* ctx, const KeyColumnArray& col,
-    const KeyEncoder::KeyRowArray& rows, uint8_t* match_bytevector) {
+    LightContext* ctx, const KeyColumnArray& col, const RowTableImpl& rows,
+    uint8_t* match_bytevector) {
   if (use_selection) {
     return CompareBinaryColumnToRowImp_avx2<true>(offset_within_row, num_rows_to_compare,
                                                   sel_left_maybe_null, left_to_right_map,
@@ -601,9 +599,8 @@ uint32_t KeyCompare::CompareBinaryColumnToRow_avx2(
 void KeyCompare::CompareVarBinaryColumnToRow_avx2(
     bool use_selection, bool is_first_varbinary_col, uint32_t id_varlen_col,
     uint32_t num_rows_to_compare, const uint16_t* sel_left_maybe_null,
-    const uint32_t* left_to_right_map, KeyEncoder::KeyEncoderContext* ctx,
-    const KeyColumnArray& col, const KeyEncoder::KeyRowArray& rows,
-    uint8_t* match_bytevector) {
+    const uint32_t* left_to_right_map, LightContext* ctx, const KeyColumnArray& col,
+    const RowTableImpl& rows, uint8_t* match_bytevector) {
   if (use_selection) {
     if (is_first_varbinary_col) {
       CompareVarBinaryColumnToRowImp_avx2<true, true>(
diff --git a/cpp/src/arrow/compute/exec/key_encode.cc b/cpp/src/arrow/compute/row/encode_internal.cc
similarity index 56%
rename from cpp/src/arrow/compute/exec/key_encode.cc
rename to cpp/src/arrow/compute/row/encode_internal.cc
index 3d92c77b09..cbfd169b44 100644
--- a/cpp/src/arrow/compute/exec/key_encode.cc
+++ b/cpp/src/arrow/compute/row/encode_internal.cc
@@ -15,318 +15,223 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include "arrow/compute/exec/key_encode.h"
-
-#include <memory.h>
-
-#include <algorithm>
-
-#include "arrow/compute/exec/util.h"
-#include "arrow/util/bit_util.h"
-#include "arrow/util/ubsan.h"
+#include "arrow/compute/row/encode_internal.h"
 
 namespace arrow {
 namespace compute {
 
-KeyEncoder::KeyRowArray::KeyRowArray()
-    : pool_(nullptr), rows_capacity_(0), bytes_capacity_(0) {}
-
-Status KeyEncoder::KeyRowArray::Init(MemoryPool* pool, const KeyRowMetadata& metadata) {
-  pool_ = pool;
-  metadata_ = metadata;
-
-  DCHECK(!null_masks_ && !offsets_ && !rows_);
-
-  constexpr int64_t rows_capacity = 8;
-  constexpr int64_t bytes_capacity = 1024;
-
-  // Null masks
-  ARROW_ASSIGN_OR_RAISE(auto null_masks,
-                        AllocateResizableBuffer(size_null_masks(rows_capacity), pool_));
-  null_masks_ = std::move(null_masks);
-  memset(null_masks_->mutable_data(), 0, size_null_masks(rows_capacity));
-
-  // Offsets and rows
-  if (!metadata.is_fixed_length) {
-    ARROW_ASSIGN_OR_RAISE(auto offsets,
-                          AllocateResizableBuffer(size_offsets(rows_capacity), pool_));
-    offsets_ = std::move(offsets);
-    memset(offsets_->mutable_data(), 0, size_offsets(rows_capacity));
-    reinterpret_cast<uint32_t*>(offsets_->mutable_data())[0] = 0;
-
-    ARROW_ASSIGN_OR_RAISE(
-        auto rows,
-        AllocateResizableBuffer(size_rows_varying_length(bytes_capacity), pool_));
-    rows_ = std::move(rows);
-    memset(rows_->mutable_data(), 0, size_rows_varying_length(bytes_capacity));
-    bytes_capacity_ = size_rows_varying_length(bytes_capacity) - padding_for_vectors;
-  } else {
-    ARROW_ASSIGN_OR_RAISE(
-        auto rows, AllocateResizableBuffer(size_rows_fixed_length(rows_capacity), pool_));
-    rows_ = std::move(rows);
-    memset(rows_->mutable_data(), 0, size_rows_fixed_length(rows_capacity));
-    bytes_capacity_ = size_rows_fixed_length(rows_capacity) - padding_for_vectors;
-  }
-
-  update_buffer_pointers();
-
-  rows_capacity_ = rows_capacity;
-
-  num_rows_ = 0;
-  num_rows_for_has_any_nulls_ = 0;
-  has_any_nulls_ = false;
-
-  return Status::OK();
-}
-
-void KeyEncoder::KeyRowArray::Clean() {
-  num_rows_ = 0;
-  num_rows_for_has_any_nulls_ = 0;
-  has_any_nulls_ = false;
-
-  if (!metadata_.is_fixed_length) {
-    reinterpret_cast<uint32_t*>(offsets_->mutable_data())[0] = 0;
-  }
-}
-
-int64_t KeyEncoder::KeyRowArray::size_null_masks(int64_t num_rows) {
-  return num_rows * metadata_.null_masks_bytes_per_row + padding_for_vectors;
-}
-
-int64_t KeyEncoder::KeyRowArray::size_offsets(int64_t num_rows) {
-  return (num_rows + 1) * sizeof(uint32_t) + padding_for_vectors;
+void RowTableEncoder::Init(const std::vector<KeyColumnMetadata>& cols, LightContext* ctx,
+                           int row_alignment, int string_alignment) {
+  ctx_ = ctx;
+  row_metadata_.FromColumnMetadataVector(cols, row_alignment, string_alignment);
+  uint32_t num_cols = row_metadata_.num_cols();
+  uint32_t num_varbinary_cols = row_metadata_.num_varbinary_cols();
+  batch_all_cols_.resize(num_cols);
+  batch_varbinary_cols_.resize(num_varbinary_cols);
+  batch_varbinary_cols_base_offsets_.resize(num_varbinary_cols);
 }
 
-int64_t KeyEncoder::KeyRowArray::size_rows_fixed_length(int64_t num_rows) {
-  return num_rows * metadata_.fixed_length + padding_for_vectors;
-}
+void RowTableEncoder::PrepareKeyColumnArrays(int64_t start_row, int64_t num_rows,
+                                             const std::vector<KeyColumnArray>& cols_in) {
+  const auto num_cols = static_cast<uint32_t>(cols_in.size());
+  DCHECK(batch_all_cols_.size() == num_cols);
 
-int64_t KeyEncoder::KeyRowArray::size_rows_varying_length(int64_t num_bytes) {
-  return num_bytes + padding_for_vectors;
-}
+  uint32_t num_varbinary_visited = 0;
+  for (uint32_t i = 0; i < num_cols; ++i) {
+    const KeyColumnArray& col = cols_in[row_metadata_.column_order[i]];
+    KeyColumnArray col_window = col.Slice(start_row, num_rows);
 
-void KeyEncoder::KeyRowArray::update_buffer_pointers() {
-  buffers_[0] = mutable_buffers_[0] = null_masks_->mutable_data();
-  if (metadata_.is_fixed_length) {
-    buffers_[1] = mutable_buffers_[1] = rows_->mutable_data();
-    buffers_[2] = mutable_buffers_[2] = nullptr;
-  } else {
-    buffers_[1] = mutable_buffers_[1] = offsets_->mutable_data();
-    buffers_[2] = mutable_buffers_[2] = rows_->mutable_data();
+    batch_all_cols_[i] = col_window;
+    if (!col.metadata().is_fixed_length) {
+      DCHECK(num_varbinary_visited < batch_varbinary_cols_.size());
+      // If start row is zero, then base offset of varbinary column is also zero.
+      if (start_row == 0) {
+        batch_varbinary_cols_base_offsets_[num_varbinary_visited] = 0;
+      } else {
+        batch_varbinary_cols_base_offsets_[num_varbinary_visited] =
+            col.offsets()[start_row];
+      }
+      batch_varbinary_cols_[num_varbinary_visited++] = col_window;
+    }
   }
 }
 
-Status KeyEncoder::KeyRowArray::ResizeFixedLengthBuffers(int64_t num_extra_rows) {
-  if (rows_capacity_ >= num_rows_ + num_extra_rows) {
-    return Status::OK();
-  }
-
-  int64_t rows_capacity_new = std::max(static_cast<int64_t>(1), 2 * rows_capacity_);
-  while (rows_capacity_new < num_rows_ + num_extra_rows) {
-    rows_capacity_new *= 2;
-  }
+void RowTableEncoder::DecodeFixedLengthBuffers(int64_t start_row_input,
+                                               int64_t start_row_output, int64_t num_rows,
+                                               const RowTableImpl& rows,
+                                               std::vector<KeyColumnArray>* cols) {
+  // Prepare column array vectors
+  PrepareKeyColumnArrays(start_row_output, num_rows, *cols);
 
-  // Null masks
-  RETURN_NOT_OK(null_masks_->Resize(size_null_masks(rows_capacity_new), false));
-  memset(null_masks_->mutable_data() + size_null_masks(rows_capacity_), 0,
-         size_null_masks(rows_capacity_new) - size_null_masks(rows_capacity_));
+  // Create two temp vectors with 16-bit elements
+  auto temp_buffer_holder_A =
+      util::TempVectorHolder<uint16_t>(ctx_->stack, static_cast<uint32_t>(num_rows));
+  auto temp_buffer_A = KeyColumnArray(
+      KeyColumnMetadata(true, sizeof(uint16_t)), num_rows, nullptr,
+      reinterpret_cast<uint8_t*>(temp_buffer_holder_A.mutable_data()), nullptr);
+  auto temp_buffer_holder_B =
+      util::TempVectorHolder<uint16_t>(ctx_->stack, static_cast<uint32_t>(num_rows));
+  auto temp_buffer_B = KeyColumnArray(
+      KeyColumnMetadata(true, sizeof(uint16_t)), num_rows, nullptr,
+      reinterpret_cast<uint8_t*>(temp_buffer_holder_B.mutable_data()), nullptr);
 
-  // Either offsets or rows
-  if (!metadata_.is_fixed_length) {
-    RETURN_NOT_OK(offsets_->Resize(size_offsets(rows_capacity_new), false));
-    memset(offsets_->mutable_data() + size_offsets(rows_capacity_), 0,
-           size_offsets(rows_capacity_new) - size_offsets(rows_capacity_));
-  } else {
-    RETURN_NOT_OK(rows_->Resize(size_rows_fixed_length(rows_capacity_new), false));
-    memset(rows_->mutable_data() + size_rows_fixed_length(rows_capacity_), 0,
-           size_rows_fixed_length(rows_capacity_new) -
-               size_rows_fixed_length(rows_capacity_));
-    bytes_capacity_ = size_rows_fixed_length(rows_capacity_new) - padding_for_vectors;
+  bool is_row_fixed_length = row_metadata_.is_fixed_length;
+  if (!is_row_fixed_length) {
+    EncoderOffsets::Decode(static_cast<uint32_t>(start_row_input),
+                           static_cast<uint32_t>(num_rows), rows, &batch_varbinary_cols_,
+                           batch_varbinary_cols_base_offsets_, ctx_);
   }
 
-  update_buffer_pointers();
-
-  rows_capacity_ = rows_capacity_new;
+  // Process fixed length columns
+  const auto num_cols = static_cast<uint32_t>(batch_all_cols_.size());
+  for (uint32_t i = 0; i < num_cols;) {
+    if (!batch_all_cols_[i].metadata().is_fixed_length ||
+        batch_all_cols_[i].metadata().is_null_type) {
+      i += 1;
+      continue;
+    }
+    bool can_process_pair =
+        (i + 1 < num_cols) && batch_all_cols_[i + 1].metadata().is_fixed_length &&
+        EncoderBinaryPair::CanProcessPair(batch_all_cols_[i].metadata(),
+                                          batch_all_cols_[i + 1].metadata());
+    if (!can_process_pair) {
+      EncoderBinary::Decode(static_cast<uint32_t>(start_row_input),
+                            static_cast<uint32_t>(num_rows),
+                            row_metadata_.column_offsets[i], rows, &batch_all_cols_[i],
+                            ctx_, &temp_buffer_A);
+      i += 1;
+    } else {
+      EncoderBinaryPair::Decode(
+          static_cast<uint32_t>(start_row_input), static_cast<uint32_t>(num_rows),
+          row_metadata_.column_offsets[i], rows, &batch_all_cols_[i],
+          &batch_all_cols_[i + 1], ctx_, &temp_buffer_A, &temp_buffer_B);
+      i += 2;
+    }
+  }
 
-  return Status::OK();
+  // Process nulls
+  EncoderNulls::Decode(static_cast<uint32_t>(start_row_input),
+                       static_cast<uint32_t>(num_rows), rows, &batch_all_cols_);
 }
 
-Status KeyEncoder::KeyRowArray::ResizeOptionalVaryingLengthBuffer(
-    int64_t num_extra_bytes) {
-  int64_t num_bytes = offsets()[num_rows_];
-  if (bytes_capacity_ >= num_bytes + num_extra_bytes || metadata_.is_fixed_length) {
-    return Status::OK();
-  }
+void RowTableEncoder::DecodeVaryingLengthBuffers(int64_t start_row_input,
+                                                 int64_t start_row_output,
+                                                 int64_t num_rows,
+                                                 const RowTableImpl& rows,
+                                                 std::vector<KeyColumnArray>* cols) {
+  // Prepare column array vectors
+  PrepareKeyColumnArrays(start_row_output, num_rows, *cols);
 
-  int64_t bytes_capacity_new = std::max(static_cast<int64_t>(1), 2 * bytes_capacity_);
-  while (bytes_capacity_new < num_bytes + num_extra_bytes) {
-    bytes_capacity_new *= 2;
+  bool is_row_fixed_length = row_metadata_.is_fixed_length;
+  if (!is_row_fixed_length) {
+    for (size_t i = 0; i < batch_varbinary_cols_.size(); ++i) {
+      // Memcpy varbinary fields into precomputed in the previous step
+      // positions in the output row buffer.
+      EncoderVarBinary::Decode(static_cast<uint32_t>(start_row_input),
+                               static_cast<uint32_t>(num_rows), static_cast<uint32_t>(i),
+                               rows, &batch_varbinary_cols_[i], ctx_);
+    }
   }
+}
 
-  RETURN_NOT_OK(rows_->Resize(size_rows_varying_length(bytes_capacity_new), false));
-  memset(rows_->mutable_data() + size_rows_varying_length(bytes_capacity_), 0,
-         size_rows_varying_length(bytes_capacity_new) -
-             size_rows_varying_length(bytes_capacity_));
-
-  update_buffer_pointers();
+void RowTableEncoder::PrepareEncodeSelected(int64_t start_row, int64_t num_rows,
+                                            const std::vector<KeyColumnArray>& cols) {
+  // Prepare column array vectors
+  PrepareKeyColumnArrays(start_row, num_rows, cols);
+}
 
-  bytes_capacity_ = bytes_capacity_new;
+Status RowTableEncoder::EncodeSelected(RowTableImpl* rows, uint32_t num_selected,
+                                       const uint16_t* selection) {
+  rows->Clean();
+  RETURN_NOT_OK(
+      rows->AppendEmpty(static_cast<uint32_t>(num_selected), static_cast<uint32_t>(0)));
 
-  return Status::OK();
-}
+  EncoderOffsets::GetRowOffsetsSelected(rows, batch_varbinary_cols_, num_selected,
+                                        selection);
 
-Status KeyEncoder::KeyRowArray::AppendSelectionFrom(const KeyRowArray& from,
-                                                    uint32_t num_rows_to_append,
-                                                    const uint16_t* source_row_ids) {
-  DCHECK(metadata_.is_compatible(from.metadata()));
-
-  RETURN_NOT_OK(ResizeFixedLengthBuffers(num_rows_to_append));
-
-  if (!metadata_.is_fixed_length) {
-    // Varying-length rows
-    auto from_offsets = reinterpret_cast<const uint32_t*>(from.offsets_->data());
-    auto to_offsets = reinterpret_cast<uint32_t*>(offsets_->mutable_data());
-    uint32_t total_length = to_offsets[num_rows_];
-    uint32_t total_length_to_append = 0;
-    for (uint32_t i = 0; i < num_rows_to_append; ++i) {
-      uint16_t row_id = source_row_ids ? source_row_ids[i] : i;
-      uint32_t length = from_offsets[row_id + 1] - from_offsets[row_id];
-      total_length_to_append += length;
-      to_offsets[num_rows_ + i + 1] = total_length + total_length_to_append;
-    }
+  RETURN_NOT_OK(rows->AppendEmpty(static_cast<uint32_t>(0),
+                                  static_cast<uint32_t>(rows->offsets()[num_selected])));
 
-    RETURN_NOT_OK(ResizeOptionalVaryingLengthBuffer(total_length_to_append));
-
-    const uint8_t* src = from.rows_->data();
-    uint8_t* dst = rows_->mutable_data() + total_length;
-    for (uint32_t i = 0; i < num_rows_to_append; ++i) {
-      uint16_t row_id = source_row_ids ? source_row_ids[i] : i;
-      uint32_t length = from_offsets[row_id + 1] - from_offsets[row_id];
-      auto src64 = reinterpret_cast<const uint64_t*>(src + from_offsets[row_id]);
-      auto dst64 = reinterpret_cast<uint64_t*>(dst);
-      for (uint32_t j = 0; j < bit_util::CeilDiv(length, 8); ++j) {
-        dst64[j] = src64[j];
-      }
-      dst += length;
-    }
-  } else {
-    // Fixed-length rows
-    const uint8_t* src = from.rows_->data();
-    uint8_t* dst = rows_->mutable_data() + num_rows_ * metadata_.fixed_length;
-    for (uint32_t i = 0; i < num_rows_to_append; ++i) {
-      uint16_t row_id = source_row_ids ? source_row_ids[i] : i;
-      uint32_t length = metadata_.fixed_length;
-      auto src64 = reinterpret_cast<const uint64_t*>(src + length * row_id);
-      auto dst64 = reinterpret_cast<uint64_t*>(dst);
-      for (uint32_t j = 0; j < bit_util::CeilDiv(length, 8); ++j) {
-        dst64[j] = src64[j];
-      }
-      dst += length;
+  for (size_t icol = 0; icol < batch_all_cols_.size(); ++icol) {
+    if (batch_all_cols_[icol].metadata().is_fixed_length) {
+      uint32_t offset_within_row = rows->metadata().column_offsets[icol];
+      EncoderBinary::EncodeSelected(offset_within_row, rows, batch_all_cols_[icol],
+                                    num_selected, selection);
     }
   }
 
-  // Null masks
-  uint32_t byte_length = metadata_.null_masks_bytes_per_row;
-  uint64_t dst_byte_offset = num_rows_ * byte_length;
-  const uint8_t* src_base = from.null_masks_->data();
-  uint8_t* dst_base = null_masks_->mutable_data();
-  for (uint32_t i = 0; i < num_rows_to_append; ++i) {
-    uint32_t row_id = source_row_ids ? source_row_ids[i] : i;
-    int64_t src_byte_offset = row_id * byte_length;
-    const uint8_t* src = src_base + src_byte_offset;
-    uint8_t* dst = dst_base + dst_byte_offset;
-    for (uint32_t ibyte = 0; ibyte < byte_length; ++ibyte) {
-      dst[ibyte] = src[ibyte];
-    }
-    dst_byte_offset += byte_length;
+  EncoderOffsets::EncodeSelected(rows, batch_varbinary_cols_, num_selected, selection);
+
+  for (size_t icol = 0; icol < batch_varbinary_cols_.size(); ++icol) {
+    EncoderVarBinary::EncodeSelected(static_cast<uint32_t>(icol), rows,
+                                     batch_varbinary_cols_[icol], num_selected,
+                                     selection);
   }
 
-  num_rows_ += num_rows_to_append;
+  EncoderNulls::EncodeSelected(rows, batch_all_cols_, num_selected, selection);
 
   return Status::OK();
 }
 
-Status KeyEncoder::KeyRowArray::AppendEmpty(uint32_t num_rows_to_append,
-                                            uint32_t num_extra_bytes_to_append) {
-  RETURN_NOT_OK(ResizeFixedLengthBuffers(num_rows_to_append));
-  RETURN_NOT_OK(ResizeOptionalVaryingLengthBuffer(num_extra_bytes_to_append));
-  num_rows_ += num_rows_to_append;
-  if (metadata_.row_alignment > 1 || metadata_.string_alignment > 1) {
-    memset(rows_->mutable_data(), 0, bytes_capacity_);
+namespace {
+struct TransformBoolean {
+  static KeyColumnArray ArrayReplace(const KeyColumnArray& column,
+                                     const KeyColumnArray& temp) {
+    // Make sure that the temp buffer is large enough
+    DCHECK(temp.length() >= column.length() && temp.metadata().is_fixed_length &&
+           temp.metadata().fixed_length >= sizeof(uint8_t));
+    KeyColumnMetadata metadata;
+    metadata.is_fixed_length = true;
+    metadata.fixed_length = sizeof(uint8_t);
+    constexpr int buffer_index = 1;
+    return column.WithBufferFrom(temp, buffer_index).WithMetadata(metadata);
   }
-  return Status::OK();
-}
 
-bool KeyEncoder::KeyRowArray::has_any_nulls(const KeyEncoderContext* ctx) const {
-  if (has_any_nulls_) {
-    return true;
-  }
-  if (num_rows_for_has_any_nulls_ < num_rows_) {
-    auto size_per_row = metadata().null_masks_bytes_per_row;
-    has_any_nulls_ = !util::bit_util::are_all_bytes_zero(
-        ctx->hardware_flags, null_masks() + size_per_row * num_rows_for_has_any_nulls_,
-        static_cast<uint32_t>(size_per_row * (num_rows_ - num_rows_for_has_any_nulls_)));
-    num_rows_for_has_any_nulls_ = num_rows_;
-  }
-  return has_any_nulls_;
-}
+  static void PostDecode(const KeyColumnArray& input, KeyColumnArray* output,
+                         LightContext* ctx) {
+    // Make sure that metadata and lengths are compatible.
+    DCHECK(output->metadata().is_fixed_length == input.metadata().is_fixed_length);
+    DCHECK(output->metadata().fixed_length == 0 && input.metadata().fixed_length == 1);
+    DCHECK(output->length() == input.length());
+    constexpr int buffer_index = 1;
+    DCHECK(input.data(buffer_index) != nullptr);
+    DCHECK(output->mutable_data(buffer_index) != nullptr);
 
-KeyColumnArray KeyEncoder::TransformBoolean::ArrayReplace(const KeyColumnArray& column,
-                                                          const KeyColumnArray& temp) {
-  // Make sure that the temp buffer is large enough
-  DCHECK(temp.length() >= column.length() && temp.metadata().is_fixed_length &&
-         temp.metadata().fixed_length >= sizeof(uint8_t));
-  KeyColumnMetadata metadata;
-  metadata.is_fixed_length = true;
-  metadata.fixed_length = sizeof(uint8_t);
-  constexpr int buffer_index = 1;
-  return column.WithBufferFrom(temp, buffer_index).WithMetadata(metadata);
-}
+    util::bit_util::bytes_to_bits(
+        ctx->hardware_flags, static_cast<int>(input.length()), input.data(buffer_index),
+        output->mutable_data(buffer_index), output->bit_offset(buffer_index));
+  }
+};
 
-void KeyEncoder::TransformBoolean::PostDecode(const KeyColumnArray& input,
-                                              KeyColumnArray* output,
-                                              KeyEncoderContext* ctx) {
-  // Make sure that metadata and lengths are compatible.
-  DCHECK(output->metadata().is_fixed_length == input.metadata().is_fixed_length);
-  DCHECK(output->metadata().fixed_length == 0 && input.metadata().fixed_length == 1);
-  DCHECK(output->length() == input.length());
-  constexpr int buffer_index = 1;
-  DCHECK(input.data(buffer_index) != nullptr);
-  DCHECK(output->mutable_data(buffer_index) != nullptr);
-
-  util::bit_util::bytes_to_bits(
-      ctx->hardware_flags, static_cast<int>(input.length()), input.data(buffer_index),
-      output->mutable_data(buffer_index), output->bit_offset(buffer_index));
-}
+}  // namespace
 
-bool KeyEncoder::EncoderInteger::IsBoolean(const KeyColumnMetadata& metadata) {
+bool EncoderInteger::IsBoolean(const KeyColumnMetadata& metadata) {
   return metadata.is_fixed_length && metadata.fixed_length == 0 && !metadata.is_null_type;
 }
 
-bool KeyEncoder::EncoderInteger::UsesTransform(const KeyColumnArray& column) {
+bool EncoderInteger::UsesTransform(const KeyColumnArray& column) {
   return IsBoolean(column.metadata());
 }
 
-KeyColumnArray KeyEncoder::EncoderInteger::ArrayReplace(const KeyColumnArray& column,
-                                                        const KeyColumnArray& temp) {
+KeyColumnArray EncoderInteger::ArrayReplace(const KeyColumnArray& column,
+                                            const KeyColumnArray& temp) {
   if (IsBoolean(column.metadata())) {
     return TransformBoolean::ArrayReplace(column, temp);
   }
   return column;
 }
 
-void KeyEncoder::EncoderInteger::PostDecode(const KeyColumnArray& input,
-                                            KeyColumnArray* output,
-                                            KeyEncoderContext* ctx) {
+void EncoderInteger::PostDecode(const KeyColumnArray& input, KeyColumnArray* output,
+                                LightContext* ctx) {
   if (IsBoolean(output->metadata())) {
     TransformBoolean::PostDecode(input, output, ctx);
   }
 }
 
-void KeyEncoder::EncoderInteger::Decode(uint32_t start_row, uint32_t num_rows,
-                                        uint32_t offset_within_row,
-                                        const KeyRowArray& rows, KeyColumnArray* col,
-                                        KeyEncoderContext* ctx, KeyColumnArray* temp) {
+void EncoderInteger::Decode(uint32_t start_row, uint32_t num_rows,
+                            uint32_t offset_within_row, const RowTableImpl& rows,
+                            KeyColumnArray* col, LightContext* ctx,
+                            KeyColumnArray* temp) {
   KeyColumnArray col_prep;
   if (UsesTransform(*col)) {
     col_prep = ArrayReplace(*col, *temp);
@@ -412,7 +317,111 @@ void KeyEncoder::EncoderInteger::Decode(uint32_t start_row, uint32_t num_rows,
   }
 }
 
-bool KeyEncoder::EncoderBinary::IsInteger(const KeyColumnMetadata& metadata) {
+template <class COPY_FN, class SET_NULL_FN>
+void EncoderBinary::EncodeSelectedImp(uint32_t offset_within_row, RowTableImpl* rows,
+                                      const KeyColumnArray& col, uint32_t num_selected,
+                                      const uint16_t* selection, COPY_FN copy_fn,
+                                      SET_NULL_FN set_null_fn) {
+  bool is_fixed_length = rows->metadata().is_fixed_length;
+  if (is_fixed_length) {
+    uint32_t row_width = rows->metadata().fixed_length;
+    const uint8_t* src_base = col.data(1);
+    uint8_t* dst = rows->mutable_data(1) + offset_within_row;
+    for (uint32_t i = 0; i < num_selected; ++i) {
+      copy_fn(dst, src_base, selection[i]);
+      dst += row_width;
+    }
+    if (col.data(0)) {
+      const uint8_t* non_null_bits = col.data(0);
+      uint8_t* dst = rows->mutable_data(1) + offset_within_row;
+      for (uint32_t i = 0; i < num_selected; ++i) {
+        bool is_null = !bit_util::GetBit(non_null_bits, selection[i] + col.bit_offset(0));
+        if (is_null) {
+          set_null_fn(dst);
+        }
+        dst += row_width;
+      }
+    }
+  } else {
+    const uint8_t* src_base = col.data(1);
+    uint8_t* dst = rows->mutable_data(2) + offset_within_row;
+    const uint32_t* offsets = rows->offsets();
+    for (uint32_t i = 0; i < num_selected; ++i) {
+      copy_fn(dst + offsets[i], src_base, selection[i]);
+    }
+    if (col.data(0)) {
+      const uint8_t* non_null_bits = col.data(0);
+      uint8_t* dst = rows->mutable_data(2) + offset_within_row;
+      const uint32_t* offsets = rows->offsets();
+      for (uint32_t i = 0; i < num_selected; ++i) {
+        bool is_null = !bit_util::GetBit(non_null_bits, selection[i] + col.bit_offset(0));
+        if (is_null) {
+          set_null_fn(dst + offsets[i]);
+        }
+      }
+    }
+  }
+}
+
+void EncoderBinary::EncodeSelected(uint32_t offset_within_row, RowTableImpl* rows,
+                                   const KeyColumnArray& col, uint32_t num_selected,
+                                   const uint16_t* selection) {
+  if (col.metadata().is_null_type) {
+    return;
+  }
+  uint32_t col_width = col.metadata().fixed_length;
+  if (col_width == 0) {
+    int bit_offset = col.bit_offset(1);
+    EncodeSelectedImp(
+        offset_within_row, rows, col, num_selected, selection,
+        [bit_offset](uint8_t* dst, const uint8_t* src_base, uint16_t irow) {
+          *dst = bit_util::GetBit(src_base, irow + bit_offset) ? 0xff : 0x00;
+        },
+        [](uint8_t* dst) { *dst = 0xae; });
+  } else if (col_width == 1) {
+    EncodeSelectedImp(
+        offset_within_row, rows, col, num_selected, selection,
+        [](uint8_t* dst, const uint8_t* src_base, uint16_t irow) {
+          *dst = src_base[irow];
+        },
+        [](uint8_t* dst) { *dst = 0xae; });
+  } else if (col_width == 2) {
+    EncodeSelectedImp(
+        offset_within_row, rows, col, num_selected, selection,
+        [](uint8_t* dst, const uint8_t* src_base, uint16_t irow) {
+          *reinterpret_cast<uint16_t*>(dst) =
+              reinterpret_cast<const uint16_t*>(src_base)[irow];
+        },
+        [](uint8_t* dst) { *reinterpret_cast<uint16_t*>(dst) = 0xaeae; });
+  } else if (col_width == 4) {
+    EncodeSelectedImp(
+        offset_within_row, rows, col, num_selected, selection,
+        [](uint8_t* dst, const uint8_t* src_base, uint16_t irow) {
+          *reinterpret_cast<uint32_t*>(dst) =
+              reinterpret_cast<const uint32_t*>(src_base)[irow];
+        },
+        [](uint8_t* dst) {
+          *reinterpret_cast<uint32_t*>(dst) = static_cast<uint32_t>(0xaeaeaeae);
+        });
+  } else if (col_width == 8) {
+    EncodeSelectedImp(
+        offset_within_row, rows, col, num_selected, selection,
+        [](uint8_t* dst, const uint8_t* src_base, uint16_t irow) {
+          *reinterpret_cast<uint64_t*>(dst) =
+              reinterpret_cast<const uint64_t*>(src_base)[irow];
+        },
+        [](uint8_t* dst) { *reinterpret_cast<uint64_t*>(dst) = 0xaeaeaeaeaeaeaeaeULL; });
+  } else {
+    EncodeSelectedImp(
+        offset_within_row, rows, col, num_selected, selection,
+        [col_width](uint8_t* dst, const uint8_t* src_base, uint16_t irow) {
+          memcpy(dst, src_base + col_width * irow, col_width);
+        },
+        [col_width](uint8_t* dst) { memset(dst, 0xae, col_width); });
+  }
+}
+
+bool EncoderBinary::IsInteger(const KeyColumnMetadata& metadata) {
   if (metadata.is_null_type) {
     return false;
   }
@@ -422,10 +431,9 @@ bool KeyEncoder::EncoderBinary::IsInteger(const KeyColumnMetadata& metadata) {
          (size == 0 || size == 1 || size == 2 || size == 4 || size == 8);
 }
 
-void KeyEncoder::EncoderBinary::Decode(uint32_t start_row, uint32_t num_rows,
-                                       uint32_t offset_within_row,
-                                       const KeyRowArray& rows, KeyColumnArray* col,
-                                       KeyEncoderContext* ctx, KeyColumnArray* temp) {
+void EncoderBinary::Decode(uint32_t start_row, uint32_t num_rows,
+                           uint32_t offset_within_row, const RowTableImpl& rows,
+                           KeyColumnArray* col, LightContext* ctx, KeyColumnArray* temp) {
   if (IsInteger(col->metadata())) {
     EncoderInteger::Decode(start_row, num_rows, offset_within_row, rows, col, ctx, temp);
   } else {
@@ -460,9 +468,9 @@ void KeyEncoder::EncoderBinary::Decode(uint32_t start_row, uint32_t num_rows,
 }
 
 template <bool is_row_fixed_length>
-void KeyEncoder::EncoderBinary::DecodeImp(uint32_t start_row, uint32_t num_rows,
-                                          uint32_t offset_within_row,
-                                          const KeyRowArray& rows, KeyColumnArray* col) {
+void EncoderBinary::DecodeImp(uint32_t start_row, uint32_t num_rows,
+                              uint32_t offset_within_row, const RowTableImpl& rows,
+                              KeyColumnArray* col) {
   DecodeHelper<is_row_fixed_length>(
       start_row, num_rows, offset_within_row, &rows, nullptr, col, col,
       [](uint8_t* dst, const uint8_t* src, int64_t length) {
@@ -474,11 +482,11 @@ void KeyEncoder::EncoderBinary::DecodeImp(uint32_t start_row, uint32_t num_rows,
       });
 }
 
-void KeyEncoder::EncoderBinaryPair::Decode(uint32_t start_row, uint32_t num_rows,
-                                           uint32_t offset_within_row,
-                                           const KeyRowArray& rows, KeyColumnArray* col1,
-                                           KeyColumnArray* col2, KeyEncoderContext* ctx,
-                                           KeyColumnArray* temp1, KeyColumnArray* temp2) {
+void EncoderBinaryPair::Decode(uint32_t start_row, uint32_t num_rows,
+                               uint32_t offset_within_row, const RowTableImpl& rows,
+                               KeyColumnArray* col1, KeyColumnArray* col2,
+                               LightContext* ctx, KeyColumnArray* temp1,
+                               KeyColumnArray* temp2) {
   DCHECK(CanProcessPair(col1->metadata(), col2->metadata()));
 
   KeyColumnArray col_prep[2];
@@ -516,7 +524,7 @@ void KeyEncoder::EncoderBinaryPair::Decode(uint32_t start_row, uint32_t num_rows
 #endif
   if (num_processed < num_rows) {
     using DecodeImp_t = void (*)(uint32_t, uint32_t, uint32_t, uint32_t,
-                                 const KeyRowArray&, KeyColumnArray*, KeyColumnArray*);
+                                 const RowTableImpl&, KeyColumnArray*, KeyColumnArray*);
     static const DecodeImp_t DecodeImp_fn[] = {
         DecodeImp<false, uint8_t, uint8_t>,   DecodeImp<false, uint16_t, uint8_t>,
         DecodeImp<false, uint32_t, uint8_t>,  DecodeImp<false, uint64_t, uint8_t>,
@@ -549,12 +557,10 @@ void KeyEncoder::EncoderBinaryPair::Decode(uint32_t start_row, uint32_t num_rows
 }
 
 template <bool is_row_fixed_length, typename col1_type, typename col2_type>
-void KeyEncoder::EncoderBinaryPair::DecodeImp(uint32_t num_rows_to_skip,
-                                              uint32_t start_row, uint32_t num_rows,
-                                              uint32_t offset_within_row,
-                                              const KeyRowArray& rows,
-                                              KeyColumnArray* col1,
-                                              KeyColumnArray* col2) {
+void EncoderBinaryPair::DecodeImp(uint32_t num_rows_to_skip, uint32_t start_row,
+                                  uint32_t num_rows, uint32_t offset_within_row,
+                                  const RowTableImpl& rows, KeyColumnArray* col1,
+                                  KeyColumnArray* col2) {
   DCHECK(rows.length() >= start_row + num_rows);
   DCHECK(col1->length() == num_rows && col2->length() == num_rows);
 
@@ -593,10 +599,11 @@ void KeyEncoder::EncoderBinaryPair::DecodeImp(uint32_t num_rows_to_skip,
   }
 }
 
-void KeyEncoder::EncoderOffsets::Decode(
-    uint32_t start_row, uint32_t num_rows, const KeyRowArray& rows,
-    std::vector<KeyColumnArray>* varbinary_cols,
-    const std::vector<uint32_t>& varbinary_cols_base_offset, KeyEncoderContext* ctx) {
+void EncoderOffsets::Decode(uint32_t start_row, uint32_t num_rows,
+                            const RowTableImpl& rows,
+                            std::vector<KeyColumnArray>* varbinary_cols,
+                            const std::vector<uint32_t>& varbinary_cols_base_offset,
+                            LightContext* ctx) {
   DCHECK(!varbinary_cols->empty());
   DCHECK(varbinary_cols->size() == varbinary_cols_base_offset.size());
 
@@ -635,7 +642,7 @@ void KeyEncoder::EncoderOffsets::Decode(
     uint32_t offset_within_row = rows.metadata().fixed_length;
     for (size_t col = 0; col < varbinary_cols->size(); ++col) {
       offset_within_row +=
-          KeyRowMetadata::padding_for_alignment(offset_within_row, string_alignment);
+          RowTableMetadata::padding_for_alignment(offset_within_row, string_alignment);
       uint32_t length = varbinary_ends[col] - offset_within_row;
       offset_within_row = varbinary_ends[col];
       uint32_t* col_offsets = (*varbinary_cols)[col].mutable_offsets();
@@ -644,440 +651,10 @@ void KeyEncoder::EncoderOffsets::Decode(
   }
 }
 
-void KeyEncoder::EncoderVarBinary::Decode(uint32_t start_row, uint32_t num_rows,
-                                          uint32_t varbinary_col_id,
-                                          const KeyRowArray& rows, KeyColumnArray* col,
-                                          KeyEncoderContext* ctx) {
-  // Output column varbinary buffer needs an extra 32B
-  // at the end in avx2 version and 8B otherwise.
-#if defined(ARROW_HAVE_AVX2)
-  if (ctx->has_avx2()) {
-    DecodeHelper_avx2(start_row, num_rows, varbinary_col_id, rows, col);
-  } else {
-#endif
-    if (varbinary_col_id == 0) {
-      DecodeImp<true>(start_row, num_rows, varbinary_col_id, rows, col);
-    } else {
-      DecodeImp<false>(start_row, num_rows, varbinary_col_id, rows, col);
-    }
-#if defined(ARROW_HAVE_AVX2)
-  }
-#endif
-}
-
-template <bool first_varbinary_col>
-void KeyEncoder::EncoderVarBinary::DecodeImp(uint32_t start_row, uint32_t num_rows,
-                                             uint32_t varbinary_col_id,
-                                             const KeyRowArray& rows,
-                                             KeyColumnArray* col) {
-  DecodeHelper<first_varbinary_col>(
-      start_row, num_rows, varbinary_col_id, &rows, nullptr, col, col,
-      [](uint8_t* dst, const uint8_t* src, int64_t length) {
-        for (uint32_t istripe = 0; istripe < bit_util::CeilDiv(length, 8); ++istripe) {
-          auto dst64 = reinterpret_cast<uint64_t*>(dst);
-          auto src64 = reinterpret_cast<const uint64_t*>(src);
-          util::SafeStore(dst64 + istripe, src64[istripe]);
-        }
-      });
-}
-
-void KeyEncoder::EncoderNulls::Decode(uint32_t start_row, uint32_t num_rows,
-                                      const KeyRowArray& rows,
-                                      std::vector<KeyColumnArray>* cols) {
-  // Every output column needs to have a space for exactly the required number
-  // of rows. It also needs to have non-nulls bit-vector allocated and mutable.
-  DCHECK_GT(cols->size(), 0);
-  for (auto& col : *cols) {
-    DCHECK(col.length() == num_rows);
-    DCHECK(col.mutable_data(0) || col.metadata().is_null_type);
-  }
-
-  const uint8_t* null_masks = rows.null_masks();
-  uint32_t null_masks_bytes_per_row = rows.metadata().null_masks_bytes_per_row;
-  for (size_t col = 0; col < cols->size(); ++col) {
-    if ((*cols)[col].metadata().is_null_type) {
-      continue;
-    }
-    uint8_t* non_nulls = (*cols)[col].mutable_data(0);
-    const int bit_offset = (*cols)[col].bit_offset(0);
-    DCHECK_LT(bit_offset, 8);
-    non_nulls[0] |= 0xff << (bit_offset);
-    if (bit_offset + num_rows > 8) {
-      int bits_in_first_byte = 8 - bit_offset;
-      memset(non_nulls + 1, 0xff, bit_util::BytesForBits(num_rows - bits_in_first_byte));
-    }
-    for (uint32_t row = 0; row < num_rows; ++row) {
-      uint32_t null_masks_bit_id =
-          (start_row + row) * null_masks_bytes_per_row * 8 + static_cast<uint32_t>(col);
-      bool is_set = bit_util::GetBit(null_masks, null_masks_bit_id);
-      if (is_set) {
-        bit_util::ClearBit(non_nulls, bit_offset + row);
-      }
-    }
-  }
-}
-
-uint32_t KeyEncoder::KeyRowMetadata::num_varbinary_cols() const {
-  uint32_t result = 0;
-  for (auto column_metadata : column_metadatas) {
-    if (!column_metadata.is_fixed_length) {
-      ++result;
-    }
-  }
-  return result;
-}
-
-bool KeyEncoder::KeyRowMetadata::is_compatible(const KeyRowMetadata& other) const {
-  if (other.num_cols() != num_cols()) {
-    return false;
-  }
-  if (row_alignment != other.row_alignment ||
-      string_alignment != other.string_alignment) {
-    return false;
-  }
-  for (size_t i = 0; i < column_metadatas.size(); ++i) {
-    if (column_metadatas[i].is_fixed_length !=
-        other.column_metadatas[i].is_fixed_length) {
-      return false;
-    }
-    if (column_metadatas[i].fixed_length != other.column_metadatas[i].fixed_length) {
-      return false;
-    }
-  }
-  return true;
-}
-
-void KeyEncoder::KeyRowMetadata::FromColumnMetadataVector(
-    const std::vector<KeyColumnMetadata>& cols, int in_row_alignment,
-    int in_string_alignment) {
-  column_metadatas.resize(cols.size());
-  for (size_t i = 0; i < cols.size(); ++i) {
-    column_metadatas[i] = cols[i];
-  }
-
-  const auto num_cols = static_cast<uint32_t>(cols.size());
-
-  // Sort columns.
-  //
-  // Columns are sorted based on the size in bytes of their fixed-length part.
-  // For the varying-length column, the fixed-length part is the 32-bit field storing
-  // cumulative length of varying-length fields.
-  //
-  // The rules are:
-  //
-  // a) Boolean column, marked with fixed-length 0, is considered to have fixed-length
-  // part of 1 byte.
-  //
-  // b) Columns with fixed-length part being power of 2 or multiple of row
-  // alignment precede other columns. They are sorted in decreasing order of the size of
-  // their fixed-length part.
-  //
-  // c) Fixed-length columns precede varying-length columns when
-  // both have the same size fixed-length part.
-  //
-  column_order.resize(num_cols);
-  for (uint32_t i = 0; i < num_cols; ++i) {
-    column_order[i] = i;
-  }
-  std::sort(
-      column_order.begin(), column_order.end(), [&cols](uint32_t left, uint32_t right) {
-        bool is_left_pow2 =
-            !cols[left].is_fixed_length || ARROW_POPCOUNT64(cols[left].fixed_length) <= 1;
-        bool is_right_pow2 = !cols[right].is_fixed_length ||
-                             ARROW_POPCOUNT64(cols[right].fixed_length) <= 1;
-        bool is_left_fixedlen = cols[left].is_fixed_length;
-        bool is_right_fixedlen = cols[right].is_fixed_length;
-        uint32_t width_left =
-            cols[left].is_fixed_length ? cols[left].fixed_length : sizeof(uint32_t);
-        uint32_t width_right =
-            cols[right].is_fixed_length ? cols[right].fixed_length : sizeof(uint32_t);
-        if (is_left_pow2 != is_right_pow2) {
-          return is_left_pow2;
-        }
-        if (!is_left_pow2) {
-          return left < right;
-        }
-        if (width_left != width_right) {
-          return width_left > width_right;
-        }
-        if (is_left_fixedlen != is_right_fixedlen) {
-          return is_left_fixedlen;
-        }
-        return left < right;
-      });
-
-  row_alignment = in_row_alignment;
-  string_alignment = in_string_alignment;
-  varbinary_end_array_offset = 0;
-
-  column_offsets.resize(num_cols);
-  uint32_t num_varbinary_cols = 0;
-  uint32_t offset_within_row = 0;
-  for (uint32_t i = 0; i < num_cols; ++i) {
-    const KeyColumnMetadata& col = cols[column_order[i]];
-    if (col.is_fixed_length && col.fixed_length != 0 &&
-        ARROW_POPCOUNT64(col.fixed_length) != 1) {
-      offset_within_row +=
-          KeyRowMetadata::padding_for_alignment(offset_within_row, string_alignment, col);
-    }
-    column_offsets[i] = offset_within_row;
-    if (!col.is_fixed_length) {
-      if (num_varbinary_cols == 0) {
-        varbinary_end_array_offset = offset_within_row;
-      }
-      DCHECK(column_offsets[i] - varbinary_end_array_offset ==
-             num_varbinary_cols * sizeof(uint32_t));
-      ++num_varbinary_cols;
-      offset_within_row += sizeof(uint32_t);
-    } else {
-      // Boolean column is a bit-vector, which is indicated by
-      // setting fixed length in column metadata to zero.
-      // It will be stored as a byte in output row.
-      if (col.fixed_length == 0) {
-        offset_within_row += 1;
-      } else {
-        offset_within_row += col.fixed_length;
-      }
-    }
-  }
-
-  is_fixed_length = (num_varbinary_cols == 0);
-  fixed_length =
-      offset_within_row +
-      KeyRowMetadata::padding_for_alignment(
-          offset_within_row, num_varbinary_cols == 0 ? row_alignment : string_alignment);
-
-  // We set the number of bytes per row storing null masks of individual key columns
-  // to be a power of two. This is not required. It could be also set to the minimal
-  // number of bytes required for a given number of bits (one bit per column).
-  null_masks_bytes_per_row = 1;
-  while (static_cast<uint32_t>(null_masks_bytes_per_row * 8) < num_cols) {
-    null_masks_bytes_per_row *= 2;
-  }
-}
-
-void KeyEncoder::Init(const std::vector<KeyColumnMetadata>& cols, KeyEncoderContext* ctx,
-                      int row_alignment, int string_alignment) {
-  ctx_ = ctx;
-  row_metadata_.FromColumnMetadataVector(cols, row_alignment, string_alignment);
-  uint32_t num_cols = row_metadata_.num_cols();
-  uint32_t num_varbinary_cols = row_metadata_.num_varbinary_cols();
-  batch_all_cols_.resize(num_cols);
-  batch_varbinary_cols_.resize(num_varbinary_cols);
-  batch_varbinary_cols_base_offsets_.resize(num_varbinary_cols);
-}
-
-void KeyEncoder::PrepareKeyColumnArrays(int64_t start_row, int64_t num_rows,
-                                        const std::vector<KeyColumnArray>& cols_in) {
-  const auto num_cols = static_cast<uint32_t>(cols_in.size());
-  DCHECK(batch_all_cols_.size() == num_cols);
-
-  uint32_t num_varbinary_visited = 0;
-  for (uint32_t i = 0; i < num_cols; ++i) {
-    const KeyColumnArray& col = cols_in[row_metadata_.column_order[i]];
-    KeyColumnArray col_window = col.Slice(start_row, num_rows);
-
-    batch_all_cols_[i] = col_window;
-    if (!col.metadata().is_fixed_length) {
-      DCHECK(num_varbinary_visited < batch_varbinary_cols_.size());
-      // If start row is zero, then base offset of varbinary column is also zero.
-      if (start_row == 0) {
-        batch_varbinary_cols_base_offsets_[num_varbinary_visited] = 0;
-      } else {
-        batch_varbinary_cols_base_offsets_[num_varbinary_visited] =
-            col.offsets()[start_row];
-      }
-      batch_varbinary_cols_[num_varbinary_visited++] = col_window;
-    }
-  }
-}
-
-void KeyEncoder::DecodeFixedLengthBuffers(int64_t start_row_input,
-                                          int64_t start_row_output, int64_t num_rows,
-                                          const KeyRowArray& rows,
-                                          std::vector<KeyColumnArray>* cols) {
-  // Prepare column array vectors
-  PrepareKeyColumnArrays(start_row_output, num_rows, *cols);
-
-  // Create two temp vectors with 16-bit elements
-  auto temp_buffer_holder_A =
-      util::TempVectorHolder<uint16_t>(ctx_->stack, static_cast<uint32_t>(num_rows));
-  auto temp_buffer_A = KeyColumnArray(
-      KeyColumnMetadata(true, sizeof(uint16_t)), num_rows, nullptr,
-      reinterpret_cast<uint8_t*>(temp_buffer_holder_A.mutable_data()), nullptr);
-  auto temp_buffer_holder_B =
-      util::TempVectorHolder<uint16_t>(ctx_->stack, static_cast<uint32_t>(num_rows));
-  auto temp_buffer_B = KeyColumnArray(
-      KeyColumnMetadata(true, sizeof(uint16_t)), num_rows, nullptr,
-      reinterpret_cast<uint8_t*>(temp_buffer_holder_B.mutable_data()), nullptr);
-
-  bool is_row_fixed_length = row_metadata_.is_fixed_length;
-  if (!is_row_fixed_length) {
-    EncoderOffsets::Decode(static_cast<uint32_t>(start_row_input),
-                           static_cast<uint32_t>(num_rows), rows, &batch_varbinary_cols_,
-                           batch_varbinary_cols_base_offsets_, ctx_);
-  }
-
-  // Process fixed length columns
-  const auto num_cols = static_cast<uint32_t>(batch_all_cols_.size());
-  for (uint32_t i = 0; i < num_cols;) {
-    if (!batch_all_cols_[i].metadata().is_fixed_length ||
-        batch_all_cols_[i].metadata().is_null_type) {
-      i += 1;
-      continue;
-    }
-    bool can_process_pair =
-        (i + 1 < num_cols) && batch_all_cols_[i + 1].metadata().is_fixed_length &&
-        EncoderBinaryPair::CanProcessPair(batch_all_cols_[i].metadata(),
-                                          batch_all_cols_[i + 1].metadata());
-    if (!can_process_pair) {
-      EncoderBinary::Decode(static_cast<uint32_t>(start_row_input),
-                            static_cast<uint32_t>(num_rows),
-                            row_metadata_.column_offsets[i], rows, &batch_all_cols_[i],
-                            ctx_, &temp_buffer_A);
-      i += 1;
-    } else {
-      EncoderBinaryPair::Decode(
-          static_cast<uint32_t>(start_row_input), static_cast<uint32_t>(num_rows),
-          row_metadata_.column_offsets[i], rows, &batch_all_cols_[i],
-          &batch_all_cols_[i + 1], ctx_, &temp_buffer_A, &temp_buffer_B);
-      i += 2;
-    }
-  }
-
-  // Process nulls
-  EncoderNulls::Decode(static_cast<uint32_t>(start_row_input),
-                       static_cast<uint32_t>(num_rows), rows, &batch_all_cols_);
-}
-
-void KeyEncoder::DecodeVaryingLengthBuffers(int64_t start_row_input,
-                                            int64_t start_row_output, int64_t num_rows,
-                                            const KeyRowArray& rows,
-                                            std::vector<KeyColumnArray>* cols) {
-  // Prepare column array vectors
-  PrepareKeyColumnArrays(start_row_output, num_rows, *cols);
-
-  bool is_row_fixed_length = row_metadata_.is_fixed_length;
-  if (!is_row_fixed_length) {
-    for (size_t i = 0; i < batch_varbinary_cols_.size(); ++i) {
-      // Memcpy varbinary fields into precomputed in the previous step
-      // positions in the output row buffer.
-      EncoderVarBinary::Decode(static_cast<uint32_t>(start_row_input),
-                               static_cast<uint32_t>(num_rows), static_cast<uint32_t>(i),
-                               rows, &batch_varbinary_cols_[i], ctx_);
-    }
-  }
-}
-
-template <class COPY_FN, class SET_NULL_FN>
-void KeyEncoder::EncoderBinary::EncodeSelectedImp(
-    uint32_t offset_within_row, KeyRowArray* rows, const KeyColumnArray& col,
-    uint32_t num_selected, const uint16_t* selection, COPY_FN copy_fn,
-    SET_NULL_FN set_null_fn) {
-  bool is_fixed_length = rows->metadata().is_fixed_length;
-  if (is_fixed_length) {
-    uint32_t row_width = rows->metadata().fixed_length;
-    const uint8_t* src_base = col.data(1);
-    uint8_t* dst = rows->mutable_data(1) + offset_within_row;
-    for (uint32_t i = 0; i < num_selected; ++i) {
-      copy_fn(dst, src_base, selection[i]);
-      dst += row_width;
-    }
-    if (col.data(0)) {
-      const uint8_t* non_null_bits = col.data(0);
-      uint8_t* dst = rows->mutable_data(1) + offset_within_row;
-      for (uint32_t i = 0; i < num_selected; ++i) {
-        bool is_null = !bit_util::GetBit(non_null_bits, selection[i] + col.bit_offset(0));
-        if (is_null) {
-          set_null_fn(dst);
-        }
-        dst += row_width;
-      }
-    }
-  } else {
-    const uint8_t* src_base = col.data(1);
-    uint8_t* dst = rows->mutable_data(2) + offset_within_row;
-    const uint32_t* offsets = rows->offsets();
-    for (uint32_t i = 0; i < num_selected; ++i) {
-      copy_fn(dst + offsets[i], src_base, selection[i]);
-    }
-    if (col.data(0)) {
-      const uint8_t* non_null_bits = col.data(0);
-      uint8_t* dst = rows->mutable_data(2) + offset_within_row;
-      const uint32_t* offsets = rows->offsets();
-      for (uint32_t i = 0; i < num_selected; ++i) {
-        bool is_null = !bit_util::GetBit(non_null_bits, selection[i] + col.bit_offset(0));
-        if (is_null) {
-          set_null_fn(dst + offsets[i]);
-        }
-      }
-    }
-  }
-}
-
-void KeyEncoder::EncoderBinary::EncodeSelected(uint32_t offset_within_row,
-                                               KeyRowArray* rows,
-                                               const KeyColumnArray& col,
-                                               uint32_t num_selected,
-                                               const uint16_t* selection) {
-  if (col.metadata().is_null_type) {
-    return;
-  }
-  uint32_t col_width = col.metadata().fixed_length;
-  if (col_width == 0) {
-    int bit_offset = col.bit_offset(1);
-    EncodeSelectedImp(
-        offset_within_row, rows, col, num_selected, selection,
-        [bit_offset](uint8_t* dst, const uint8_t* src_base, uint16_t irow) {
-          *dst = bit_util::GetBit(src_base, irow + bit_offset) ? 0xff : 0x00;
-        },
-        [](uint8_t* dst) { *dst = 0xae; });
-  } else if (col_width == 1) {
-    EncodeSelectedImp(
-        offset_within_row, rows, col, num_selected, selection,
-        [](uint8_t* dst, const uint8_t* src_base, uint16_t irow) {
-          *dst = src_base[irow];
-        },
-        [](uint8_t* dst) { *dst = 0xae; });
-  } else if (col_width == 2) {
-    EncodeSelectedImp(
-        offset_within_row, rows, col, num_selected, selection,
-        [](uint8_t* dst, const uint8_t* src_base, uint16_t irow) {
-          *reinterpret_cast<uint16_t*>(dst) =
-              reinterpret_cast<const uint16_t*>(src_base)[irow];
-        },
-        [](uint8_t* dst) { *reinterpret_cast<uint16_t*>(dst) = 0xaeae; });
-  } else if (col_width == 4) {
-    EncodeSelectedImp(
-        offset_within_row, rows, col, num_selected, selection,
-        [](uint8_t* dst, const uint8_t* src_base, uint16_t irow) {
-          *reinterpret_cast<uint32_t*>(dst) =
-              reinterpret_cast<const uint32_t*>(src_base)[irow];
-        },
-        [](uint8_t* dst) {
-          *reinterpret_cast<uint32_t*>(dst) = static_cast<uint32_t>(0xaeaeaeae);
-        });
-  } else if (col_width == 8) {
-    EncodeSelectedImp(
-        offset_within_row, rows, col, num_selected, selection,
-        [](uint8_t* dst, const uint8_t* src_base, uint16_t irow) {
-          *reinterpret_cast<uint64_t*>(dst) =
-              reinterpret_cast<const uint64_t*>(src_base)[irow];
-        },
-        [](uint8_t* dst) { *reinterpret_cast<uint64_t*>(dst) = 0xaeaeaeaeaeaeaeaeULL; });
-  } else {
-    EncodeSelectedImp(
-        offset_within_row, rows, col, num_selected, selection,
-        [col_width](uint8_t* dst, const uint8_t* src_base, uint16_t irow) {
-          memcpy(dst, src_base + col_width * irow, col_width);
-        },
-        [col_width](uint8_t* dst) { memset(dst, 0xae, col_width); });
-  }
-}
-
-void KeyEncoder::EncoderOffsets::GetRowOffsetsSelected(
-    KeyRowArray* rows, const std::vector<KeyColumnArray>& cols, uint32_t num_selected,
-    const uint16_t* selection) {
+void EncoderOffsets::GetRowOffsetsSelected(RowTableImpl* rows,
+                                           const std::vector<KeyColumnArray>& cols,
+                                           uint32_t num_selected,
+                                           const uint16_t* selection) {
   if (rows->metadata().is_fixed_length) {
     return;
   }
@@ -1094,7 +671,7 @@ void KeyEncoder::EncoderOffsets::GetRowOffsetsSelected(
       for (uint32_t i = 0; i < num_selected; ++i) {
         uint32_t irow = selection[i];
         uint32_t length = col_offsets[irow + 1] - col_offsets[irow];
-        row_offsets[i] += KeyRowMetadata::padding_for_alignment(
+        row_offsets[i] += RowTableMetadata::padding_for_alignment(
             row_offsets[i], rows->metadata().string_alignment);
         row_offsets[i] += length;
       }
@@ -1118,7 +695,7 @@ void KeyEncoder::EncoderOffsets::GetRowOffsetsSelected(
   int row_alignment = rows->metadata().row_alignment;
   for (uint32_t i = 0; i < num_selected; ++i) {
     uint32_t length = row_offsets[i];
-    length += KeyRowMetadata::padding_for_alignment(length, row_alignment);
+    length += RowTableMetadata::padding_for_alignment(length, row_alignment);
     row_offsets[i] = sum;
     sum += length;
   }
@@ -1126,9 +703,9 @@ void KeyEncoder::EncoderOffsets::GetRowOffsetsSelected(
 }
 
 template <bool has_nulls, bool is_first_varbinary>
-void KeyEncoder::EncoderOffsets::EncodeSelectedImp(
-    uint32_t ivarbinary, KeyRowArray* rows, const std::vector<KeyColumnArray>& cols,
-    uint32_t num_selected, const uint16_t* selection) {
+void EncoderOffsets::EncodeSelectedImp(uint32_t ivarbinary, RowTableImpl* rows,
+                                       const std::vector<KeyColumnArray>& cols,
+                                       uint32_t num_selected, const uint16_t* selection) {
   const uint32_t* row_offsets = rows->offsets();
   uint8_t* row_base = rows->mutable_data(2) +
                       rows->metadata().varbinary_end_array_offset +
@@ -1150,17 +727,16 @@ void KeyEncoder::EncoderOffsets::EncodeSelectedImp(
       row[0] = rows->metadata().fixed_length + length;
     } else {
       row[0] = row[-1] +
-               KeyRowMetadata::padding_for_alignment(row[-1],
-                                                     rows->metadata().string_alignment) +
+               RowTableMetadata::padding_for_alignment(
+                   row[-1], rows->metadata().string_alignment) +
                length;
     }
   }
 }
 
-void KeyEncoder::EncoderOffsets::EncodeSelected(KeyRowArray* rows,
-                                                const std::vector<KeyColumnArray>& cols,
-                                                uint32_t num_selected,
-                                                const uint16_t* selection) {
+void EncoderOffsets::EncodeSelected(RowTableImpl* rows,
+                                    const std::vector<KeyColumnArray>& cols,
+                                    uint32_t num_selected, const uint16_t* selection) {
   if (rows->metadata().is_fixed_length) {
     return;
   }
@@ -1182,10 +758,79 @@ void KeyEncoder::EncoderOffsets::EncodeSelected(KeyRowArray* rows,
   }
 }
 
-void KeyEncoder::EncoderVarBinary::EncodeSelected(uint32_t ivarbinary, KeyRowArray* rows,
-                                                  const KeyColumnArray& cols,
-                                                  uint32_t num_selected,
-                                                  const uint16_t* selection) {
+void EncoderVarBinary::Decode(uint32_t start_row, uint32_t num_rows,
+                              uint32_t varbinary_col_id, const RowTableImpl& rows,
+                              KeyColumnArray* col, LightContext* ctx) {
+  // Output column varbinary buffer needs an extra 32B
+  // at the end in avx2 version and 8B otherwise.
+#if defined(ARROW_HAVE_AVX2)
+  if (ctx->has_avx2()) {
+    DecodeHelper_avx2(start_row, num_rows, varbinary_col_id, rows, col);
+  } else {
+#endif
+    if (varbinary_col_id == 0) {
+      DecodeImp<true>(start_row, num_rows, varbinary_col_id, rows, col);
+    } else {
+      DecodeImp<false>(start_row, num_rows, varbinary_col_id, rows, col);
+    }
+#if defined(ARROW_HAVE_AVX2)
+  }
+#endif
+}
+
+template <bool first_varbinary_col>
+void EncoderVarBinary::DecodeImp(uint32_t start_row, uint32_t num_rows,
+                                 uint32_t varbinary_col_id, const RowTableImpl& rows,
+                                 KeyColumnArray* col) {
+  DecodeHelper<first_varbinary_col>(
+      start_row, num_rows, varbinary_col_id, &rows, nullptr, col, col,
+      [](uint8_t* dst, const uint8_t* src, int64_t length) {
+        for (uint32_t istripe = 0; istripe < bit_util::CeilDiv(length, 8); ++istripe) {
+          auto dst64 = reinterpret_cast<uint64_t*>(dst);
+          auto src64 = reinterpret_cast<const uint64_t*>(src);
+          util::SafeStore(dst64 + istripe, src64[istripe]);
+        }
+      });
+}
+
+void EncoderNulls::Decode(uint32_t start_row, uint32_t num_rows, const RowTableImpl& rows,
+                          std::vector<KeyColumnArray>* cols) {
+  // Every output column needs to have a space for exactly the required number
+  // of rows. It also needs to have non-nulls bit-vector allocated and mutable.
+  DCHECK_GT(cols->size(), 0);
+  for (auto& col : *cols) {
+    DCHECK(col.length() == num_rows);
+    DCHECK(col.mutable_data(0) || col.metadata().is_null_type);
+  }
+
+  const uint8_t* null_masks = rows.null_masks();
+  uint32_t null_masks_bytes_per_row = rows.metadata().null_masks_bytes_per_row;
+  for (size_t col = 0; col < cols->size(); ++col) {
+    if ((*cols)[col].metadata().is_null_type) {
+      continue;
+    }
+    uint8_t* non_nulls = (*cols)[col].mutable_data(0);
+    const int bit_offset = (*cols)[col].bit_offset(0);
+    DCHECK_LT(bit_offset, 8);
+    non_nulls[0] |= 0xff << (bit_offset);
+    if (bit_offset + num_rows > 8) {
+      int bits_in_first_byte = 8 - bit_offset;
+      memset(non_nulls + 1, 0xff, bit_util::BytesForBits(num_rows - bits_in_first_byte));
+    }
+    for (uint32_t row = 0; row < num_rows; ++row) {
+      uint32_t null_masks_bit_id =
+          (start_row + row) * null_masks_bytes_per_row * 8 + static_cast<uint32_t>(col);
+      bool is_set = bit_util::GetBit(null_masks, null_masks_bit_id);
+      if (is_set) {
+        bit_util::ClearBit(non_nulls, bit_offset + row);
+      }
+    }
+  }
+}
+
+void EncoderVarBinary::EncodeSelected(uint32_t ivarbinary, RowTableImpl* rows,
+                                      const KeyColumnArray& cols, uint32_t num_selected,
+                                      const uint16_t* selection) {
   const uint32_t* row_offsets = rows->offsets();
   uint8_t* row_base = rows->mutable_data(2);
   const uint32_t* col_offsets = cols.offsets();
@@ -1213,10 +858,9 @@ void KeyEncoder::EncoderVarBinary::EncodeSelected(uint32_t ivarbinary, KeyRowArr
   }
 }
 
-void KeyEncoder::EncoderNulls::EncodeSelected(KeyRowArray* rows,
-                                              const std::vector<KeyColumnArray>& cols,
-                                              uint32_t num_selected,
-                                              const uint16_t* selection) {
+void EncoderNulls::EncodeSelected(RowTableImpl* rows,
+                                  const std::vector<KeyColumnArray>& cols,
+                                  uint32_t num_selected, const uint16_t* selection) {
   uint8_t* null_masks = rows->null_masks();
   uint32_t null_mask_num_bytes = rows->metadata().null_masks_bytes_per_row;
   memset(null_masks, 0, null_mask_num_bytes * num_selected);
@@ -1234,44 +878,5 @@ void KeyEncoder::EncoderNulls::EncodeSelected(KeyRowArray* rows,
   }
 }
 
-void KeyEncoder::PrepareEncodeSelected(int64_t start_row, int64_t num_rows,
-                                       const std::vector<KeyColumnArray>& cols) {
-  // Prepare column array vectors
-  PrepareKeyColumnArrays(start_row, num_rows, cols);
-}
-
-Status KeyEncoder::EncodeSelected(KeyRowArray* rows, uint32_t num_selected,
-                                  const uint16_t* selection) {
-  rows->Clean();
-  RETURN_NOT_OK(
-      rows->AppendEmpty(static_cast<uint32_t>(num_selected), static_cast<uint32_t>(0)));
-
-  EncoderOffsets::GetRowOffsetsSelected(rows, batch_varbinary_cols_, num_selected,
-                                        selection);
-
-  RETURN_NOT_OK(rows->AppendEmpty(static_cast<uint32_t>(0),
-                                  static_cast<uint32_t>(rows->offsets()[num_selected])));
-
-  for (size_t icol = 0; icol < batch_all_cols_.size(); ++icol) {
-    if (batch_all_cols_[icol].metadata().is_fixed_length) {
-      uint32_t offset_within_row = rows->metadata().column_offsets[icol];
-      EncoderBinary::EncodeSelected(offset_within_row, rows, batch_all_cols_[icol],
-                                    num_selected, selection);
-    }
-  }
-
-  EncoderOffsets::EncodeSelected(rows, batch_varbinary_cols_, num_selected, selection);
-
-  for (size_t icol = 0; icol < batch_varbinary_cols_.size(); ++icol) {
-    EncoderVarBinary::EncodeSelected(static_cast<uint32_t>(icol), rows,
-                                     batch_varbinary_cols_[icol], num_selected,
-                                     selection);
-  }
-
-  EncoderNulls::EncodeSelected(rows, batch_all_cols_, num_selected, selection);
-
-  return Status::OK();
-}
-
 }  // namespace compute
 }  // namespace arrow
diff --git a/cpp/src/arrow/compute/row/encode_internal.h b/cpp/src/arrow/compute/row/encode_internal.h
new file mode 100644
index 0000000000..ce88731346
--- /dev/null
+++ b/cpp/src/arrow/compute/row/encode_internal.h
@@ -0,0 +1,323 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+#include <memory>
+#include <vector>
+
+#include "arrow/array/data.h"
+#include "arrow/compute/exec.h"
+#include "arrow/compute/exec/util.h"
+#include "arrow/compute/light_array.h"
+#include "arrow/compute/row/row_internal.h"
+#include "arrow/memory_pool.h"
+#include "arrow/result.h"
+#include "arrow/status.h"
+#include "arrow/util/bit_util.h"
+
+namespace arrow {
+namespace compute {
+
+/// Converts between Arrow's typical column representation to a row-based representation
+///
+/// Data is stored as a single array of rows.  Each row combines data from all columns.
+/// The conversion is reversible.
+///
+/// Row-oriented storage is beneficial when there is a need for random access
+/// of individual rows and at the same time all included columns are likely to
+/// be accessed together, as in the case of hash table key.
+///
+/// Does not support nested types
+class RowTableEncoder {
+ public:
+  void Init(const std::vector<KeyColumnMetadata>& cols, LightContext* ctx,
+            int row_alignment, int string_alignment);
+
+  const RowTableMetadata& row_metadata() { return row_metadata_; }
+  // GrouperFastImpl right now needs somewhat intrusive visibility into RowTableEncoder
+  // This could be cleaned up at some point
+  const std::vector<KeyColumnArray>& batch_all_cols() { return batch_all_cols_; }
+
+  /// \brief Prepare to encode a collection of columns
+  /// \param start_row The starting row to encode
+  /// \param num_rows The number of rows to encode
+  /// \param cols The columns to encode.  The order of the columns should
+  ///             be consistent with the order used to create the RowTableMetadata
+  void PrepareEncodeSelected(int64_t start_row, int64_t num_rows,
+                             const std::vector<KeyColumnArray>& cols);
+  /// \brief Encode selection of prepared rows into a row table
+  /// \param rows The output row table
+  /// \param num_selected The number of rows to encode
+  /// \param selection indices of the rows to encode
+  Status EncodeSelected(RowTableImpl* rows, uint32_t num_selected,
+                        const uint16_t* selection);
+
+  /// \brief Decode a window of row oriented data into a corresponding
+  ///        window of column oriented storage.
+  /// \param start_row_input The starting row to decode
+  /// \param start_row_output An offset into the output array to write to
+  /// \param num_rows The number of rows to decode
+  /// \param rows The row table to decode from
+  /// \param cols The columns to decode into, should be sized appropriately
+  ///
+  /// The output buffers need to be correctly allocated and sized before
+  /// calling each method.  For that reason decoding is split into two functions.
+  /// DecodeFixedLengthBuffers processes everything except for varying length
+  /// buffers.
+  /// The output can be used to find out required varying length buffers sizes
+  /// for the call to DecodeVaryingLengthBuffers
+  void DecodeFixedLengthBuffers(int64_t start_row_input, int64_t start_row_output,
+                                int64_t num_rows, const RowTableImpl& rows,
+                                std::vector<KeyColumnArray>* cols);
+
+  /// \brief Decode the varlength columns of a row table into column storage
+  /// \param start_row_input The starting row to decode
+  /// \param start_row_output An offset into the output arrays
+  /// \param num_rows The number of rows to decode
+  /// \param rows The row table to decode from
+  /// \param cols The column arrays to decode into
+  void DecodeVaryingLengthBuffers(int64_t start_row_input, int64_t start_row_output,
+                                  int64_t num_rows, const RowTableImpl& rows,
+                                  std::vector<KeyColumnArray>* cols);
+
+ private:
+  /// Prepare column array vectors.
+  /// Output column arrays represent a range of input column arrays
+  /// specified by starting row and number of rows.
+  /// Three vectors are generated:
+  /// - all columns
+  /// - fixed-length columns only
+  /// - varying-length columns only
+  void PrepareKeyColumnArrays(int64_t start_row, int64_t num_rows,
+                              const std::vector<KeyColumnArray>& cols_in);
+
+  LightContext* ctx_;
+
+  // Data initialized once, based on data types of key columns
+  RowTableMetadata row_metadata_;
+
+  // Data initialized for each input batch.
+  // All elements are ordered according to the order of encoded fields in a row.
+  std::vector<KeyColumnArray> batch_all_cols_;
+  std::vector<KeyColumnArray> batch_varbinary_cols_;
+  std::vector<uint32_t> batch_varbinary_cols_base_offsets_;
+};
+
+class EncoderInteger {
+ public:
+  static void Decode(uint32_t start_row, uint32_t num_rows, uint32_t offset_within_row,
+                     const RowTableImpl& rows, KeyColumnArray* col, LightContext* ctx,
+                     KeyColumnArray* temp);
+  static bool UsesTransform(const KeyColumnArray& column);
+  static KeyColumnArray ArrayReplace(const KeyColumnArray& column,
+                                     const KeyColumnArray& temp);
+  static void PostDecode(const KeyColumnArray& input, KeyColumnArray* output,
+                         LightContext* ctx);
+
+ private:
+  static bool IsBoolean(const KeyColumnMetadata& metadata);
+};
+
+class EncoderBinary {
+ public:
+  static void EncodeSelected(uint32_t offset_within_row, RowTableImpl* rows,
+                             const KeyColumnArray& col, uint32_t num_selected,
+                             const uint16_t* selection);
+  static void Decode(uint32_t start_row, uint32_t num_rows, uint32_t offset_within_row,
+                     const RowTableImpl& rows, KeyColumnArray* col, LightContext* ctx,
+                     KeyColumnArray* temp);
+  static bool IsInteger(const KeyColumnMetadata& metadata);
+
+ private:
+  template <class COPY_FN, class SET_NULL_FN>
+  static void EncodeSelectedImp(uint32_t offset_within_row, RowTableImpl* rows,
+                                const KeyColumnArray& col, uint32_t num_selected,
+                                const uint16_t* selection, COPY_FN copy_fn,
+                                SET_NULL_FN set_null_fn);
+
+  template <bool is_row_fixed_length, class COPY_FN>
+  static inline void DecodeHelper(uint32_t start_row, uint32_t num_rows,
+                                  uint32_t offset_within_row,
+                                  const RowTableImpl* rows_const,
+                                  RowTableImpl* rows_mutable_maybe_null,
+                                  const KeyColumnArray* col_const,
+                                  KeyColumnArray* col_mutable_maybe_null,
+                                  COPY_FN copy_fn) {
+    ARROW_DCHECK(col_const && col_const->metadata().is_fixed_length);
+    uint32_t col_width = col_const->metadata().fixed_length;
+
+    if (is_row_fixed_length) {
+      uint32_t row_width = rows_const->metadata().fixed_length;
+      for (uint32_t i = 0; i < num_rows; ++i) {
+        const uint8_t* src;
+        uint8_t* dst;
+        src = rows_const->data(1) + row_width * (start_row + i) + offset_within_row;
+        dst = col_mutable_maybe_null->mutable_data(1) + col_width * i;
+        copy_fn(dst, src, col_width);
+      }
+    } else {
+      const uint32_t* row_offsets = rows_const->offsets();
+      for (uint32_t i = 0; i < num_rows; ++i) {
+        const uint8_t* src;
+        uint8_t* dst;
+        src = rows_const->data(2) + row_offsets[start_row + i] + offset_within_row;
+        dst = col_mutable_maybe_null->mutable_data(1) + col_width * i;
+        copy_fn(dst, src, col_width);
+      }
+    }
+  }
+
+  template <bool is_row_fixed_length>
+  static void DecodeImp(uint32_t start_row, uint32_t num_rows, uint32_t offset_within_row,
+                        const RowTableImpl& rows, KeyColumnArray* col);
+#if defined(ARROW_HAVE_AVX2)
+  static void DecodeHelper_avx2(bool is_row_fixed_length, uint32_t start_row,
+                                uint32_t num_rows, uint32_t offset_within_row,
+                                const RowTableImpl& rows, KeyColumnArray* col);
+  template <bool is_row_fixed_length>
+  static void DecodeImp_avx2(uint32_t start_row, uint32_t num_rows,
+                             uint32_t offset_within_row, const RowTableImpl& rows,
+                             KeyColumnArray* col);
+#endif
+};
+
+class EncoderBinaryPair {
+ public:
+  static bool CanProcessPair(const KeyColumnMetadata& col1,
+                             const KeyColumnMetadata& col2) {
+    return EncoderBinary::IsInteger(col1) && EncoderBinary::IsInteger(col2);
+  }
+  static void Decode(uint32_t start_row, uint32_t num_rows, uint32_t offset_within_row,
+                     const RowTableImpl& rows, KeyColumnArray* col1, KeyColumnArray* col2,
+                     LightContext* ctx, KeyColumnArray* temp1, KeyColumnArray* temp2);
+
+ private:
+  template <bool is_row_fixed_length, typename col1_type, typename col2_type>
+  static void DecodeImp(uint32_t num_rows_to_skip, uint32_t start_row, uint32_t num_rows,
+                        uint32_t offset_within_row, const RowTableImpl& rows,
+                        KeyColumnArray* col1, KeyColumnArray* col2);
+#if defined(ARROW_HAVE_AVX2)
+  static uint32_t DecodeHelper_avx2(bool is_row_fixed_length, uint32_t col_width,
+                                    uint32_t start_row, uint32_t num_rows,
+                                    uint32_t offset_within_row, const RowTableImpl& rows,
+                                    KeyColumnArray* col1, KeyColumnArray* col2);
+  template <bool is_row_fixed_length, uint32_t col_width>
+  static uint32_t DecodeImp_avx2(uint32_t start_row, uint32_t num_rows,
+                                 uint32_t offset_within_row, const RowTableImpl& rows,
+                                 KeyColumnArray* col1, KeyColumnArray* col2);
+#endif
+};
+
+class EncoderOffsets {
+ public:
+  static void GetRowOffsetsSelected(RowTableImpl* rows,
+                                    const std::vector<KeyColumnArray>& cols,
+                                    uint32_t num_selected, const uint16_t* selection);
+  static void EncodeSelected(RowTableImpl* rows, const std::vector<KeyColumnArray>& cols,
+                             uint32_t num_selected, const uint16_t* selection);
+
+  static void Decode(uint32_t start_row, uint32_t num_rows, const RowTableImpl& rows,
+                     std::vector<KeyColumnArray>* varbinary_cols,
+                     const std::vector<uint32_t>& varbinary_cols_base_offset,
+                     LightContext* ctx);
+
+ private:
+  template <bool has_nulls, bool is_first_varbinary>
+  static void EncodeSelectedImp(uint32_t ivarbinary, RowTableImpl* rows,
+                                const std::vector<KeyColumnArray>& cols,
+                                uint32_t num_selected, const uint16_t* selection);
+};
+
+class EncoderVarBinary {
+ public:
+  static void EncodeSelected(uint32_t ivarbinary, RowTableImpl* rows,
+                             const KeyColumnArray& cols, uint32_t num_selected,
+                             const uint16_t* selection);
+
+  static void Decode(uint32_t start_row, uint32_t num_rows, uint32_t varbinary_col_id,
+                     const RowTableImpl& rows, KeyColumnArray* col, LightContext* ctx);
+
+ private:
+  template <bool first_varbinary_col, class COPY_FN>
+  static inline void DecodeHelper(uint32_t start_row, uint32_t num_rows,
+                                  uint32_t varbinary_col_id,
+                                  const RowTableImpl* rows_const,
+                                  RowTableImpl* rows_mutable_maybe_null,
+                                  const KeyColumnArray* col_const,
+                                  KeyColumnArray* col_mutable_maybe_null,
+                                  COPY_FN copy_fn) {
+    // Column and rows need to be varying length
+    ARROW_DCHECK(!rows_const->metadata().is_fixed_length &&
+                 !col_const->metadata().is_fixed_length);
+
+    const uint32_t* row_offsets_for_batch = rows_const->offsets() + start_row;
+    const uint32_t* col_offsets = col_const->offsets();
+
+    uint32_t col_offset_next = col_offsets[0];
+    for (uint32_t i = 0; i < num_rows; ++i) {
+      uint32_t col_offset = col_offset_next;
+      col_offset_next = col_offsets[i + 1];
+
+      uint32_t row_offset = row_offsets_for_batch[i];
+      const uint8_t* row = rows_const->data(2) + row_offset;
+
+      uint32_t offset_within_row;
+      uint32_t length;
+      if (first_varbinary_col) {
+        rows_const->metadata().first_varbinary_offset_and_length(row, &offset_within_row,
+                                                                 &length);
+      } else {
+        rows_const->metadata().nth_varbinary_offset_and_length(
+            row, varbinary_col_id, &offset_within_row, &length);
+      }
+
+      row_offset += offset_within_row;
+
+      const uint8_t* src;
+      uint8_t* dst;
+      src = rows_const->data(2) + row_offset;
+      dst = col_mutable_maybe_null->mutable_data(2) + col_offset;
+      copy_fn(dst, src, length);
+    }
+  }
+  template <bool first_varbinary_col>
+  static void DecodeImp(uint32_t start_row, uint32_t num_rows, uint32_t varbinary_col_id,
+                        const RowTableImpl& rows, KeyColumnArray* col);
+#if defined(ARROW_HAVE_AVX2)
+  static void DecodeHelper_avx2(uint32_t start_row, uint32_t num_rows,
+                                uint32_t varbinary_col_id, const RowTableImpl& rows,
+                                KeyColumnArray* col);
+  template <bool first_varbinary_col>
+  static void DecodeImp_avx2(uint32_t start_row, uint32_t num_rows,
+                             uint32_t varbinary_col_id, const RowTableImpl& rows,
+                             KeyColumnArray* col);
+#endif
+};
+
+class EncoderNulls {
+ public:
+  static void EncodeSelected(RowTableImpl* rows, const std::vector<KeyColumnArray>& cols,
+                             uint32_t num_selected, const uint16_t* selection);
+
+  static void Decode(uint32_t start_row, uint32_t num_rows, const RowTableImpl& rows,
+                     std::vector<KeyColumnArray>* cols);
+};
+
+}  // namespace compute
+}  // namespace arrow
diff --git a/cpp/src/arrow/compute/exec/key_encode_avx2.cc b/cpp/src/arrow/compute/row/encode_internal_avx2.cc
similarity index 83%
rename from cpp/src/arrow/compute/exec/key_encode_avx2.cc
rename to cpp/src/arrow/compute/row/encode_internal_avx2.cc
index 832bb0361d..02ba310bde 100644
--- a/cpp/src/arrow/compute/exec/key_encode_avx2.cc
+++ b/cpp/src/arrow/compute/row/encode_internal_avx2.cc
@@ -17,18 +17,16 @@
 
 #include <immintrin.h>
 
-#include "arrow/compute/exec/key_encode.h"
+#include "arrow/compute/row/encode_internal.h"
 
 namespace arrow {
 namespace compute {
 
 #if defined(ARROW_HAVE_AVX2)
 
-void KeyEncoder::EncoderBinary::DecodeHelper_avx2(bool is_row_fixed_length,
-                                                  uint32_t start_row, uint32_t num_rows,
-                                                  uint32_t offset_within_row,
-                                                  const KeyRowArray& rows,
-                                                  KeyColumnArray* col) {
+void EncoderBinary::DecodeHelper_avx2(bool is_row_fixed_length, uint32_t start_row,
+                                      uint32_t num_rows, uint32_t offset_within_row,
+                                      const RowTableImpl& rows, KeyColumnArray* col) {
   if (is_row_fixed_length) {
     DecodeImp_avx2<true>(start_row, num_rows, offset_within_row, rows, col);
   } else {
@@ -37,10 +35,9 @@ void KeyEncoder::EncoderBinary::DecodeHelper_avx2(bool is_row_fixed_length,
 }
 
 template <bool is_row_fixed_length>
-void KeyEncoder::EncoderBinary::DecodeImp_avx2(uint32_t start_row, uint32_t num_rows,
-                                               uint32_t offset_within_row,
-                                               const KeyRowArray& rows,
-                                               KeyColumnArray* col) {
+void EncoderBinary::DecodeImp_avx2(uint32_t start_row, uint32_t num_rows,
+                                   uint32_t offset_within_row, const RowTableImpl& rows,
+                                   KeyColumnArray* col) {
   DecodeHelper<is_row_fixed_length>(
       start_row, num_rows, offset_within_row, &rows, nullptr, col, col,
       [](uint8_t* dst, const uint8_t* src, int64_t length) {
@@ -52,13 +49,13 @@ void KeyEncoder::EncoderBinary::DecodeImp_avx2(uint32_t start_row, uint32_t num_
       });
 }
 
-uint32_t KeyEncoder::EncoderBinaryPair::DecodeHelper_avx2(
+uint32_t EncoderBinaryPair::DecodeHelper_avx2(
     bool is_row_fixed_length, uint32_t col_width, uint32_t start_row, uint32_t num_rows,
-    uint32_t offset_within_row, const KeyRowArray& rows, KeyColumnArray* col1,
+    uint32_t offset_within_row, const RowTableImpl& rows, KeyColumnArray* col1,
     KeyColumnArray* col2) {
   using DecodeImp_avx2_t =
       uint32_t (*)(uint32_t start_row, uint32_t num_rows, uint32_t offset_within_row,
-                   const KeyRowArray& rows, KeyColumnArray* col1, KeyColumnArray* col2);
+                   const RowTableImpl& rows, KeyColumnArray* col1, KeyColumnArray* col2);
   static const DecodeImp_avx2_t DecodeImp_avx2_fn[] = {
       DecodeImp_avx2<false, 1>, DecodeImp_avx2<false, 2>, DecodeImp_avx2<false, 4>,
       DecodeImp_avx2<false, 8>, DecodeImp_avx2<true, 1>,  DecodeImp_avx2<true, 2>,
@@ -70,9 +67,10 @@ uint32_t KeyEncoder::EncoderBinaryPair::DecodeHelper_avx2(
 }
 
 template <bool is_row_fixed_length, uint32_t col_width>
-uint32_t KeyEncoder::EncoderBinaryPair::DecodeImp_avx2(
-    uint32_t start_row, uint32_t num_rows, uint32_t offset_within_row,
-    const KeyRowArray& rows, KeyColumnArray* col1, KeyColumnArray* col2) {
+uint32_t EncoderBinaryPair::DecodeImp_avx2(uint32_t start_row, uint32_t num_rows,
+                                           uint32_t offset_within_row,
+                                           const RowTableImpl& rows, KeyColumnArray* col1,
+                                           KeyColumnArray* col2) {
   ARROW_DCHECK(col_width == 1 || col_width == 2 || col_width == 4 || col_width == 8);
 
   uint8_t* col_vals_A = col1->mutable_data(1);
@@ -207,11 +205,9 @@ uint32_t KeyEncoder::EncoderBinaryPair::DecodeImp_avx2(
   return num_processed;
 }
 
-void KeyEncoder::EncoderVarBinary::DecodeHelper_avx2(uint32_t start_row,
-                                                     uint32_t num_rows,
-                                                     uint32_t varbinary_col_id,
-                                                     const KeyRowArray& rows,
-                                                     KeyColumnArray* col) {
+void EncoderVarBinary::DecodeHelper_avx2(uint32_t start_row, uint32_t num_rows,
+                                         uint32_t varbinary_col_id,
+                                         const RowTableImpl& rows, KeyColumnArray* col) {
   if (varbinary_col_id == 0) {
     DecodeImp_avx2<true>(start_row, num_rows, varbinary_col_id, rows, col);
   } else {
@@ -220,10 +216,9 @@ void KeyEncoder::EncoderVarBinary::DecodeHelper_avx2(uint32_t start_row,
 }
 
 template <bool first_varbinary_col>
-void KeyEncoder::EncoderVarBinary::DecodeImp_avx2(uint32_t start_row, uint32_t num_rows,
-                                                  uint32_t varbinary_col_id,
-                                                  const KeyRowArray& rows,
-                                                  KeyColumnArray* col) {
+void EncoderVarBinary::DecodeImp_avx2(uint32_t start_row, uint32_t num_rows,
+                                      uint32_t varbinary_col_id, const RowTableImpl& rows,
+                                      KeyColumnArray* col) {
   DecodeHelper<first_varbinary_col>(
       start_row, num_rows, varbinary_col_id, &rows, nullptr, col, col,
       [](uint8_t* dst, const uint8_t* src, int64_t length) {
diff --git a/cpp/src/arrow/compute/row/grouper.cc b/cpp/src/arrow/compute/row/grouper.cc
new file mode 100644
index 0000000000..eb55124b17
--- /dev/null
+++ b/cpp/src/arrow/compute/row/grouper.cc
@@ -0,0 +1,590 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/compute/row/grouper.h"
+
+#include <mutex>
+
+#include "arrow/compute/exec/key_hash.h"
+#include "arrow/compute/exec/key_map.h"
+#include "arrow/compute/exec/options.h"
+#include "arrow/compute/exec_internal.h"
+#include "arrow/compute/function.h"
+#include "arrow/compute/kernels/row_encoder.h"
+#include "arrow/compute/light_array.h"
+#include "arrow/compute/registry.h"
+#include "arrow/compute/row/compare_internal.h"
+#include "arrow/type.h"
+#include "arrow/util/bitmap_ops.h"
+#include "arrow/util/checked_cast.h"
+#include "arrow/util/cpu_info.h"
+#include "arrow/util/logging.h"
+#include "arrow/util/make_unique.h"
+#include "arrow/util/task_group.h"
+
+namespace arrow {
+
+using internal::checked_cast;
+
+namespace compute {
+
+namespace {
+
+struct GrouperImpl : Grouper {
+  static Result<std::unique_ptr<GrouperImpl>> Make(const std::vector<ValueDescr>& keys,
+                                                   ExecContext* ctx) {
+    auto impl = ::arrow::internal::make_unique<GrouperImpl>();
+
+    impl->encoders_.resize(keys.size());
+    impl->ctx_ = ctx;
+
+    for (size_t i = 0; i < keys.size(); ++i) {
+      const auto& key = keys[i].type;
+
+      if (key->id() == Type::BOOL) {
+        impl->encoders_[i] =
+            ::arrow::internal::make_unique<internal::BooleanKeyEncoder>();
+        continue;
+      }
+
+      if (key->id() == Type::DICTIONARY) {
+        impl->encoders_[i] =
+            ::arrow::internal::make_unique<internal::DictionaryKeyEncoder>(
+                key, ctx->memory_pool());
+        continue;
+      }
+
+      if (is_fixed_width(key->id())) {
+        impl->encoders_[i] =
+            ::arrow::internal::make_unique<internal::FixedWidthKeyEncoder>(key);
+        continue;
+      }
+
+      if (is_binary_like(key->id())) {
+        impl->encoders_[i] =
+            ::arrow::internal::make_unique<internal::VarLengthKeyEncoder<BinaryType>>(
+                key);
+        continue;
+      }
+
+      if (is_large_binary_like(key->id())) {
+        impl->encoders_[i] = ::arrow::internal::make_unique<
+            internal::VarLengthKeyEncoder<LargeBinaryType>>(key);
+        continue;
+      }
+
+      if (key->id() == Type::NA) {
+        impl->encoders_[i] = ::arrow::internal::make_unique<internal::NullKeyEncoder>();
+        continue;
+      }
+
+      return Status::NotImplemented("Keys of type ", *key);
+    }
+
+    return std::move(impl);
+  }
+
+  Result<Datum> Consume(const ExecBatch& batch) override {
+    std::vector<int32_t> offsets_batch(batch.length + 1);
+    for (int i = 0; i < batch.num_values(); ++i) {
+      encoders_[i]->AddLength(batch[i], batch.length, offsets_batch.data());
+    }
+
+    int32_t total_length = 0;
+    for (int64_t i = 0; i < batch.length; ++i) {
+      auto total_length_before = total_length;
+      total_length += offsets_batch[i];
+      offsets_batch[i] = total_length_before;
+    }
+    offsets_batch[batch.length] = total_length;
+
+    std::vector<uint8_t> key_bytes_batch(total_length);
+    std::vector<uint8_t*> key_buf_ptrs(batch.length);
+    for (int64_t i = 0; i < batch.length; ++i) {
+      key_buf_ptrs[i] = key_bytes_batch.data() + offsets_batch[i];
+    }
+
+    for (int i = 0; i < batch.num_values(); ++i) {
+      RETURN_NOT_OK(encoders_[i]->Encode(batch[i], batch.length, key_buf_ptrs.data()));
+    }
+
+    TypedBufferBuilder<uint32_t> group_ids_batch(ctx_->memory_pool());
+    RETURN_NOT_OK(group_ids_batch.Resize(batch.length));
+
+    for (int64_t i = 0; i < batch.length; ++i) {
+      int32_t key_length = offsets_batch[i + 1] - offsets_batch[i];
+      std::string key(
+          reinterpret_cast<const char*>(key_bytes_batch.data() + offsets_batch[i]),
+          key_length);
+
+      auto it_success = map_.emplace(key, num_groups_);
+      auto group_id = it_success.first->second;
+
+      if (it_success.second) {
+        // new key; update offsets and key_bytes
+        ++num_groups_;
+        // Skip if there are no keys
+        if (key_length > 0) {
+          auto next_key_offset = static_cast<int32_t>(key_bytes_.size());
+          key_bytes_.resize(next_key_offset + key_length);
+          offsets_.push_back(next_key_offset + key_length);
+          memcpy(key_bytes_.data() + next_key_offset, key.c_str(), key_length);
+        }
+      }
+
+      group_ids_batch.UnsafeAppend(group_id);
+    }
+
+    ARROW_ASSIGN_OR_RAISE(auto group_ids, group_ids_batch.Finish());
+    return Datum(UInt32Array(batch.length, std::move(group_ids)));
+  }
+
+  uint32_t num_groups() const override { return num_groups_; }
+
+  Result<ExecBatch> GetUniques() override {
+    ExecBatch out({}, num_groups_);
+
+    std::vector<uint8_t*> key_buf_ptrs(num_groups_);
+    for (int64_t i = 0; i < num_groups_; ++i) {
+      key_buf_ptrs[i] = key_bytes_.data() + offsets_[i];
+    }
+
+    out.values.resize(encoders_.size());
+    for (size_t i = 0; i < encoders_.size(); ++i) {
+      ARROW_ASSIGN_OR_RAISE(
+          out.values[i],
+          encoders_[i]->Decode(key_buf_ptrs.data(), static_cast<int32_t>(num_groups_),
+                               ctx_->memory_pool()));
+    }
+
+    return out;
+  }
+
+  ExecContext* ctx_;
+  std::unordered_map<std::string, uint32_t> map_;
+  std::vector<int32_t> offsets_ = {0};
+  std::vector<uint8_t> key_bytes_;
+  uint32_t num_groups_ = 0;
+  std::vector<std::unique_ptr<internal::KeyEncoder>> encoders_;
+};
+
+struct GrouperFastImpl : Grouper {
+  static constexpr int kBitmapPaddingForSIMD = 64;  // bits
+  static constexpr int kPaddingForSIMD = 32;        // bytes
+
+  static bool CanUse(const std::vector<ValueDescr>& keys) {
+#if ARROW_LITTLE_ENDIAN
+    for (size_t i = 0; i < keys.size(); ++i) {
+      const auto& key = keys[i].type;
+      if (is_large_binary_like(key->id())) {
+        return false;
+      }
+    }
+    return true;
+#else
+    return false;
+#endif
+  }
+
+  static Result<std::unique_ptr<GrouperFastImpl>> Make(
+      const std::vector<ValueDescr>& keys, ExecContext* ctx) {
+    auto impl = ::arrow::internal::make_unique<GrouperFastImpl>();
+    impl->ctx_ = ctx;
+
+    RETURN_NOT_OK(impl->temp_stack_.Init(ctx->memory_pool(), 64 * minibatch_size_max_));
+    impl->encode_ctx_.hardware_flags =
+        arrow::internal::CpuInfo::GetInstance()->hardware_flags();
+    impl->encode_ctx_.stack = &impl->temp_stack_;
+
+    auto num_columns = keys.size();
+    impl->col_metadata_.resize(num_columns);
+    impl->key_types_.resize(num_columns);
+    impl->dictionaries_.resize(num_columns);
+    for (size_t icol = 0; icol < num_columns; ++icol) {
+      const auto& key = keys[icol].type;
+      if (key->id() == Type::DICTIONARY) {
+        auto bit_width = checked_cast<const FixedWidthType&>(*key).bit_width();
+        ARROW_DCHECK(bit_width % 8 == 0);
+        impl->col_metadata_[icol] = KeyColumnMetadata(true, bit_width / 8);
+      } else if (key->id() == Type::BOOL) {
+        impl->col_metadata_[icol] = KeyColumnMetadata(true, 0);
+      } else if (is_fixed_width(key->id())) {
+        impl->col_metadata_[icol] = KeyColumnMetadata(
+            true, checked_cast<const FixedWidthType&>(*key).bit_width() / 8);
+      } else if (is_binary_like(key->id())) {
+        impl->col_metadata_[icol] = KeyColumnMetadata(false, sizeof(uint32_t));
+      } else if (key->id() == Type::NA) {
+        impl->col_metadata_[icol] = KeyColumnMetadata(true, 0, /*is_null_type_in=*/true);
+      } else {
+        return Status::NotImplemented("Keys of type ", *key);
+      }
+      impl->key_types_[icol] = key;
+    }
+
+    impl->encoder_.Init(impl->col_metadata_, &impl->encode_ctx_,
+                        /* row_alignment = */ sizeof(uint64_t),
+                        /* string_alignment = */ sizeof(uint64_t));
+    RETURN_NOT_OK(impl->rows_.Init(ctx->memory_pool(), impl->encoder_.row_metadata()));
+    RETURN_NOT_OK(
+        impl->rows_minibatch_.Init(ctx->memory_pool(), impl->encoder_.row_metadata()));
+    impl->minibatch_size_ = impl->minibatch_size_min_;
+    GrouperFastImpl* impl_ptr = impl.get();
+    auto equal_func = [impl_ptr](
+                          int num_keys_to_compare, const uint16_t* selection_may_be_null,
+                          const uint32_t* group_ids, uint32_t* out_num_keys_mismatch,
+                          uint16_t* out_selection_mismatch) {
+      KeyCompare::CompareColumnsToRows(
+          num_keys_to_compare, selection_may_be_null, group_ids, &impl_ptr->encode_ctx_,
+          out_num_keys_mismatch, out_selection_mismatch,
+          impl_ptr->encoder_.batch_all_cols(), impl_ptr->rows_);
+    };
+    auto append_func = [impl_ptr](int num_keys, const uint16_t* selection) {
+      RETURN_NOT_OK(impl_ptr->encoder_.EncodeSelected(&impl_ptr->rows_minibatch_,
+                                                      num_keys, selection));
+      return impl_ptr->rows_.AppendSelectionFrom(impl_ptr->rows_minibatch_, num_keys,
+                                                 nullptr);
+    };
+    RETURN_NOT_OK(impl->map_.init(impl->encode_ctx_.hardware_flags, ctx->memory_pool(),
+                                  impl->encode_ctx_.stack, impl->log_minibatch_max_,
+                                  equal_func, append_func));
+    impl->cols_.resize(num_columns);
+    impl->minibatch_hashes_.resize(impl->minibatch_size_max_ +
+                                   kPaddingForSIMD / sizeof(uint32_t));
+
+    return std::move(impl);
+  }
+
+  ~GrouperFastImpl() { map_.cleanup(); }
+
+  Result<Datum> Consume(const ExecBatch& batch) override {
+    // ARROW-14027: broadcast scalar arguments for now
+    for (int i = 0; i < batch.num_values(); i++) {
+      if (batch.values[i].is_scalar()) {
+        ExecBatch expanded = batch;
+        for (int j = i; j < expanded.num_values(); j++) {
+          if (expanded.values[j].is_scalar()) {
+            ARROW_ASSIGN_OR_RAISE(
+                expanded.values[j],
+                MakeArrayFromScalar(*expanded.values[j].scalar(), expanded.length,
+                                    ctx_->memory_pool()));
+          }
+        }
+        return ConsumeImpl(expanded);
+      }
+    }
+    return ConsumeImpl(batch);
+  }
+
+  Result<Datum> ConsumeImpl(const ExecBatch& batch) {
+    int64_t num_rows = batch.length;
+    int num_columns = batch.num_values();
+    // Process dictionaries
+    for (int icol = 0; icol < num_columns; ++icol) {
+      if (key_types_[icol]->id() == Type::DICTIONARY) {
+        auto data = batch[icol].array();
+        auto dict = MakeArray(data->dictionary);
+        if (dictionaries_[icol]) {
+          if (!dictionaries_[icol]->Equals(dict)) {
+            // TODO(bkietz) unify if necessary. For now, just error if any batch's
+            // dictionary differs from the first we saw for this key
+            return Status::NotImplemented("Unifying differing dictionaries");
+          }
+        } else {
+          dictionaries_[icol] = std::move(dict);
+        }
+      }
+    }
+
+    std::shared_ptr<arrow::Buffer> group_ids;
+    ARROW_ASSIGN_OR_RAISE(
+        group_ids, AllocateBuffer(sizeof(uint32_t) * num_rows, ctx_->memory_pool()));
+
+    for (int icol = 0; icol < num_columns; ++icol) {
+      const uint8_t* non_nulls = NULLPTR;
+      const uint8_t* fixedlen = NULLPTR;
+      const uint8_t* varlen = NULLPTR;
+
+      // Skip if the key's type is NULL
+      if (key_types_[icol]->id() != Type::NA) {
+        if (batch[icol].array()->buffers[0] != NULLPTR) {
+          non_nulls = batch[icol].array()->buffers[0]->data();
+        }
+        fixedlen = batch[icol].array()->buffers[1]->data();
+        if (!col_metadata_[icol].is_fixed_length) {
+          varlen = batch[icol].array()->buffers[2]->data();
+        }
+      }
+
+      int64_t offset = batch[icol].array()->offset;
+
+      auto col_base = KeyColumnArray(col_metadata_[icol], offset + num_rows, non_nulls,
+                                     fixedlen, varlen);
+
+      cols_[icol] = col_base.Slice(offset, num_rows);
+    }
+
+    // Split into smaller mini-batches
+    //
+    for (uint32_t start_row = 0; start_row < num_rows;) {
+      uint32_t batch_size_next = std::min(static_cast<uint32_t>(minibatch_size_),
+                                          static_cast<uint32_t>(num_rows) - start_row);
+
+      // Encode
+      rows_minibatch_.Clean();
+      encoder_.PrepareEncodeSelected(start_row, batch_size_next, cols_);
+
+      // Compute hash
+      Hashing32::HashMultiColumn(encoder_.batch_all_cols(), &encode_ctx_,
+                                 minibatch_hashes_.data());
+
+      // Map
+      auto match_bitvector =
+          util::TempVectorHolder<uint8_t>(&temp_stack_, (batch_size_next + 7) / 8);
+      {
+        auto local_slots = util::TempVectorHolder<uint8_t>(&temp_stack_, batch_size_next);
+        map_.early_filter(batch_size_next, minibatch_hashes_.data(),
+                          match_bitvector.mutable_data(), local_slots.mutable_data());
+        map_.find(batch_size_next, minibatch_hashes_.data(),
+                  match_bitvector.mutable_data(), local_slots.mutable_data(),
+                  reinterpret_cast<uint32_t*>(group_ids->mutable_data()) + start_row);
+      }
+      auto ids = util::TempVectorHolder<uint16_t>(&temp_stack_, batch_size_next);
+      int num_ids;
+      util::bit_util::bits_to_indexes(0, encode_ctx_.hardware_flags, batch_size_next,
+                                      match_bitvector.mutable_data(), &num_ids,
+                                      ids.mutable_data());
+
+      RETURN_NOT_OK(map_.map_new_keys(
+          num_ids, ids.mutable_data(), minibatch_hashes_.data(),
+          reinterpret_cast<uint32_t*>(group_ids->mutable_data()) + start_row));
+
+      start_row += batch_size_next;
+
+      if (minibatch_size_ * 2 <= minibatch_size_max_) {
+        minibatch_size_ *= 2;
+      }
+    }
+
+    return Datum(UInt32Array(batch.length, std::move(group_ids)));
+  }
+
+  uint32_t num_groups() const override { return static_cast<uint32_t>(rows_.length()); }
+
+  // Make sure padded buffers end up with the right logical size
+
+  Result<std::shared_ptr<Buffer>> AllocatePaddedBitmap(int64_t length) {
+    ARROW_ASSIGN_OR_RAISE(
+        std::shared_ptr<Buffer> buf,
+        AllocateBitmap(length + kBitmapPaddingForSIMD, ctx_->memory_pool()));
+    return SliceMutableBuffer(buf, 0, bit_util::BytesForBits(length));
+  }
+
+  Result<std::shared_ptr<Buffer>> AllocatePaddedBuffer(int64_t size) {
+    ARROW_ASSIGN_OR_RAISE(
+        std::shared_ptr<Buffer> buf,
+        AllocateBuffer(size + kBitmapPaddingForSIMD, ctx_->memory_pool()));
+    return SliceMutableBuffer(buf, 0, size);
+  }
+
+  Result<ExecBatch> GetUniques() override {
+    auto num_columns = static_cast<uint32_t>(col_metadata_.size());
+    int64_t num_groups = rows_.length();
+
+    std::vector<std::shared_ptr<Buffer>> non_null_bufs(num_columns);
+    std::vector<std::shared_ptr<Buffer>> fixedlen_bufs(num_columns);
+    std::vector<std::shared_ptr<Buffer>> varlen_bufs(num_columns);
+
+    for (size_t i = 0; i < num_columns; ++i) {
+      if (col_metadata_[i].is_null_type) {
+        uint8_t* non_nulls = NULLPTR;
+        uint8_t* fixedlen = NULLPTR;
+        cols_[i] =
+            KeyColumnArray(col_metadata_[i], num_groups, non_nulls, fixedlen, NULLPTR);
+        continue;
+      }
+      ARROW_ASSIGN_OR_RAISE(non_null_bufs[i], AllocatePaddedBitmap(num_groups));
+      if (col_metadata_[i].is_fixed_length && !col_metadata_[i].is_null_type) {
+        if (col_metadata_[i].fixed_length == 0) {
+          ARROW_ASSIGN_OR_RAISE(fixedlen_bufs[i], AllocatePaddedBitmap(num_groups));
+        } else {
+          ARROW_ASSIGN_OR_RAISE(
+              fixedlen_bufs[i],
+              AllocatePaddedBuffer(num_groups * col_metadata_[i].fixed_length));
+        }
+      } else {
+        ARROW_ASSIGN_OR_RAISE(fixedlen_bufs[i],
+                              AllocatePaddedBuffer((num_groups + 1) * sizeof(uint32_t)));
+      }
+      cols_[i] =
+          KeyColumnArray(col_metadata_[i], num_groups, non_null_bufs[i]->mutable_data(),
+                         fixedlen_bufs[i]->mutable_data(), nullptr);
+    }
+
+    for (int64_t start_row = 0; start_row < num_groups;) {
+      int64_t batch_size_next =
+          std::min(num_groups - start_row, static_cast<int64_t>(minibatch_size_max_));
+      encoder_.DecodeFixedLengthBuffers(start_row, start_row, batch_size_next, rows_,
+                                        &cols_);
+      start_row += batch_size_next;
+    }
+
+    if (!rows_.metadata().is_fixed_length) {
+      for (size_t i = 0; i < num_columns; ++i) {
+        if (!col_metadata_[i].is_fixed_length) {
+          auto varlen_size =
+              reinterpret_cast<const uint32_t*>(fixedlen_bufs[i]->data())[num_groups];
+          ARROW_ASSIGN_OR_RAISE(varlen_bufs[i], AllocatePaddedBuffer(varlen_size));
+          cols_[i] = KeyColumnArray(
+              col_metadata_[i], num_groups, non_null_bufs[i]->mutable_data(),
+              fixedlen_bufs[i]->mutable_data(), varlen_bufs[i]->mutable_data());
+        }
+      }
+
+      for (int64_t start_row = 0; start_row < num_groups;) {
+        int64_t batch_size_next =
+            std::min(num_groups - start_row, static_cast<int64_t>(minibatch_size_max_));
+        encoder_.DecodeVaryingLengthBuffers(start_row, start_row, batch_size_next, rows_,
+                                            &cols_);
+        start_row += batch_size_next;
+      }
+    }
+
+    ExecBatch out({}, num_groups);
+    out.values.resize(num_columns);
+    for (size_t i = 0; i < num_columns; ++i) {
+      if (col_metadata_[i].is_null_type) {
+        out.values[i] = ArrayData::Make(null(), num_groups, {nullptr}, num_groups);
+        continue;
+      }
+      auto valid_count = arrow::internal::CountSetBits(
+          non_null_bufs[i]->data(), /*offset=*/0, static_cast<int64_t>(num_groups));
+      int null_count = static_cast<int>(num_groups) - static_cast<int>(valid_count);
+
+      if (col_metadata_[i].is_fixed_length) {
+        out.values[i] = ArrayData::Make(
+            key_types_[i], num_groups,
+            {std::move(non_null_bufs[i]), std::move(fixedlen_bufs[i])}, null_count);
+      } else {
+        out.values[i] =
+            ArrayData::Make(key_types_[i], num_groups,
+                            {std::move(non_null_bufs[i]), std::move(fixedlen_bufs[i]),
+                             std::move(varlen_bufs[i])},
+                            null_count);
+      }
+    }
+
+    // Process dictionaries
+    for (size_t icol = 0; icol < num_columns; ++icol) {
+      if (key_types_[icol]->id() == Type::DICTIONARY) {
+        if (dictionaries_[icol]) {
+          out.values[icol].array()->dictionary = dictionaries_[icol]->data();
+        } else {
+          ARROW_ASSIGN_OR_RAISE(auto dict, MakeArrayOfNull(key_types_[icol], 0));
+          out.values[icol].array()->dictionary = dict->data();
+        }
+      }
+    }
+
+    return out;
+  }
+
+  static constexpr int log_minibatch_max_ = 10;
+  static constexpr int minibatch_size_max_ = 1 << log_minibatch_max_;
+  static constexpr int minibatch_size_min_ = 128;
+  int minibatch_size_;
+
+  ExecContext* ctx_;
+  arrow::util::TempVectorStack temp_stack_;
+  LightContext encode_ctx_;
+
+  std::vector<std::shared_ptr<arrow::DataType>> key_types_;
+  std::vector<KeyColumnMetadata> col_metadata_;
+  std::vector<KeyColumnArray> cols_;
+  std::vector<uint32_t> minibatch_hashes_;
+
+  std::vector<std::shared_ptr<Array>> dictionaries_;
+
+  RowTableImpl rows_;
+  RowTableImpl rows_minibatch_;
+  RowTableEncoder encoder_;
+  SwissTable map_;
+};
+
+}  // namespace
+
+Result<std::unique_ptr<Grouper>> Grouper::Make(const std::vector<ValueDescr>& descrs,
+                                               ExecContext* ctx) {
+  if (GrouperFastImpl::CanUse(descrs)) {
+    return GrouperFastImpl::Make(descrs, ctx);
+  }
+  return GrouperImpl::Make(descrs, ctx);
+}
+
+Result<std::shared_ptr<ListArray>> Grouper::ApplyGroupings(const ListArray& groupings,
+                                                           const Array& array,
+                                                           ExecContext* ctx) {
+  ARROW_ASSIGN_OR_RAISE(Datum sorted,
+                        compute::Take(array, groupings.data()->child_data[0],
+                                      TakeOptions::NoBoundsCheck(), ctx));
+
+  return std::make_shared<ListArray>(list(array.type()), groupings.length(),
+                                     groupings.value_offsets(), sorted.make_array());
+}
+
+Result<std::shared_ptr<ListArray>> Grouper::MakeGroupings(const UInt32Array& ids,
+                                                          uint32_t num_groups,
+                                                          ExecContext* ctx) {
+  if (ids.null_count() != 0) {
+    return Status::Invalid("MakeGroupings with null ids");
+  }
+
+  ARROW_ASSIGN_OR_RAISE(auto offsets, AllocateBuffer(sizeof(int32_t) * (num_groups + 1),
+                                                     ctx->memory_pool()));
+  auto raw_offsets = reinterpret_cast<int32_t*>(offsets->mutable_data());
+
+  std::memset(raw_offsets, 0, offsets->size());
+  for (int i = 0; i < ids.length(); ++i) {
+    DCHECK_LT(ids.Value(i), num_groups);
+    raw_offsets[ids.Value(i)] += 1;
+  }
+  int32_t length = 0;
+  for (uint32_t id = 0; id < num_groups; ++id) {
+    auto offset = raw_offsets[id];
+    raw_offsets[id] = length;
+    length += offset;
+  }
+  raw_offsets[num_groups] = length;
+  DCHECK_EQ(ids.length(), length);
+
+  ARROW_ASSIGN_OR_RAISE(auto offsets_copy,
+                        offsets->CopySlice(0, offsets->size(), ctx->memory_pool()));
+  raw_offsets = reinterpret_cast<int32_t*>(offsets_copy->mutable_data());
+
+  ARROW_ASSIGN_OR_RAISE(auto sort_indices, AllocateBuffer(sizeof(int32_t) * ids.length(),
+                                                          ctx->memory_pool()));
+  auto raw_sort_indices = reinterpret_cast<int32_t*>(sort_indices->mutable_data());
+  for (int i = 0; i < ids.length(); ++i) {
+    raw_sort_indices[raw_offsets[ids.Value(i)]++] = i;
+  }
+
+  return std::make_shared<ListArray>(
+      list(int32()), num_groups, std::move(offsets),
+      std::make_shared<Int32Array>(ids.length(), std::move(sort_indices)));
+}
+
+}  // namespace compute
+}  // namespace arrow
diff --git a/cpp/src/arrow/compute/row/grouper.h b/cpp/src/arrow/compute/row/grouper.h
new file mode 100644
index 0000000000..8281b75317
--- /dev/null
+++ b/cpp/src/arrow/compute/row/grouper.h
@@ -0,0 +1,112 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <memory>
+#include <vector>
+
+#include "arrow/compute/exec.h"
+#include "arrow/compute/exec/options.h"
+#include "arrow/compute/kernel.h"
+#include "arrow/datum.h"
+#include "arrow/result.h"
+#include "arrow/util/visibility.h"
+
+namespace arrow {
+namespace compute {
+
+/// Consumes batches of keys and yields batches of the group ids.
+class ARROW_EXPORT Grouper {
+ public:
+  virtual ~Grouper() = default;
+
+  /// Construct a Grouper which receives the specified key types
+  static Result<std::unique_ptr<Grouper>> Make(const std::vector<ValueDescr>& descrs,
+                                               ExecContext* ctx = default_exec_context());
+
+  /// Consume a batch of keys, producing the corresponding group ids as an integer array.
+  /// Currently only uint32 indices will be produced, eventually the bit width will only
+  /// be as wide as necessary.
+  virtual Result<Datum> Consume(const ExecBatch& batch) = 0;
+
+  /// Get current unique keys. May be called multiple times.
+  virtual Result<ExecBatch> GetUniques() = 0;
+
+  /// Get the current number of groups.
+  virtual uint32_t num_groups() const = 0;
+
+  /// \brief Assemble lists of indices of identical elements.
+  ///
+  /// \param[in] ids An unsigned, all-valid integral array which will be
+  ///                used as grouping criteria.
+  /// \param[in] num_groups An upper bound for the elements of ids
+  /// \param[in] ctx Execution context to use during the operation
+  /// \return A num_groups-long ListArray where the slot at i contains a
+  ///         list of indices where i appears in ids.
+  ///
+  ///   MakeGroupings([
+  ///       2,
+  ///       2,
+  ///       5,
+  ///       5,
+  ///       2,
+  ///       3
+  ///   ], 8) == [
+  ///       [],
+  ///       [],
+  ///       [0, 1, 4],
+  ///       [5],
+  ///       [],
+  ///       [2, 3],
+  ///       [],
+  ///       []
+  ///   ]
+  static Result<std::shared_ptr<ListArray>> MakeGroupings(
+      const UInt32Array& ids, uint32_t num_groups,
+      ExecContext* ctx = default_exec_context());
+
+  /// \brief Produce a ListArray whose slots are selections of `array` which correspond to
+  /// the provided groupings.
+  ///
+  /// For example,
+  ///   ApplyGroupings([
+  ///       [],
+  ///       [],
+  ///       [0, 1, 4],
+  ///       [5],
+  ///       [],
+  ///       [2, 3],
+  ///       [],
+  ///       []
+  ///   ], [2, 2, 5, 5, 2, 3]) == [
+  ///       [],
+  ///       [],
+  ///       [2, 2, 2],
+  ///       [3],
+  ///       [],
+  ///       [5, 5],
+  ///       [],
+  ///       []
+  ///   ]
+  static Result<std::shared_ptr<ListArray>> ApplyGroupings(
+      const ListArray& groupings, const Array& array,
+      ExecContext* ctx = default_exec_context());
+};
+
+}  // namespace compute
+}  // namespace arrow
diff --git a/cpp/src/arrow/compute/row/row_internal.cc b/cpp/src/arrow/compute/row/row_internal.cc
new file mode 100644
index 0000000000..e99ff75d64
--- /dev/null
+++ b/cpp/src/arrow/compute/row/row_internal.cc
@@ -0,0 +1,409 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/compute/row/row_internal.h"
+
+#include "arrow/compute/exec/util.h"
+
+namespace arrow {
+namespace compute {
+
+uint32_t RowTableMetadata::num_varbinary_cols() const {
+  uint32_t result = 0;
+  for (auto column_metadata : column_metadatas) {
+    if (!column_metadata.is_fixed_length) {
+      ++result;
+    }
+  }
+  return result;
+}
+
+bool RowTableMetadata::is_compatible(const RowTableMetadata& other) const {
+  if (other.num_cols() != num_cols()) {
+    return false;
+  }
+  if (row_alignment != other.row_alignment ||
+      string_alignment != other.string_alignment) {
+    return false;
+  }
+  for (size_t i = 0; i < column_metadatas.size(); ++i) {
+    if (column_metadatas[i].is_fixed_length !=
+        other.column_metadatas[i].is_fixed_length) {
+      return false;
+    }
+    if (column_metadatas[i].fixed_length != other.column_metadatas[i].fixed_length) {
+      return false;
+    }
+  }
+  return true;
+}
+
+void RowTableMetadata::FromColumnMetadataVector(
+    const std::vector<KeyColumnMetadata>& cols, int in_row_alignment,
+    int in_string_alignment) {
+  column_metadatas.resize(cols.size());
+  for (size_t i = 0; i < cols.size(); ++i) {
+    column_metadatas[i] = cols[i];
+  }
+
+  const auto num_cols = static_cast<uint32_t>(cols.size());
+
+  // Sort columns.
+  //
+  // Columns are sorted based on the size in bytes of their fixed-length part.
+  // For the varying-length column, the fixed-length part is the 32-bit field storing
+  // cumulative length of varying-length fields.
+  //
+  // The rules are:
+  //
+  // a) Boolean column, marked with fixed-length 0, is considered to have fixed-length
+  // part of 1 byte.
+  //
+  // b) Columns with fixed-length part being power of 2 or multiple of row
+  // alignment precede other columns. They are sorted in decreasing order of the size of
+  // their fixed-length part.
+  //
+  // c) Fixed-length columns precede varying-length columns when
+  // both have the same size fixed-length part.
+  //
+  column_order.resize(num_cols);
+  for (uint32_t i = 0; i < num_cols; ++i) {
+    column_order[i] = i;
+  }
+  std::sort(
+      column_order.begin(), column_order.end(), [&cols](uint32_t left, uint32_t right) {
+        bool is_left_pow2 =
+            !cols[left].is_fixed_length || ARROW_POPCOUNT64(cols[left].fixed_length) <= 1;
+        bool is_right_pow2 = !cols[right].is_fixed_length ||
+                             ARROW_POPCOUNT64(cols[right].fixed_length) <= 1;
+        bool is_left_fixedlen = cols[left].is_fixed_length;
+        bool is_right_fixedlen = cols[right].is_fixed_length;
+        uint32_t width_left =
+            cols[left].is_fixed_length ? cols[left].fixed_length : sizeof(uint32_t);
+        uint32_t width_right =
+            cols[right].is_fixed_length ? cols[right].fixed_length : sizeof(uint32_t);
+        if (is_left_pow2 != is_right_pow2) {
+          return is_left_pow2;
+        }
+        if (!is_left_pow2) {
+          return left < right;
+        }
+        if (width_left != width_right) {
+          return width_left > width_right;
+        }
+        if (is_left_fixedlen != is_right_fixedlen) {
+          return is_left_fixedlen;
+        }
+        return left < right;
+      });
+
+  row_alignment = in_row_alignment;
+  string_alignment = in_string_alignment;
+  varbinary_end_array_offset = 0;
+
+  column_offsets.resize(num_cols);
+  uint32_t num_varbinary_cols = 0;
+  uint32_t offset_within_row = 0;
+  for (uint32_t i = 0; i < num_cols; ++i) {
+    const KeyColumnMetadata& col = cols[column_order[i]];
+    if (col.is_fixed_length && col.fixed_length != 0 &&
+        ARROW_POPCOUNT64(col.fixed_length) != 1) {
+      offset_within_row += RowTableMetadata::padding_for_alignment(offset_within_row,
+                                                                   string_alignment, col);
+    }
+    column_offsets[i] = offset_within_row;
+    if (!col.is_fixed_length) {
+      if (num_varbinary_cols == 0) {
+        varbinary_end_array_offset = offset_within_row;
+      }
+      DCHECK(column_offsets[i] - varbinary_end_array_offset ==
+             num_varbinary_cols * sizeof(uint32_t));
+      ++num_varbinary_cols;
+      offset_within_row += sizeof(uint32_t);
+    } else {
+      // Boolean column is a bit-vector, which is indicated by
+      // setting fixed length in column metadata to zero.
+      // It will be stored as a byte in output row.
+      if (col.fixed_length == 0) {
+        offset_within_row += 1;
+      } else {
+        offset_within_row += col.fixed_length;
+      }
+    }
+  }
+
+  is_fixed_length = (num_varbinary_cols == 0);
+  fixed_length =
+      offset_within_row +
+      RowTableMetadata::padding_for_alignment(
+          offset_within_row, num_varbinary_cols == 0 ? row_alignment : string_alignment);
+
+  // We set the number of bytes per row storing null masks of individual key columns
+  // to be a power of two. This is not required. It could be also set to the minimal
+  // number of bytes required for a given number of bits (one bit per column).
+  null_masks_bytes_per_row = 1;
+  while (static_cast<uint32_t>(null_masks_bytes_per_row * 8) < num_cols) {
+    null_masks_bytes_per_row *= 2;
+  }
+}
+
+RowTableImpl::RowTableImpl() : pool_(nullptr), rows_capacity_(0), bytes_capacity_(0) {}
+
+Status RowTableImpl::Init(MemoryPool* pool, const RowTableMetadata& metadata) {
+  pool_ = pool;
+  metadata_ = metadata;
+
+  DCHECK(!null_masks_ && !offsets_ && !rows_);
+
+  constexpr int64_t kInitialRowsCapacity = 8;
+  constexpr int64_t kInitialBytesCapacity = 1024;
+
+  // Null masks
+  ARROW_ASSIGN_OR_RAISE(
+      auto null_masks,
+      AllocateResizableBuffer(size_null_masks(kInitialRowsCapacity), pool_));
+  null_masks_ = std::move(null_masks);
+  memset(null_masks_->mutable_data(), 0, size_null_masks(kInitialRowsCapacity));
+
+  // Offsets and rows
+  if (!metadata.is_fixed_length) {
+    ARROW_ASSIGN_OR_RAISE(
+        auto offsets, AllocateResizableBuffer(size_offsets(kInitialRowsCapacity), pool_));
+    offsets_ = std::move(offsets);
+    memset(offsets_->mutable_data(), 0, size_offsets(kInitialRowsCapacity));
+    reinterpret_cast<uint32_t*>(offsets_->mutable_data())[0] = 0;
+
+    ARROW_ASSIGN_OR_RAISE(
+        auto rows,
+        AllocateResizableBuffer(size_rows_varying_length(kInitialBytesCapacity), pool_));
+    rows_ = std::move(rows);
+    memset(rows_->mutable_data(), 0, size_rows_varying_length(kInitialBytesCapacity));
+    bytes_capacity_ =
+        size_rows_varying_length(kInitialBytesCapacity) - kPaddingForVectors;
+  } else {
+    ARROW_ASSIGN_OR_RAISE(
+        auto rows,
+        AllocateResizableBuffer(size_rows_fixed_length(kInitialRowsCapacity), pool_));
+    rows_ = std::move(rows);
+    memset(rows_->mutable_data(), 0, size_rows_fixed_length(kInitialRowsCapacity));
+    bytes_capacity_ = size_rows_fixed_length(kInitialRowsCapacity) - kPaddingForVectors;
+  }
+
+  UpdateBufferPointers();
+
+  rows_capacity_ = kInitialRowsCapacity;
+
+  num_rows_ = 0;
+  num_rows_for_has_any_nulls_ = 0;
+  has_any_nulls_ = false;
+
+  return Status::OK();
+}
+
+void RowTableImpl::Clean() {
+  num_rows_ = 0;
+  num_rows_for_has_any_nulls_ = 0;
+  has_any_nulls_ = false;
+
+  if (!metadata_.is_fixed_length) {
+    reinterpret_cast<uint32_t*>(offsets_->mutable_data())[0] = 0;
+  }
+}
+
+int64_t RowTableImpl::size_null_masks(int64_t num_rows) const {
+  return num_rows * metadata_.null_masks_bytes_per_row + kPaddingForVectors;
+}
+
+int64_t RowTableImpl::size_offsets(int64_t num_rows) const {
+  return (num_rows + 1) * sizeof(uint32_t) + kPaddingForVectors;
+}
+
+int64_t RowTableImpl::size_rows_fixed_length(int64_t num_rows) const {
+  return num_rows * metadata_.fixed_length + kPaddingForVectors;
+}
+
+int64_t RowTableImpl::size_rows_varying_length(int64_t num_bytes) const {
+  return num_bytes + kPaddingForVectors;
+}
+
+void RowTableImpl::UpdateBufferPointers() {
+  buffers_[0] = null_masks_->mutable_data();
+  if (metadata_.is_fixed_length) {
+    buffers_[1] = rows_->mutable_data();
+    buffers_[2] = nullptr;
+  } else {
+    buffers_[1] = offsets_->mutable_data();
+    buffers_[2] = rows_->mutable_data();
+  }
+}
+
+Status RowTableImpl::ResizeFixedLengthBuffers(int64_t num_extra_rows) {
+  if (rows_capacity_ >= num_rows_ + num_extra_rows) {
+    return Status::OK();
+  }
+
+  int64_t rows_capacity_new = std::max(static_cast<int64_t>(1), 2 * rows_capacity_);
+  while (rows_capacity_new < num_rows_ + num_extra_rows) {
+    rows_capacity_new *= 2;
+  }
+
+  // Null masks
+  RETURN_NOT_OK(null_masks_->Resize(size_null_masks(rows_capacity_new), false));
+  memset(null_masks_->mutable_data() + size_null_masks(rows_capacity_), 0,
+         size_null_masks(rows_capacity_new) - size_null_masks(rows_capacity_));
+
+  // Either offsets or rows
+  if (!metadata_.is_fixed_length) {
+    RETURN_NOT_OK(offsets_->Resize(size_offsets(rows_capacity_new), false));
+    memset(offsets_->mutable_data() + size_offsets(rows_capacity_), 0,
+           size_offsets(rows_capacity_new) - size_offsets(rows_capacity_));
+  } else {
+    RETURN_NOT_OK(rows_->Resize(size_rows_fixed_length(rows_capacity_new), false));
+    memset(rows_->mutable_data() + size_rows_fixed_length(rows_capacity_), 0,
+           size_rows_fixed_length(rows_capacity_new) -
+               size_rows_fixed_length(rows_capacity_));
+    bytes_capacity_ = size_rows_fixed_length(rows_capacity_new) - kPaddingForVectors;
+  }
+
+  UpdateBufferPointers();
+
+  rows_capacity_ = rows_capacity_new;
+
+  return Status::OK();
+}
+
+Status RowTableImpl::ResizeOptionalVaryingLengthBuffer(int64_t num_extra_bytes) {
+  int64_t num_bytes = offsets()[num_rows_];
+  if (bytes_capacity_ >= num_bytes + num_extra_bytes || metadata_.is_fixed_length) {
+    return Status::OK();
+  }
+
+  int64_t bytes_capacity_new = std::max(static_cast<int64_t>(1), 2 * bytes_capacity_);
+  while (bytes_capacity_new < num_bytes + num_extra_bytes) {
+    bytes_capacity_new *= 2;
+  }
+
+  RETURN_NOT_OK(rows_->Resize(size_rows_varying_length(bytes_capacity_new), false));
+  memset(rows_->mutable_data() + size_rows_varying_length(bytes_capacity_), 0,
+         size_rows_varying_length(bytes_capacity_new) -
+             size_rows_varying_length(bytes_capacity_));
+
+  UpdateBufferPointers();
+
+  bytes_capacity_ = bytes_capacity_new;
+
+  return Status::OK();
+}
+
+Status RowTableImpl::AppendSelectionFrom(const RowTableImpl& from,
+                                         uint32_t num_rows_to_append,
+                                         const uint16_t* source_row_ids) {
+  DCHECK(metadata_.is_compatible(from.metadata()));
+
+  RETURN_NOT_OK(ResizeFixedLengthBuffers(num_rows_to_append));
+
+  if (!metadata_.is_fixed_length) {
+    // Varying-length rows
+    auto from_offsets = reinterpret_cast<const uint32_t*>(from.offsets_->data());
+    auto to_offsets = reinterpret_cast<uint32_t*>(offsets_->mutable_data());
+    uint32_t total_length = to_offsets[num_rows_];
+    uint32_t total_length_to_append = 0;
+    for (uint32_t i = 0; i < num_rows_to_append; ++i) {
+      uint16_t row_id = source_row_ids ? source_row_ids[i] : i;
+      uint32_t length = from_offsets[row_id + 1] - from_offsets[row_id];
+      total_length_to_append += length;
+      to_offsets[num_rows_ + i + 1] = total_length + total_length_to_append;
+    }
+
+    RETURN_NOT_OK(ResizeOptionalVaryingLengthBuffer(total_length_to_append));
+
+    const uint8_t* src = from.rows_->data();
+    uint8_t* dst = rows_->mutable_data() + total_length;
+    for (uint32_t i = 0; i < num_rows_to_append; ++i) {
+      uint16_t row_id = source_row_ids ? source_row_ids[i] : i;
+      uint32_t length = from_offsets[row_id + 1] - from_offsets[row_id];
+      auto src64 = reinterpret_cast<const uint64_t*>(src + from_offsets[row_id]);
+      auto dst64 = reinterpret_cast<uint64_t*>(dst);
+      for (uint32_t j = 0; j < bit_util::CeilDiv(length, 8); ++j) {
+        dst64[j] = src64[j];
+      }
+      dst += length;
+    }
+  } else {
+    // Fixed-length rows
+    const uint8_t* src = from.rows_->data();
+    uint8_t* dst = rows_->mutable_data() + num_rows_ * metadata_.fixed_length;
+    for (uint32_t i = 0; i < num_rows_to_append; ++i) {
+      uint16_t row_id = source_row_ids ? source_row_ids[i] : i;
+      uint32_t length = metadata_.fixed_length;
+      auto src64 = reinterpret_cast<const uint64_t*>(src + length * row_id);
+      auto dst64 = reinterpret_cast<uint64_t*>(dst);
+      for (uint32_t j = 0; j < bit_util::CeilDiv(length, 8); ++j) {
+        dst64[j] = src64[j];
+      }
+      dst += length;
+    }
+  }
+
+  // Null masks
+  uint32_t byte_length = metadata_.null_masks_bytes_per_row;
+  uint64_t dst_byte_offset = num_rows_ * byte_length;
+  const uint8_t* src_base = from.null_masks_->data();
+  uint8_t* dst_base = null_masks_->mutable_data();
+  for (uint32_t i = 0; i < num_rows_to_append; ++i) {
+    uint32_t row_id = source_row_ids ? source_row_ids[i] : i;
+    int64_t src_byte_offset = row_id * byte_length;
+    const uint8_t* src = src_base + src_byte_offset;
+    uint8_t* dst = dst_base + dst_byte_offset;
+    for (uint32_t ibyte = 0; ibyte < byte_length; ++ibyte) {
+      dst[ibyte] = src[ibyte];
+    }
+    dst_byte_offset += byte_length;
+  }
+
+  num_rows_ += num_rows_to_append;
+
+  return Status::OK();
+}
+
+Status RowTableImpl::AppendEmpty(uint32_t num_rows_to_append,
+                                 uint32_t num_extra_bytes_to_append) {
+  RETURN_NOT_OK(ResizeFixedLengthBuffers(num_rows_to_append));
+  RETURN_NOT_OK(ResizeOptionalVaryingLengthBuffer(num_extra_bytes_to_append));
+  num_rows_ += num_rows_to_append;
+  if (metadata_.row_alignment > 1 || metadata_.string_alignment > 1) {
+    memset(rows_->mutable_data(), 0, bytes_capacity_);
+  }
+  return Status::OK();
+}
+
+bool RowTableImpl::has_any_nulls(const LightContext* ctx) const {
+  if (has_any_nulls_) {
+    return true;
+  }
+  if (num_rows_for_has_any_nulls_ < num_rows_) {
+    auto size_per_row = metadata().null_masks_bytes_per_row;
+    has_any_nulls_ = !util::bit_util::are_all_bytes_zero(
+        ctx->hardware_flags, null_masks() + size_per_row * num_rows_for_has_any_nulls_,
+        static_cast<uint32_t>(size_per_row * (num_rows_ - num_rows_for_has_any_nulls_)));
+    num_rows_for_has_any_nulls_ = num_rows_;
+  }
+  return has_any_nulls_;
+}
+
+}  // namespace compute
+}  // namespace arrow
diff --git a/cpp/src/arrow/compute/row/row_internal.h b/cpp/src/arrow/compute/row/row_internal.h
new file mode 100644
index 0000000000..d46ac0e9a9
--- /dev/null
+++ b/cpp/src/arrow/compute/row/row_internal.h
@@ -0,0 +1,250 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include <cstdint>
+#include <vector>
+
+#include "arrow/buffer.h"
+#include "arrow/compute/light_array.h"
+#include "arrow/memory_pool.h"
+#include "arrow/status.h"
+#include "arrow/util/logging.h"
+
+namespace arrow {
+namespace compute {
+
+/// Description of the data stored in a RowTable
+struct ARROW_EXPORT RowTableMetadata {
+  /// \brief True if there are no variable length columns in the table
+  bool is_fixed_length;
+
+  /// For a fixed-length binary row, common size of rows in bytes,
+  /// rounded up to the multiple of alignment.
+  ///
+  /// For a varying-length binary, size of all encoded fixed-length key columns,
+  /// including lengths of varying-length columns, rounded up to the multiple of string
+  /// alignment.
+  uint32_t fixed_length;
+
+  /// Offset within a row to the array of 32-bit offsets within a row of
+  /// ends of varbinary fields.
+  /// Used only when the row is not fixed-length, zero for fixed-length row.
+  /// There are N elements for N varbinary fields.
+  /// Each element is the offset within a row of the first byte after
+  /// the corresponding varbinary field bytes in that row.
+  /// If varbinary fields begin at aligned addresses, than the end of the previous
+  /// varbinary field needs to be rounded up according to the specified alignment
+  /// to obtain the beginning of the next varbinary field.
+  /// The first varbinary field starts at offset specified by fixed_length,
+  /// which should already be aligned.
+  uint32_t varbinary_end_array_offset;
+
+  /// Fixed number of bytes per row that are used to encode null masks.
+  /// Null masks indicate for a single row which of its columns are null.
+  /// Nth bit in the sequence of bytes assigned to a row represents null
+  /// information for Nth field according to the order in which they are encoded.
+  int null_masks_bytes_per_row;
+
+  /// Power of 2. Every row will start at an offset aligned to that number of bytes.
+  int row_alignment;
+
+  /// Power of 2. Must be no greater than row alignment.
+  /// Every non-power-of-2 binary field and every varbinary field bytes
+  /// will start aligned to that number of bytes.
+  int string_alignment;
+
+  /// Metadata of encoded columns in their original order.
+  std::vector<KeyColumnMetadata> column_metadatas;
+
+  /// Order in which fields are encoded.
+  std::vector<uint32_t> column_order;
+
+  /// Offsets within a row to fields in their encoding order.
+  std::vector<uint32_t> column_offsets;
+
+  /// Rounding up offset to the nearest multiple of alignment value.
+  /// Alignment must be a power of 2.
+  static inline uint32_t padding_for_alignment(uint32_t offset, int required_alignment) {
+    ARROW_DCHECK(ARROW_POPCOUNT64(required_alignment) == 1);
+    return static_cast<uint32_t>((-static_cast<int32_t>(offset)) &
+                                 (required_alignment - 1));
+  }
+
+  /// Rounding up offset to the beginning of next column,
+  /// choosing required alignment based on the data type of that column.
+  static inline uint32_t padding_for_alignment(uint32_t offset, int string_alignment,
+                                               const KeyColumnMetadata& col_metadata) {
+    if (!col_metadata.is_fixed_length ||
+        ARROW_POPCOUNT64(col_metadata.fixed_length) <= 1) {
+      return 0;
+    } else {
+      return padding_for_alignment(offset, string_alignment);
+    }
+  }
+
+  /// Returns an array of offsets within a row of ends of varbinary fields.
+  inline const uint32_t* varbinary_end_array(const uint8_t* row) const {
+    ARROW_DCHECK(!is_fixed_length);
+    return reinterpret_cast<const uint32_t*>(row + varbinary_end_array_offset);
+  }
+
+  /// \brief An array of mutable offsets within a row of ends of varbinary fields.
+  inline uint32_t* varbinary_end_array(uint8_t* row) const {
+    ARROW_DCHECK(!is_fixed_length);
+    return reinterpret_cast<uint32_t*>(row + varbinary_end_array_offset);
+  }
+
+  /// Returns the offset within the row and length of the first varbinary field.
+  inline void first_varbinary_offset_and_length(const uint8_t* row, uint32_t* offset,
+                                                uint32_t* length) const {
+    ARROW_DCHECK(!is_fixed_length);
+    *offset = fixed_length;
+    *length = varbinary_end_array(row)[0] - fixed_length;
+  }
+
+  /// Returns the offset within the row and length of the second and further varbinary
+  /// fields.
+  inline void nth_varbinary_offset_and_length(const uint8_t* row, int varbinary_id,
+                                              uint32_t* out_offset,
+                                              uint32_t* out_length) const {
+    ARROW_DCHECK(!is_fixed_length);
+    ARROW_DCHECK(varbinary_id > 0);
+    const uint32_t* varbinary_end = varbinary_end_array(row);
+    uint32_t offset = varbinary_end[varbinary_id - 1];
+    offset += padding_for_alignment(offset, string_alignment);
+    *out_offset = offset;
+    *out_length = varbinary_end[varbinary_id] - offset;
+  }
+
+  uint32_t encoded_field_order(uint32_t icol) const { return column_order[icol]; }
+
+  uint32_t encoded_field_offset(uint32_t icol) const { return column_offsets[icol]; }
+
+  uint32_t num_cols() const { return static_cast<uint32_t>(column_metadatas.size()); }
+
+  uint32_t num_varbinary_cols() const;
+
+  /// \brief Populate this instance to describe `cols` with the given alignment
+  void FromColumnMetadataVector(const std::vector<KeyColumnMetadata>& cols,
+                                int in_row_alignment, int in_string_alignment);
+
+  /// \brief True if `other` has the same number of columns
+  ///   and each column has the same width (two variable length
+  ///   columns are considered to have the same width)
+  bool is_compatible(const RowTableMetadata& other) const;
+};
+
+/// \brief A table of data stored in row-major order
+///
+/// Can only store non-nested data types
+///
+/// Can store both fixed-size data types and variable-length data types
+///
+/// The row table is not safe
+class ARROW_EXPORT RowTableImpl {
+ public:
+  RowTableImpl();
+  /// \brief Initialize a row array for use
+  ///
+  /// This must be called before any other method
+  Status Init(MemoryPool* pool, const RowTableMetadata& metadata);
+  /// \brief Clear all rows from the table
+  ///
+  /// Does not shrink buffers
+  void Clean();
+  /// \brief Add empty rows
+  /// \param num_rows_to_append The number of empty rows to append
+  /// \param num_extra_bytes_to_append For tables storing variable-length data this
+  ///     should be a guess of how many data bytes will be needed to populate the
+  ///     data.  This is ignored if there are no variable-length columns
+  Status AppendEmpty(uint32_t num_rows_to_append, uint32_t num_extra_bytes_to_append);
+  /// \brief Append rows from a source table
+  /// \param from The table to append from
+  /// \param num_rows_to_append The number of rows to append
+  /// \param source_row_ids Indices (into `from`) of the desired rows
+  Status AppendSelectionFrom(const RowTableImpl& from, uint32_t num_rows_to_append,
+                             const uint16_t* source_row_ids);
+  /// \brief Metadata describing the data stored in this table
+  const RowTableMetadata& metadata() const { return metadata_; }
+  /// \brief The number of rows stored in the table
+  int64_t length() const { return num_rows_; }
+  // Accessors into the table's buffers
+  const uint8_t* data(int i) const {
+    ARROW_DCHECK(i >= 0 && i < kMaxBuffers);
+    return buffers_[i];
+  }
+  uint8_t* mutable_data(int i) {
+    ARROW_DCHECK(i >= 0 && i < kMaxBuffers);
+    return buffers_[i];
+  }
+  const uint32_t* offsets() const { return reinterpret_cast<const uint32_t*>(data(1)); }
+  uint32_t* mutable_offsets() { return reinterpret_cast<uint32_t*>(mutable_data(1)); }
+  const uint8_t* null_masks() const { return null_masks_->data(); }
+  uint8_t* null_masks() { return null_masks_->mutable_data(); }
+
+  /// \brief True if there is a null value anywhere in the table
+  ///
+  /// This calculation is memoized based on the number of rows and assumes
+  /// that values are only appended (and not modified in place) between
+  /// successive calls
+  bool has_any_nulls(const LightContext* ctx) const;
+
+ private:
+  Status ResizeFixedLengthBuffers(int64_t num_extra_rows);
+  Status ResizeOptionalVaryingLengthBuffer(int64_t num_extra_bytes);
+
+  // Helper functions to determine the number of bytes needed for each
+  // buffer given a number of rows.
+  int64_t size_null_masks(int64_t num_rows) const;
+  int64_t size_offsets(int64_t num_rows) const;
+  int64_t size_rows_fixed_length(int64_t num_rows) const;
+  int64_t size_rows_varying_length(int64_t num_bytes) const;
+
+  // Called after resize to fix pointers
+  void UpdateBufferPointers();
+
+  // The arrays in `buffers_` need to be padded so that
+  // vectorized operations can operate in blocks without
+  // worrying about tails
+  static constexpr int64_t kPaddingForVectors = 64;
+  MemoryPool* pool_;
+  RowTableMetadata metadata_;
+  // Buffers can only expand during lifetime and never shrink.
+  std::unique_ptr<ResizableBuffer> null_masks_;
+  // Only used if the table has variable-length columns
+  // Stores the offsets into the binary data (which is stored
+  // after all the fixed-sized fields)
+  std::unique_ptr<ResizableBuffer> offsets_;
+  // Stores the fixed-length parts of the rows
+  std::unique_ptr<ResizableBuffer> rows_;
+  static constexpr int kMaxBuffers = 3;
+  uint8_t* buffers_[kMaxBuffers];
+  // The number of rows in the table
+  int64_t num_rows_;
+  // The number of rows that can be stored in the table without resizing
+  int64_t rows_capacity_;
+  // The number of bytes that can be stored in the table without resizing
+  int64_t bytes_capacity_;
+
+  // Mutable to allow lazy evaluation
+  mutable int64_t num_rows_for_has_any_nulls_;
+  mutable bool has_any_nulls_;
+};
+
+}  // namespace compute
+}  // namespace arrow
diff --git a/cpp/src/arrow/dataset/partition.cc b/cpp/src/arrow/dataset/partition.cc
index 89d90c758f..7108ff452f 100644
--- a/cpp/src/arrow/dataset/partition.cc
+++ b/cpp/src/arrow/dataset/partition.cc
@@ -26,11 +26,11 @@
 #include "arrow/array/array_dict.h"
 #include "arrow/array/array_nested.h"
 #include "arrow/array/builder_dict.h"
-#include "arrow/compute/api_aggregate.h"
 #include "arrow/compute/api_scalar.h"
 #include "arrow/compute/api_vector.h"
 #include "arrow/compute/cast.h"
 #include "arrow/compute/exec/expression_internal.h"
+#include "arrow/compute/row/grouper.h"
 #include "arrow/dataset/dataset_internal.h"
 #include "arrow/filesystem/path_util.h"
 #include "arrow/scalar.h"
@@ -141,14 +141,13 @@ Result<Partitioning::PartitionedBatches> KeyValuePartitioning::Partition(
     key_batch.values.emplace_back(batch->column_data(i));
   }
 
-  ARROW_ASSIGN_OR_RAISE(auto grouper,
-                        compute::internal::Grouper::Make(key_batch.GetDescriptors()));
+  ARROW_ASSIGN_OR_RAISE(auto grouper, compute::Grouper::Make(key_batch.GetDescriptors()));
 
   ARROW_ASSIGN_OR_RAISE(Datum id_batch, grouper->Consume(key_batch));
 
   auto ids = id_batch.array_as<UInt32Array>();
-  ARROW_ASSIGN_OR_RAISE(auto groupings, compute::internal::Grouper::MakeGroupings(
-                                            *ids, grouper->num_groups()));
+  ARROW_ASSIGN_OR_RAISE(auto groupings,
+                        compute::Grouper::MakeGroupings(*ids, grouper->num_groups()));
 
   ARROW_ASSIGN_OR_RAISE(auto uniques, grouper->GetUniques());
   ArrayVector unique_arrays(num_keys);
diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd
index a3cb044855..39fc130c8a 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -2391,7 +2391,7 @@ cdef extern from * namespace "arrow::compute":
             const CBuffer& buffer)
 
 
-cdef extern from "arrow/compute/api_aggregate.h" namespace \
+cdef extern from "arrow/compute/exec/aggregate.h" namespace \
         "arrow::compute::internal" nogil:
     cdef cppclass CAggregate "arrow::compute::internal::Aggregate":
         c_string function