You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by li...@apache.org on 2022/04/21 13:13:22 UTC

[arrow] branch master updated: ARROW-16166: [C++][Compute] Utilities for assembling join output

This is an automated email from the ASF dual-hosted git repository.

lidavidm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new b0c75dee34 ARROW-16166: [C++][Compute] Utilities for assembling join output
b0c75dee34 is described below

commit b0c75dee34de65834e5a83438e6581f90970fd3d
Author: michalursa <mi...@ursacomputing.com>
AuthorDate: Thu Apr 21 09:13:05 2022 -0400

    ARROW-16166: [C++][Compute] Utilities for assembling join output
    
    Adds utilities that make it easier to filter, replicate and accumulate rows from potentially multiple sources in a single exec batch.
    They are meant to be used by the hash join to reduce the overhead of producing its output.
    
    Also moves the lightweight array utilities (KeyColumnMetadata and KeyColumnArray, previously nested in KeyEncoder) into the same new file (light_array.h/.cc) and adds helper functions for generating them.
    
    Thanks to Weston Pace for updating comments and writing the unit tests.
    
    Closes #12872 from michalursa/ARROW-16166-join-array-utils
    
    Authored-by: michalursa <mi...@ursacomputing.com>
    Signed-off-by: David Li <li...@gmail.com>
---
 cpp/src/arrow/CMakeLists.txt                    |   1 +
 cpp/src/arrow/compute/CMakeLists.txt            |   1 +
 cpp/src/arrow/compute/exec/key_compare.cc       |  25 +-
 cpp/src/arrow/compute/exec/key_compare.h        |  35 +-
 cpp/src/arrow/compute/exec/key_compare_avx2.cc  |  14 +-
 cpp/src/arrow/compute/exec/key_encode.cc        |  93 +--
 cpp/src/arrow/compute/exec/key_encode.h         |  75 +--
 cpp/src/arrow/compute/exec/key_hash.cc          |   4 +-
 cpp/src/arrow/compute/exec/key_hash.h           |   4 +-
 cpp/src/arrow/compute/kernels/hash_aggregate.cc |  30 +-
 cpp/src/arrow/compute/light_array.cc            | 729 ++++++++++++++++++++++++
 cpp/src/arrow/compute/light_array.h             | 382 +++++++++++++
 cpp/src/arrow/compute/light_array_test.cc       | 481 ++++++++++++++++
 13 files changed, 1654 insertions(+), 220 deletions(-)

diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt
index 2933457287..2e74a2cac1 100644
--- a/cpp/src/arrow/CMakeLists.txt
+++ b/cpp/src/arrow/CMakeLists.txt
@@ -407,6 +407,7 @@ if(ARROW_COMPUTE)
        compute/function.cc
        compute/function_internal.cc
        compute/kernel.cc
+       compute/light_array.cc
        compute/registry.cc
        compute/kernels/aggregate_basic.cc
        compute/kernels/aggregate_mode.cc
diff --git a/cpp/src/arrow/compute/CMakeLists.txt b/cpp/src/arrow/compute/CMakeLists.txt
index 897dc32f35..27e693d9a3 100644
--- a/cpp/src/arrow/compute/CMakeLists.txt
+++ b/cpp/src/arrow/compute/CMakeLists.txt
@@ -63,6 +63,7 @@ add_arrow_compute_test(internals_test
                        function_test.cc
                        exec_test.cc
                        kernel_test.cc
+                       light_array_test.cc
                        registry_test.cc)
 
 add_arrow_benchmark(function_benchmark PREFIX "arrow-compute")
diff --git a/cpp/src/arrow/compute/exec/key_compare.cc b/cpp/src/arrow/compute/exec/key_compare.cc
index ed94bf7230..d873aec692 100644
--- a/cpp/src/arrow/compute/exec/key_compare.cc
+++ b/cpp/src/arrow/compute/exec/key_compare.cc
@@ -34,7 +34,7 @@ void KeyCompare::NullUpdateColumnToRow(uint32_t id_col, uint32_t num_rows_to_com
                                        const uint16_t* sel_left_maybe_null,
                                        const uint32_t* left_to_right_map,
                                        KeyEncoder::KeyEncoderContext* ctx,
-                                       const KeyEncoder::KeyColumnArray& col,
+                                       const KeyColumnArray& col,
                                        const KeyEncoder::KeyRowArray& rows,
                                        uint8_t* match_bytevector) {
   if (!rows.has_any_nulls(ctx) && !col.data(0)) {
@@ -91,7 +91,7 @@ void KeyCompare::CompareBinaryColumnToRowHelper(
     uint32_t offset_within_row, uint32_t first_row_to_compare,
     uint32_t num_rows_to_compare, const uint16_t* sel_left_maybe_null,
     const uint32_t* left_to_right_map, KeyEncoder::KeyEncoderContext* ctx,
-    const KeyEncoder::KeyColumnArray& col, const KeyEncoder::KeyRowArray& rows,
+    const KeyColumnArray& col, const KeyEncoder::KeyRowArray& rows,
     uint8_t* match_bytevector, COMPARE_FN compare_fn) {
   bool is_fixed_length = rows.metadata().is_fixed_length;
   if (is_fixed_length) {
@@ -121,7 +121,7 @@ template <bool use_selection>
 void KeyCompare::CompareBinaryColumnToRow(
     uint32_t offset_within_row, uint32_t num_rows_to_compare,
     const uint16_t* sel_left_maybe_null, const uint32_t* left_to_right_map,
-    KeyEncoder::KeyEncoderContext* ctx, const KeyEncoder::KeyColumnArray& col,
+    KeyEncoder::KeyEncoderContext* ctx, const KeyColumnArray& col,
     const KeyEncoder::KeyRowArray& rows, uint8_t* match_bytevector) {
   uint32_t num_processed = 0;
 #if defined(ARROW_HAVE_AVX2)
@@ -231,7 +231,7 @@ template <bool use_selection, bool is_first_varbinary_col>
 void KeyCompare::CompareVarBinaryColumnToRow(
     uint32_t id_varbinary_col, uint32_t num_rows_to_compare,
     const uint16_t* sel_left_maybe_null, const uint32_t* left_to_right_map,
-    KeyEncoder::KeyEncoderContext* ctx, const KeyEncoder::KeyColumnArray& col,
+    KeyEncoder::KeyEncoderContext* ctx, const KeyColumnArray& col,
     const KeyEncoder::KeyRowArray& rows, uint8_t* match_bytevector) {
 #if defined(ARROW_HAVE_AVX2)
   if (ctx->has_avx2()) {
@@ -306,14 +306,11 @@ void KeyCompare::AndByteVectors(KeyEncoder::KeyEncoderContext* ctx, uint32_t num
   }
 }
 
-void KeyCompare::CompareColumnsToRows(uint32_t num_rows_to_compare,
-                                      const uint16_t* sel_left_maybe_null,
-                                      const uint32_t* left_to_right_map,
-                                      KeyEncoder::KeyEncoderContext* ctx,
-                                      uint32_t* out_num_rows,
-                                      uint16_t* out_sel_left_maybe_same,
-                                      const std::vector<KeyEncoder::KeyColumnArray>& cols,
-                                      const KeyEncoder::KeyRowArray& rows) {
+void KeyCompare::CompareColumnsToRows(
+    uint32_t num_rows_to_compare, const uint16_t* sel_left_maybe_null,
+    const uint32_t* left_to_right_map, KeyEncoder::KeyEncoderContext* ctx,
+    uint32_t* out_num_rows, uint16_t* out_sel_left_maybe_same,
+    const std::vector<KeyColumnArray>& cols, const KeyEncoder::KeyRowArray& rows) {
   if (num_rows_to_compare == 0) {
     *out_num_rows = 0;
     return;
@@ -333,7 +330,7 @@ void KeyCompare::CompareColumnsToRows(uint32_t num_rows_to_compare,
 
   bool is_first_column = true;
   for (size_t icol = 0; icol < cols.size(); ++icol) {
-    const KeyEncoder::KeyColumnArray& col = cols[icol];
+    const KeyColumnArray& col = cols[icol];
     if (col.metadata().is_null_type) {
       // If this null type col is the first column, the match_bytevector_A needs to be
       // initialized with 0xFF. Otherwise, the calculation can be skipped
@@ -374,7 +371,7 @@ void KeyCompare::CompareColumnsToRows(uint32_t num_rows_to_compare,
 
   uint32_t ivarbinary = 0;
   for (size_t icol = 0; icol < cols.size(); ++icol) {
-    const KeyEncoder::KeyColumnArray& col = cols[icol];
+    const KeyColumnArray& col = cols[icol];
     if (!col.metadata().is_fixed_length) {
       // Process varbinary and nulls
       if (sel_left_maybe_null) {
diff --git a/cpp/src/arrow/compute/exec/key_compare.h b/cpp/src/arrow/compute/exec/key_compare.h
index aeb5abbdd1..773b32d46c 100644
--- a/cpp/src/arrow/compute/exec/key_compare.h
+++ b/cpp/src/arrow/compute/exec/key_compare.h
@@ -33,14 +33,11 @@ class KeyCompare {
   // Returns a single 16-bit selection vector of rows that failed comparison.
   // If there is input selection on the left, the resulting selection is a filtered image
   // of input selection.
-  static void CompareColumnsToRows(uint32_t num_rows_to_compare,
-                                   const uint16_t* sel_left_maybe_null,
-                                   const uint32_t* left_to_right_map,
-                                   KeyEncoder::KeyEncoderContext* ctx,
-                                   uint32_t* out_num_rows,
-                                   uint16_t* out_sel_left_maybe_same,
-                                   const std::vector<KeyEncoder::KeyColumnArray>& cols,
-                                   const KeyEncoder::KeyRowArray& rows);
+  static void CompareColumnsToRows(
+      uint32_t num_rows_to_compare, const uint16_t* sel_left_maybe_null,
+      const uint32_t* left_to_right_map, KeyEncoder::KeyEncoderContext* ctx,
+      uint32_t* out_num_rows, uint16_t* out_sel_left_maybe_same,
+      const std::vector<KeyColumnArray>& cols, const KeyEncoder::KeyRowArray& rows);
 
  private:
   template <bool use_selection>
@@ -48,7 +45,7 @@ class KeyCompare {
                                     const uint16_t* sel_left_maybe_null,
                                     const uint32_t* left_to_right_map,
                                     KeyEncoder::KeyEncoderContext* ctx,
-                                    const KeyEncoder::KeyColumnArray& col,
+                                    const KeyColumnArray& col,
                                     const KeyEncoder::KeyRowArray& rows,
                                     uint8_t* match_bytevector);
 
@@ -57,21 +54,21 @@ class KeyCompare {
       uint32_t offset_within_row, uint32_t first_row_to_compare,
       uint32_t num_rows_to_compare, const uint16_t* sel_left_maybe_null,
       const uint32_t* left_to_right_map, KeyEncoder::KeyEncoderContext* ctx,
-      const KeyEncoder::KeyColumnArray& col, const KeyEncoder::KeyRowArray& rows,
+      const KeyColumnArray& col, const KeyEncoder::KeyRowArray& rows,
       uint8_t* match_bytevector, COMPARE_FN compare_fn);
 
   template <bool use_selection>
   static void CompareBinaryColumnToRow(
       uint32_t offset_within_row, uint32_t num_rows_to_compare,
       const uint16_t* sel_left_maybe_null, const uint32_t* left_to_right_map,
-      KeyEncoder::KeyEncoderContext* ctx, const KeyEncoder::KeyColumnArray& col,
+      KeyEncoder::KeyEncoderContext* ctx, const KeyColumnArray& col,
       const KeyEncoder::KeyRowArray& rows, uint8_t* match_bytevector);
 
   template <bool use_selection, bool is_first_varbinary_col>
   static void CompareVarBinaryColumnToRow(
       uint32_t id_varlen_col, uint32_t num_rows_to_compare,
       const uint16_t* sel_left_maybe_null, const uint32_t* left_to_right_map,
-      KeyEncoder::KeyEncoderContext* ctx, const KeyEncoder::KeyColumnArray& col,
+      KeyEncoder::KeyEncoderContext* ctx, const KeyColumnArray& col,
       const KeyEncoder::KeyRowArray& rows, uint8_t* match_bytevector);
 
   static void AndByteVectors(KeyEncoder::KeyEncoderContext* ctx, uint32_t num_elements,
@@ -83,14 +80,14 @@ class KeyCompare {
   static uint32_t NullUpdateColumnToRowImp_avx2(
       uint32_t id_col, uint32_t num_rows_to_compare, const uint16_t* sel_left_maybe_null,
       const uint32_t* left_to_right_map, KeyEncoder::KeyEncoderContext* ctx,
-      const KeyEncoder::KeyColumnArray& col, const KeyEncoder::KeyRowArray& rows,
+      const KeyColumnArray& col, const KeyEncoder::KeyRowArray& rows,
       uint8_t* match_bytevector);
 
   template <bool use_selection, class COMPARE8_FN>
   static uint32_t CompareBinaryColumnToRowHelper_avx2(
       uint32_t offset_within_row, uint32_t num_rows_to_compare,
       const uint16_t* sel_left_maybe_null, const uint32_t* left_to_right_map,
-      KeyEncoder::KeyEncoderContext* ctx, const KeyEncoder::KeyColumnArray& col,
+      KeyEncoder::KeyEncoderContext* ctx, const KeyColumnArray& col,
       const KeyEncoder::KeyRowArray& rows, uint8_t* match_bytevector,
       COMPARE8_FN compare8_fn);
 
@@ -98,14 +95,14 @@ class KeyCompare {
   static uint32_t CompareBinaryColumnToRowImp_avx2(
       uint32_t offset_within_row, uint32_t num_rows_to_compare,
       const uint16_t* sel_left_maybe_null, const uint32_t* left_to_right_map,
-      KeyEncoder::KeyEncoderContext* ctx, const KeyEncoder::KeyColumnArray& col,
+      KeyEncoder::KeyEncoderContext* ctx, const KeyColumnArray& col,
       const KeyEncoder::KeyRowArray& rows, uint8_t* match_bytevector);
 
   template <bool use_selection, bool is_first_varbinary_col>
   static void CompareVarBinaryColumnToRowImp_avx2(
       uint32_t id_varlen_col, uint32_t num_rows_to_compare,
       const uint16_t* sel_left_maybe_null, const uint32_t* left_to_right_map,
-      KeyEncoder::KeyEncoderContext* ctx, const KeyEncoder::KeyColumnArray& col,
+      KeyEncoder::KeyEncoderContext* ctx, const KeyColumnArray& col,
       const KeyEncoder::KeyRowArray& rows, uint8_t* match_bytevector);
 
   static uint32_t AndByteVectors_avx2(uint32_t num_elements, uint8_t* bytevector_A,
@@ -114,20 +111,20 @@ class KeyCompare {
   static uint32_t NullUpdateColumnToRow_avx2(
       bool use_selection, uint32_t id_col, uint32_t num_rows_to_compare,
       const uint16_t* sel_left_maybe_null, const uint32_t* left_to_right_map,
-      KeyEncoder::KeyEncoderContext* ctx, const KeyEncoder::KeyColumnArray& col,
+      KeyEncoder::KeyEncoderContext* ctx, const KeyColumnArray& col,
       const KeyEncoder::KeyRowArray& rows, uint8_t* match_bytevector);
 
   static uint32_t CompareBinaryColumnToRow_avx2(
       bool use_selection, uint32_t offset_within_row, uint32_t num_rows_to_compare,
       const uint16_t* sel_left_maybe_null, const uint32_t* left_to_right_map,
-      KeyEncoder::KeyEncoderContext* ctx, const KeyEncoder::KeyColumnArray& col,
+      KeyEncoder::KeyEncoderContext* ctx, const KeyColumnArray& col,
       const KeyEncoder::KeyRowArray& rows, uint8_t* match_bytevector);
 
   static void CompareVarBinaryColumnToRow_avx2(
       bool use_selection, bool is_first_varbinary_col, uint32_t id_varlen_col,
       uint32_t num_rows_to_compare, const uint16_t* sel_left_maybe_null,
       const uint32_t* left_to_right_map, KeyEncoder::KeyEncoderContext* ctx,
-      const KeyEncoder::KeyColumnArray& col, const KeyEncoder::KeyRowArray& rows,
+      const KeyColumnArray& col, const KeyEncoder::KeyRowArray& rows,
       uint8_t* match_bytevector);
 
 #endif
diff --git a/cpp/src/arrow/compute/exec/key_compare_avx2.cc b/cpp/src/arrow/compute/exec/key_compare_avx2.cc
index df13e8cae3..e45486b2eb 100644
--- a/cpp/src/arrow/compute/exec/key_compare_avx2.cc
+++ b/cpp/src/arrow/compute/exec/key_compare_avx2.cc
@@ -40,7 +40,7 @@ template <bool use_selection>
 uint32_t KeyCompare::NullUpdateColumnToRowImp_avx2(
     uint32_t id_col, uint32_t num_rows_to_compare, const uint16_t* sel_left_maybe_null,
     const uint32_t* left_to_right_map, KeyEncoder::KeyEncoderContext* ctx,
-    const KeyEncoder::KeyColumnArray& col, const KeyEncoder::KeyRowArray& rows,
+    const KeyColumnArray& col, const KeyEncoder::KeyRowArray& rows,
     uint8_t* match_bytevector) {
   if (!rows.has_any_nulls(ctx) && !col.data(0)) {
     return num_rows_to_compare;
@@ -180,7 +180,7 @@ template <bool use_selection, class COMPARE8_FN>
 uint32_t KeyCompare::CompareBinaryColumnToRowHelper_avx2(
     uint32_t offset_within_row, uint32_t num_rows_to_compare,
     const uint16_t* sel_left_maybe_null, const uint32_t* left_to_right_map,
-    KeyEncoder::KeyEncoderContext* ctx, const KeyEncoder::KeyColumnArray& col,
+    KeyEncoder::KeyEncoderContext* ctx, const KeyColumnArray& col,
     const KeyEncoder::KeyRowArray& rows, uint8_t* match_bytevector,
     COMPARE8_FN compare8_fn) {
   bool is_fixed_length = rows.metadata().is_fixed_length;
@@ -419,7 +419,7 @@ template <bool use_selection>
 uint32_t KeyCompare::CompareBinaryColumnToRowImp_avx2(
     uint32_t offset_within_row, uint32_t num_rows_to_compare,
     const uint16_t* sel_left_maybe_null, const uint32_t* left_to_right_map,
-    KeyEncoder::KeyEncoderContext* ctx, const KeyEncoder::KeyColumnArray& col,
+    KeyEncoder::KeyEncoderContext* ctx, const KeyColumnArray& col,
     const KeyEncoder::KeyRowArray& rows, uint8_t* match_bytevector) {
   uint32_t col_width = col.metadata().fixed_length;
   if (col_width == 0) {
@@ -503,7 +503,7 @@ template <bool use_selection, bool is_first_varbinary_col>
 void KeyCompare::CompareVarBinaryColumnToRowImp_avx2(
     uint32_t id_varbinary_col, uint32_t num_rows_to_compare,
     const uint16_t* sel_left_maybe_null, const uint32_t* left_to_right_map,
-    KeyEncoder::KeyEncoderContext* ctx, const KeyEncoder::KeyColumnArray& col,
+    KeyEncoder::KeyEncoderContext* ctx, const KeyColumnArray& col,
     const KeyEncoder::KeyRowArray& rows, uint8_t* match_bytevector) {
   const uint32_t* offsets_left = col.offsets();
   const uint32_t* offsets_right = rows.offsets();
@@ -569,7 +569,7 @@ uint32_t KeyCompare::AndByteVectors_avx2(uint32_t num_elements, uint8_t* bytevec
 uint32_t KeyCompare::NullUpdateColumnToRow_avx2(
     bool use_selection, uint32_t id_col, uint32_t num_rows_to_compare,
     const uint16_t* sel_left_maybe_null, const uint32_t* left_to_right_map,
-    KeyEncoder::KeyEncoderContext* ctx, const KeyEncoder::KeyColumnArray& col,
+    KeyEncoder::KeyEncoderContext* ctx, const KeyColumnArray& col,
     const KeyEncoder::KeyRowArray& rows, uint8_t* match_bytevector) {
   if (use_selection) {
     return NullUpdateColumnToRowImp_avx2<true>(id_col, num_rows_to_compare,
@@ -585,7 +585,7 @@ uint32_t KeyCompare::NullUpdateColumnToRow_avx2(
 uint32_t KeyCompare::CompareBinaryColumnToRow_avx2(
     bool use_selection, uint32_t offset_within_row, uint32_t num_rows_to_compare,
     const uint16_t* sel_left_maybe_null, const uint32_t* left_to_right_map,
-    KeyEncoder::KeyEncoderContext* ctx, const KeyEncoder::KeyColumnArray& col,
+    KeyEncoder::KeyEncoderContext* ctx, const KeyColumnArray& col,
     const KeyEncoder::KeyRowArray& rows, uint8_t* match_bytevector) {
   if (use_selection) {
     return CompareBinaryColumnToRowImp_avx2<true>(offset_within_row, num_rows_to_compare,
@@ -602,7 +602,7 @@ void KeyCompare::CompareVarBinaryColumnToRow_avx2(
     bool use_selection, bool is_first_varbinary_col, uint32_t id_varlen_col,
     uint32_t num_rows_to_compare, const uint16_t* sel_left_maybe_null,
     const uint32_t* left_to_right_map, KeyEncoder::KeyEncoderContext* ctx,
-    const KeyEncoder::KeyColumnArray& col, const KeyEncoder::KeyRowArray& rows,
+    const KeyColumnArray& col, const KeyEncoder::KeyRowArray& rows,
     uint8_t* match_bytevector) {
   if (use_selection) {
     if (is_first_varbinary_col) {
diff --git a/cpp/src/arrow/compute/exec/key_encode.cc b/cpp/src/arrow/compute/exec/key_encode.cc
index f8bd7c2503..3d92c77b09 100644
--- a/cpp/src/arrow/compute/exec/key_encode.cc
+++ b/cpp/src/arrow/compute/exec/key_encode.cc
@@ -271,87 +271,8 @@ bool KeyEncoder::KeyRowArray::has_any_nulls(const KeyEncoderContext* ctx) const
   return has_any_nulls_;
 }
 
-KeyEncoder::KeyColumnArray::KeyColumnArray(const KeyColumnMetadata& metadata,
-                                           const KeyColumnArray& left,
-                                           const KeyColumnArray& right,
-                                           int buffer_id_to_replace) {
-  metadata_ = metadata;
-  length_ = left.length();
-  for (int i = 0; i < max_buffers_; ++i) {
-    buffers_[i] = left.buffers_[i];
-    mutable_buffers_[i] = left.mutable_buffers_[i];
-  }
-  buffers_[buffer_id_to_replace] = right.buffers_[buffer_id_to_replace];
-  mutable_buffers_[buffer_id_to_replace] = right.mutable_buffers_[buffer_id_to_replace];
-  bit_offset_[0] = left.bit_offset_[0];
-  bit_offset_[1] = left.bit_offset_[1];
-  if (buffer_id_to_replace < max_buffers_ - 1) {
-    bit_offset_[buffer_id_to_replace] = right.bit_offset_[buffer_id_to_replace];
-  }
-}
-
-KeyEncoder::KeyColumnArray::KeyColumnArray(const KeyColumnMetadata& metadata,
-                                           int64_t length, const uint8_t* buffer0,
-                                           const uint8_t* buffer1, const uint8_t* buffer2,
-                                           int bit_offset0, int bit_offset1) {
-  metadata_ = metadata;
-  length_ = length;
-  buffers_[0] = buffer0;
-  buffers_[1] = buffer1;
-  buffers_[2] = buffer2;
-  mutable_buffers_[0] = mutable_buffers_[1] = mutable_buffers_[2] = nullptr;
-  bit_offset_[0] = bit_offset0;
-  bit_offset_[1] = bit_offset1;
-}
-
-KeyEncoder::KeyColumnArray::KeyColumnArray(const KeyColumnMetadata& metadata,
-                                           int64_t length, uint8_t* buffer0,
-                                           uint8_t* buffer1, uint8_t* buffer2,
-                                           int bit_offset0, int bit_offset1) {
-  metadata_ = metadata;
-  length_ = length;
-  buffers_[0] = mutable_buffers_[0] = buffer0;
-  buffers_[1] = mutable_buffers_[1] = buffer1;
-  buffers_[2] = mutable_buffers_[2] = buffer2;
-  bit_offset_[0] = bit_offset0;
-  bit_offset_[1] = bit_offset1;
-}
-
-KeyEncoder::KeyColumnArray::KeyColumnArray(const KeyColumnArray& from, int64_t start,
-                                           int64_t length) {
-  metadata_ = from.metadata_;
-  length_ = length;
-  uint32_t fixed_size =
-      !metadata_.is_fixed_length ? sizeof(uint32_t) : metadata_.fixed_length;
-
-  buffers_[0] =
-      from.buffers_[0] ? from.buffers_[0] + (from.bit_offset_[0] + start) / 8 : nullptr;
-  mutable_buffers_[0] = from.mutable_buffers_[0]
-                            ? from.mutable_buffers_[0] + (from.bit_offset_[0] + start) / 8
-                            : nullptr;
-  bit_offset_[0] = (from.bit_offset_[0] + start) % 8;
-
-  if (fixed_size == 0 && !metadata_.is_null_type) {
-    buffers_[1] =
-        from.buffers_[1] ? from.buffers_[1] + (from.bit_offset_[1] + start) / 8 : nullptr;
-    mutable_buffers_[1] = from.mutable_buffers_[1] ? from.mutable_buffers_[1] +
-                                                         (from.bit_offset_[1] + start) / 8
-                                                   : nullptr;
-    bit_offset_[1] = (from.bit_offset_[1] + start) % 8;
-  } else {
-    buffers_[1] = from.buffers_[1] ? from.buffers_[1] + start * fixed_size : nullptr;
-    mutable_buffers_[1] = from.mutable_buffers_[1]
-                              ? from.mutable_buffers_[1] + start * fixed_size
-                              : nullptr;
-    bit_offset_[1] = 0;
-  }
-
-  buffers_[2] = from.buffers_[2];
-  mutable_buffers_[2] = from.mutable_buffers_[2];
-}
-
-KeyEncoder::KeyColumnArray KeyEncoder::TransformBoolean::ArrayReplace(
-    const KeyColumnArray& column, const KeyColumnArray& temp) {
+KeyColumnArray KeyEncoder::TransformBoolean::ArrayReplace(const KeyColumnArray& column,
+                                                          const KeyColumnArray& temp) {
   // Make sure that the temp buffer is large enough
   DCHECK(temp.length() >= column.length() && temp.metadata().is_fixed_length &&
          temp.metadata().fixed_length >= sizeof(uint8_t));
@@ -359,8 +280,7 @@ KeyEncoder::KeyColumnArray KeyEncoder::TransformBoolean::ArrayReplace(
   metadata.is_fixed_length = true;
   metadata.fixed_length = sizeof(uint8_t);
   constexpr int buffer_index = 1;
-  KeyColumnArray result = KeyColumnArray(metadata, column, temp, buffer_index);
-  return result;
+  return column.WithBufferFrom(temp, buffer_index).WithMetadata(metadata);
 }
 
 void KeyEncoder::TransformBoolean::PostDecode(const KeyColumnArray& input,
@@ -387,8 +307,8 @@ bool KeyEncoder::EncoderInteger::UsesTransform(const KeyColumnArray& column) {
   return IsBoolean(column.metadata());
 }
 
-KeyEncoder::KeyColumnArray KeyEncoder::EncoderInteger::ArrayReplace(
-    const KeyColumnArray& column, const KeyColumnArray& temp) {
+KeyColumnArray KeyEncoder::EncoderInteger::ArrayReplace(const KeyColumnArray& column,
+                                                        const KeyColumnArray& temp) {
   if (IsBoolean(column.metadata())) {
     return TransformBoolean::ArrayReplace(column, temp);
   }
@@ -955,7 +875,8 @@ void KeyEncoder::PrepareKeyColumnArrays(int64_t start_row, int64_t num_rows,
   uint32_t num_varbinary_visited = 0;
   for (uint32_t i = 0; i < num_cols; ++i) {
     const KeyColumnArray& col = cols_in[row_metadata_.column_order[i]];
-    KeyColumnArray col_window(col, start_row, num_rows);
+    KeyColumnArray col_window = col.Slice(start_row, num_rows);
+
     batch_all_cols_[i] = col_window;
     if (!col.metadata().is_fixed_length) {
       DCHECK(num_varbinary_visited < batch_varbinary_cols_.size());
diff --git a/cpp/src/arrow/compute/exec/key_encode.h b/cpp/src/arrow/compute/exec/key_encode.h
index da533434d3..f9de31d9c2 100644
--- a/cpp/src/arrow/compute/exec/key_encode.h
+++ b/cpp/src/arrow/compute/exec/key_encode.h
@@ -22,6 +22,7 @@
 #include <vector>
 
 #include "arrow/compute/exec/util.h"
+#include "arrow/compute/light_array.h"
 #include "arrow/memory_pool.h"
 #include "arrow/result.h"
 #include "arrow/status.h"
@@ -30,8 +31,6 @@
 namespace arrow {
 namespace compute {
 
-class KeyColumnMetadata;
-
 /// Converts between key representation as a collection of arrays for
 /// individual columns and another representation as a single array of rows
 /// combining data from all columns into one value.
@@ -49,27 +48,6 @@ class KeyEncoder {
     util::TempVectorStack* stack;
   };
 
-  /// Description of a storage format of a single key column as needed
-  /// for the purpose of row encoding.
-  struct KeyColumnMetadata {
-    KeyColumnMetadata() = default;
-    KeyColumnMetadata(bool is_fixed_length_in, uint32_t fixed_length_in,
-                      bool is_null_type_in = false)
-        : is_fixed_length(is_fixed_length_in),
-          is_null_type(is_null_type_in),
-          fixed_length(fixed_length_in) {}
-    /// Is column storing a varying-length binary, using offsets array
-    /// to find a beginning of a value, or is it a fixed-length binary.
-    bool is_fixed_length;
-    /// Is column null type
-    bool is_null_type;
-    /// For a fixed-length binary column: number of bytes per value.
-    /// Zero has a special meaning, indicating a bit vector with one bit per value if it
-    /// isn't a null type column.
-    /// For a varying-length binary column: number of bytes per offset.
-    uint32_t fixed_length;
-  };
-
   /// Description of a storage format for rows produced by encoder.
   struct KeyRowMetadata {
     /// Is row a varying-length binary, using offsets array to find a beginning of a row,
@@ -241,57 +219,6 @@ class KeyEncoder {
     mutable bool has_any_nulls_;
   };
 
-  /// A lightweight description of an array representing one of key columns.
-  class KeyColumnArray {
-   public:
-    KeyColumnArray() = default;
-    /// Create as a mix of buffers according to the mask from two descriptions
-    /// (Nth bit is set to 0 if Nth buffer from the first input
-    /// should be used and is set to 1 otherwise).
-    /// Metadata is inherited from the first input.
-    KeyColumnArray(const KeyColumnMetadata& metadata, const KeyColumnArray& left,
-                   const KeyColumnArray& right, int buffer_id_to_replace);
-    /// Create for reading
-    KeyColumnArray(const KeyColumnMetadata& metadata, int64_t length,
-                   const uint8_t* buffer0, const uint8_t* buffer1, const uint8_t* buffer2,
-                   int bit_offset0 = 0, int bit_offset1 = 0);
-    /// Create for writing
-    KeyColumnArray(const KeyColumnMetadata& metadata, int64_t length, uint8_t* buffer0,
-                   uint8_t* buffer1, uint8_t* buffer2, int bit_offset0 = 0,
-                   int bit_offset1 = 0);
-    /// Create as a window view of original description that is offset
-    /// by a given number of rows.
-    /// The number of rows used in offset must be divisible by 8
-    /// in order to not split bit vectors within a single byte.
-    KeyColumnArray(const KeyColumnArray& from, int64_t start, int64_t length);
-    uint8_t* mutable_data(int i) {
-      ARROW_DCHECK(i >= 0 && i <= max_buffers_);
-      return mutable_buffers_[i];
-    }
-    const uint8_t* data(int i) const {
-      ARROW_DCHECK(i >= 0 && i <= max_buffers_);
-      return buffers_[i];
-    }
-    uint32_t* mutable_offsets() { return reinterpret_cast<uint32_t*>(mutable_data(1)); }
-    const uint32_t* offsets() const { return reinterpret_cast<const uint32_t*>(data(1)); }
-    const KeyColumnMetadata& metadata() const { return metadata_; }
-    int64_t length() const { return length_; }
-    int bit_offset(int i) const {
-      ARROW_DCHECK(i >= 0 && i < max_buffers_);
-      return bit_offset_[i];
-    }
-
-   private:
-    static constexpr int max_buffers_ = 3;
-    const uint8_t* buffers_[max_buffers_];
-    uint8_t* mutable_buffers_[max_buffers_];
-    KeyColumnMetadata metadata_;
-    int64_t length_;
-    // Starting bit offset within the first byte (between 0 and 7)
-    // to be used when accessing buffers that store bit vectors.
-    int bit_offset_[max_buffers_ - 1];
-  };
-
   void Init(const std::vector<KeyColumnMetadata>& cols, KeyEncoderContext* ctx,
             int row_alignment, int string_alignment);
 
diff --git a/cpp/src/arrow/compute/exec/key_hash.cc b/cpp/src/arrow/compute/exec/key_hash.cc
index bc4cae74dd..125fd3912e 100644
--- a/cpp/src/arrow/compute/exec/key_hash.cc
+++ b/cpp/src/arrow/compute/exec/key_hash.cc
@@ -375,7 +375,7 @@ void Hashing32::HashFixed(int64_t hardware_flags, bool combine_hashes, uint32_t
   }
 }
 
-void Hashing32::HashMultiColumn(const std::vector<KeyEncoder::KeyColumnArray>& cols,
+void Hashing32::HashMultiColumn(const std::vector<KeyColumnArray>& cols,
                                 KeyEncoder::KeyEncoderContext* ctx, uint32_t* hashes) {
   uint32_t num_rows = static_cast<uint32_t>(cols[0].length());
 
@@ -799,7 +799,7 @@ void Hashing64::HashFixed(bool combine_hashes, uint32_t num_rows, uint64_t lengt
   }
 }
 
-void Hashing64::HashMultiColumn(const std::vector<KeyEncoder::KeyColumnArray>& cols,
+void Hashing64::HashMultiColumn(const std::vector<KeyColumnArray>& cols,
                                 KeyEncoder::KeyEncoderContext* ctx, uint64_t* hashes) {
   uint32_t num_rows = static_cast<uint32_t>(cols[0].length());
 
diff --git a/cpp/src/arrow/compute/exec/key_hash.h b/cpp/src/arrow/compute/exec/key_hash.h
index 88f77be1a4..719f3dfd46 100644
--- a/cpp/src/arrow/compute/exec/key_hash.h
+++ b/cpp/src/arrow/compute/exec/key_hash.h
@@ -45,7 +45,7 @@ class ARROW_EXPORT Hashing32 {
   friend void TestBloomSmall(BloomFilterBuildStrategy, int64_t, int, bool, bool);
 
  public:
-  static void HashMultiColumn(const std::vector<KeyEncoder::KeyColumnArray>& cols,
+  static void HashMultiColumn(const std::vector<KeyColumnArray>& cols,
                               KeyEncoder::KeyEncoderContext* ctx, uint32_t* out_hash);
 
  private:
@@ -153,7 +153,7 @@ class ARROW_EXPORT Hashing64 {
   friend void TestBloomSmall(BloomFilterBuildStrategy, int64_t, int, bool, bool);
 
  public:
-  static void HashMultiColumn(const std::vector<KeyEncoder::KeyColumnArray>& cols,
+  static void HashMultiColumn(const std::vector<KeyColumnArray>& cols,
                               KeyEncoder::KeyEncoderContext* ctx, uint64_t* hashes);
 
  private:
diff --git a/cpp/src/arrow/compute/kernels/hash_aggregate.cc b/cpp/src/arrow/compute/kernels/hash_aggregate.cc
index ab8e6cd77d..db34ee6c59 100644
--- a/cpp/src/arrow/compute/kernels/hash_aggregate.cc
+++ b/cpp/src/arrow/compute/kernels/hash_aggregate.cc
@@ -237,19 +237,18 @@ struct GrouperFastImpl : Grouper {
         auto bit_width = checked_cast<const FixedWidthType&>(*key).bit_width();
         ARROW_DCHECK(bit_width % 8 == 0);
         impl->col_metadata_[icol] =
-            arrow::compute::KeyEncoder::KeyColumnMetadata(true, bit_width / 8);
+            arrow::compute::KeyColumnMetadata(true, bit_width / 8);
       } else if (key->id() == Type::BOOL) {
-        impl->col_metadata_[icol] =
-            arrow::compute::KeyEncoder::KeyColumnMetadata(true, 0);
+        impl->col_metadata_[icol] = arrow::compute::KeyColumnMetadata(true, 0);
       } else if (is_fixed_width(key->id())) {
-        impl->col_metadata_[icol] = arrow::compute::KeyEncoder::KeyColumnMetadata(
+        impl->col_metadata_[icol] = arrow::compute::KeyColumnMetadata(
             true, checked_cast<const FixedWidthType&>(*key).bit_width() / 8);
       } else if (is_binary_like(key->id())) {
         impl->col_metadata_[icol] =
-            arrow::compute::KeyEncoder::KeyColumnMetadata(false, sizeof(uint32_t));
+            arrow::compute::KeyColumnMetadata(false, sizeof(uint32_t));
       } else if (key->id() == Type::NA) {
-        impl->col_metadata_[icol] = arrow::compute::KeyEncoder::KeyColumnMetadata(
-            true, 0, /*is_null_type_in=*/true);
+        impl->col_metadata_[icol] =
+            arrow::compute::KeyColumnMetadata(true, 0, /*is_null_type_in=*/true);
       } else {
         return Status::NotImplemented("Keys of type ", *key);
       }
@@ -352,11 +351,10 @@ struct GrouperFastImpl : Grouper {
 
       int64_t offset = batch[icol].array()->offset;
 
-      auto col_base = arrow::compute::KeyEncoder::KeyColumnArray(
+      auto col_base = arrow::compute::KeyColumnArray(
           col_metadata_[icol], offset + num_rows, non_nulls, fixedlen, varlen);
 
-      cols_[icol] =
-          arrow::compute::KeyEncoder::KeyColumnArray(col_base, offset, num_rows);
+      cols_[icol] = col_base.Slice(offset, num_rows);
     }
 
     // Split into smaller mini-batches
@@ -434,8 +432,8 @@ struct GrouperFastImpl : Grouper {
       if (col_metadata_[i].is_null_type) {
         uint8_t* non_nulls = NULLPTR;
         uint8_t* fixedlen = NULLPTR;
-        cols_[i] = arrow::compute::KeyEncoder::KeyColumnArray(
-            col_metadata_[i], num_groups, non_nulls, fixedlen, NULLPTR);
+        cols_[i] = arrow::compute::KeyColumnArray(col_metadata_[i], num_groups, non_nulls,
+                                                  fixedlen, NULLPTR);
         continue;
       }
       ARROW_ASSIGN_OR_RAISE(non_null_bufs[i], AllocatePaddedBitmap(num_groups));
@@ -451,7 +449,7 @@ struct GrouperFastImpl : Grouper {
         ARROW_ASSIGN_OR_RAISE(fixedlen_bufs[i],
                               AllocatePaddedBuffer((num_groups + 1) * sizeof(uint32_t)));
       }
-      cols_[i] = arrow::compute::KeyEncoder::KeyColumnArray(
+      cols_[i] = arrow::compute::KeyColumnArray(
           col_metadata_[i], num_groups, non_null_bufs[i]->mutable_data(),
           fixedlen_bufs[i]->mutable_data(), nullptr);
     }
@@ -470,7 +468,7 @@ struct GrouperFastImpl : Grouper {
           auto varlen_size =
               reinterpret_cast<const uint32_t*>(fixedlen_bufs[i]->data())[num_groups];
           ARROW_ASSIGN_OR_RAISE(varlen_bufs[i], AllocatePaddedBuffer(varlen_size));
-          cols_[i] = arrow::compute::KeyEncoder::KeyColumnArray(
+          cols_[i] = arrow::compute::KeyColumnArray(
               col_metadata_[i], num_groups, non_null_bufs[i]->mutable_data(),
               fixedlen_bufs[i]->mutable_data(), varlen_bufs[i]->mutable_data());
         }
@@ -534,8 +532,8 @@ struct GrouperFastImpl : Grouper {
   arrow::compute::KeyEncoder::KeyEncoderContext encode_ctx_;
 
   std::vector<std::shared_ptr<arrow::DataType>> key_types_;
-  std::vector<arrow::compute::KeyEncoder::KeyColumnMetadata> col_metadata_;
-  std::vector<arrow::compute::KeyEncoder::KeyColumnArray> cols_;
+  std::vector<arrow::compute::KeyColumnMetadata> col_metadata_;
+  std::vector<arrow::compute::KeyColumnArray> cols_;
   std::vector<uint32_t> minibatch_hashes_;
 
   std::vector<std::shared_ptr<Array>> dictionaries_;
diff --git a/cpp/src/arrow/compute/light_array.cc b/cpp/src/arrow/compute/light_array.cc
new file mode 100644
index 0000000000..390dcc6cbf
--- /dev/null
+++ b/cpp/src/arrow/compute/light_array.cc
@@ -0,0 +1,729 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/compute/light_array.h"
+
+#include <type_traits>
+
+#include "arrow/util/bitmap_ops.h"
+
+namespace arrow {
+namespace compute {
+
+KeyColumnArray::KeyColumnArray(const KeyColumnMetadata& metadata, int64_t length,
+                               const uint8_t* validity_buffer,
+                               const uint8_t* fixed_length_buffer,
+                               const uint8_t* var_length_buffer, int bit_offset_validity,
+                               int bit_offset_fixed) {
+  static_assert(std::is_pod<KeyColumnArray>::value,
+                "This class was intended to be a POD type");
+  metadata_ = metadata;
+  length_ = length;
+  buffers_[kValidityBuffer] = validity_buffer;
+  buffers_[kFixedLengthBuffer] = fixed_length_buffer;
+  buffers_[kVariableLengthBuffer] = var_length_buffer;
+  mutable_buffers_[kValidityBuffer] = mutable_buffers_[kFixedLengthBuffer] =
+      mutable_buffers_[kVariableLengthBuffer] = nullptr;
+  bit_offset_[kValidityBuffer] = bit_offset_validity;
+  bit_offset_[kFixedLengthBuffer] = bit_offset_fixed;
+}
+
+KeyColumnArray::KeyColumnArray(const KeyColumnMetadata& metadata, int64_t length,
+                               uint8_t* validity_buffer, uint8_t* fixed_length_buffer,
+                               uint8_t* var_length_buffer, int bit_offset_validity,
+                               int bit_offset_fixed) {
+  metadata_ = metadata;
+  length_ = length;
+  buffers_[kValidityBuffer] = mutable_buffers_[kValidityBuffer] = validity_buffer;
+  buffers_[kFixedLengthBuffer] = mutable_buffers_[kFixedLengthBuffer] =
+      fixed_length_buffer;
+  buffers_[kVariableLengthBuffer] = mutable_buffers_[kVariableLengthBuffer] =
+      var_length_buffer;
+  bit_offset_[kValidityBuffer] = bit_offset_validity;
+  bit_offset_[kFixedLengthBuffer] = bit_offset_fixed;
+}
+
+KeyColumnArray KeyColumnArray::WithBufferFrom(const KeyColumnArray& other,
+                                              int buffer_id_to_replace) const {
+  KeyColumnArray copy = *this;
+  copy.mutable_buffers_[buffer_id_to_replace] =
+      other.mutable_buffers_[buffer_id_to_replace];
+  copy.buffers_[buffer_id_to_replace] = other.buffers_[buffer_id_to_replace];
+  if (buffer_id_to_replace < kMaxBuffers - 1) {
+    copy.bit_offset_[buffer_id_to_replace] = other.bit_offset_[buffer_id_to_replace];
+  }
+  return copy;
+}
+
+KeyColumnArray KeyColumnArray::WithMetadata(const KeyColumnMetadata& metadata) const {
+  KeyColumnArray copy = *this;
+  copy.metadata_ = metadata;
+  return copy;
+}
+
+KeyColumnArray KeyColumnArray::Slice(int64_t offset, int64_t length) const {
+  KeyColumnArray sliced;
+  sliced.metadata_ = metadata_;
+  sliced.length_ = length;
+  uint32_t fixed_size =
+      !metadata_.is_fixed_length ? sizeof(uint32_t) : metadata_.fixed_length;
+
+  sliced.buffers_[0] =
+      buffers_[0] ? buffers_[0] + (bit_offset_[0] + offset) / 8 : nullptr;
+  sliced.mutable_buffers_[0] =
+      mutable_buffers_[0] ? mutable_buffers_[0] + (bit_offset_[0] + offset) / 8 : nullptr;
+  sliced.bit_offset_[0] = (bit_offset_[0] + offset) % 8;
+
+  if (fixed_size == 0 && !metadata_.is_null_type) {
+    sliced.buffers_[1] =
+        buffers_[1] ? buffers_[1] + (bit_offset_[1] + offset) / 8 : nullptr;
+    sliced.mutable_buffers_[1] = mutable_buffers_[1]
+                                     ? mutable_buffers_[1] + (bit_offset_[1] + offset) / 8
+                                     : nullptr;
+    sliced.bit_offset_[1] = (bit_offset_[1] + offset) % 8;
+  } else {
+    sliced.buffers_[1] = buffers_[1] ? buffers_[1] + offset * fixed_size : nullptr;
+    sliced.mutable_buffers_[1] =
+        mutable_buffers_[1] ? mutable_buffers_[1] + offset * fixed_size : nullptr;
+    sliced.bit_offset_[1] = 0;
+  }
+
+  sliced.buffers_[2] = buffers_[2];
+  sliced.mutable_buffers_[2] = mutable_buffers_[2];
+  return sliced;
+}
+
+Result<KeyColumnMetadata> ColumnMetadataFromDataType(
+    const std::shared_ptr<DataType>& type) {
+  if (type->id() == Type::DICTIONARY) {
+    auto bit_width =
+        arrow::internal::checked_cast<const FixedWidthType&>(*type).bit_width();
+    ARROW_DCHECK(bit_width % 8 == 0);
+    return KeyColumnMetadata(true, bit_width / 8);
+  }
+  if (type->id() == Type::BOOL) {
+    return KeyColumnMetadata(true, 0);
+  }
+  if (is_fixed_width(type->id())) {
+    return KeyColumnMetadata(
+        true,
+        arrow::internal::checked_cast<const FixedWidthType&>(*type).bit_width() / 8);
+  }
+  if (is_binary_like(type->id())) {
+    return KeyColumnMetadata(false, sizeof(uint32_t));
+  }
+  if (is_large_binary_like(type->id())) {
+    return KeyColumnMetadata(false, sizeof(uint64_t));
+  }
+  if (type->id() == Type::NA) {
+    return KeyColumnMetadata(true, 0, true);
+  }
+  // Caller attempted to create a KeyColumnArray from an invalid type
+  return Status::TypeError("Unsupported column data type ", type->name(),
+                           " used with KeyColumnMetadata");
+}
+
+Result<KeyColumnArray> ColumnArrayFromArrayData(
+    const std::shared_ptr<ArrayData>& array_data, int start_row, int num_rows) {
+  ARROW_ASSIGN_OR_RAISE(KeyColumnMetadata metadata,
+                        ColumnMetadataFromDataType(array_data->type));
+  KeyColumnArray column_array = KeyColumnArray(
+      metadata, array_data->offset + start_row + num_rows,
+      array_data->buffers[0] != NULLPTR ? array_data->buffers[0]->data() : nullptr,
+      array_data->buffers[1]->data(),
+      (array_data->buffers.size() > 2 && array_data->buffers[2] != NULLPTR)
+          ? array_data->buffers[2]->data()
+          : nullptr);
+  return column_array.Slice(array_data->offset + start_row, num_rows);
+}
+
+Status ColumnMetadatasFromExecBatch(const ExecBatch& batch,
+                                    std::vector<KeyColumnMetadata>* column_metadatas) {
+  int num_columns = static_cast<int>(batch.values.size());
+  column_metadatas->resize(num_columns);
+  for (int i = 0; i < num_columns; ++i) {
+    const Datum& data = batch.values[i];
+    ARROW_DCHECK(data.is_array());
+    const std::shared_ptr<ArrayData>& array_data = data.array();
+    ARROW_ASSIGN_OR_RAISE((*column_metadatas)[i],
+                          ColumnMetadataFromDataType(array_data->type));
+  }
+  return Status::OK();
+}
+
+Status ColumnArraysFromExecBatch(const ExecBatch& batch, int start_row, int num_rows,
+                                 std::vector<KeyColumnArray>* column_arrays) {
+  int num_columns = static_cast<int>(batch.values.size());
+  column_arrays->resize(num_columns);
+  for (int i = 0; i < num_columns; ++i) {
+    const Datum& data = batch.values[i];
+    ARROW_DCHECK(data.is_array());
+    const std::shared_ptr<ArrayData>& array_data = data.array();
+    ARROW_ASSIGN_OR_RAISE((*column_arrays)[i],
+                          ColumnArrayFromArrayData(array_data, start_row, num_rows));
+  }
+  return Status::OK();
+}
+
+Status ColumnArraysFromExecBatch(const ExecBatch& batch,
+                                 std::vector<KeyColumnArray>* column_arrays) {
+  return ColumnArraysFromExecBatch(batch, 0, static_cast<int>(batch.length),
+                                   column_arrays);
+}
+
+void ResizableArrayData::Init(const std::shared_ptr<DataType>& data_type,
+                              MemoryPool* pool, int log_num_rows_min) {
+#ifndef NDEBUG
+  if (num_rows_allocated_ > 0) {
+    ARROW_DCHECK(data_type_ != NULLPTR);
+    KeyColumnMetadata metadata_before =
+        ColumnMetadataFromDataType(data_type_).ValueOrDie();
+    KeyColumnMetadata metadata_after = ColumnMetadataFromDataType(data_type).ValueOrDie();
+    ARROW_DCHECK(metadata_before.is_fixed_length == metadata_after.is_fixed_length &&
+                 metadata_before.fixed_length == metadata_after.fixed_length);
+  }
+#endif
+  Clear(/*release_buffers=*/false);
+  log_num_rows_min_ = log_num_rows_min;
+  data_type_ = data_type;
+  pool_ = pool;
+}
+
+void ResizableArrayData::Clear(bool release_buffers) {
+  num_rows_ = 0;
+  if (release_buffers) {
+    buffers_[kValidityBuffer].reset();
+    buffers_[kFixedLengthBuffer].reset();
+    buffers_[kVariableLengthBuffer].reset();
+    num_rows_allocated_ = 0;
+    var_len_buf_size_ = 0;
+  }
+}
+
+Status ResizableArrayData::ResizeFixedLengthBuffers(int num_rows_new) {
+  ARROW_DCHECK(num_rows_new >= 0);
+  if (num_rows_new <= num_rows_allocated_) {
+    num_rows_ = num_rows_new;
+    return Status::OK();
+  }
+
+  int num_rows_allocated_new = 1 << log_num_rows_min_;
+  while (num_rows_allocated_new < num_rows_new) {
+    num_rows_allocated_new *= 2;
+  }
+
+  KeyColumnMetadata column_metadata = ColumnMetadataFromDataType(data_type_).ValueOrDie();
+
+  if (buffers_[kFixedLengthBuffer] == NULLPTR) {
+    ARROW_DCHECK(buffers_[kValidityBuffer] == NULLPTR &&
+                 buffers_[kVariableLengthBuffer] == NULLPTR);
+
+    ARROW_ASSIGN_OR_RAISE(
+        buffers_[kValidityBuffer],
+        AllocateResizableBuffer(
+            bit_util::BytesForBits(num_rows_allocated_new) + kNumPaddingBytes, pool_));
+    if (column_metadata.is_fixed_length) {
+      if (column_metadata.fixed_length == 0) {
+        ARROW_ASSIGN_OR_RAISE(
+            buffers_[kFixedLengthBuffer],
+            AllocateResizableBuffer(
+                bit_util::BytesForBits(num_rows_allocated_new) + kNumPaddingBytes,
+                pool_));
+      } else {
+        ARROW_ASSIGN_OR_RAISE(
+            buffers_[kFixedLengthBuffer],
+            AllocateResizableBuffer(
+                num_rows_allocated_new * column_metadata.fixed_length + kNumPaddingBytes,
+                pool_));
+      }
+    } else {
+      ARROW_ASSIGN_OR_RAISE(
+          buffers_[kFixedLengthBuffer],
+          AllocateResizableBuffer(
+              (num_rows_allocated_new + 1) * sizeof(uint32_t) + kNumPaddingBytes, pool_));
+    }
+
+    ARROW_ASSIGN_OR_RAISE(
+        buffers_[kVariableLengthBuffer],
+        AllocateResizableBuffer(sizeof(uint64_t) + kNumPaddingBytes, pool_));
+
+    var_len_buf_size_ = sizeof(uint64_t);
+  } else {
+    ARROW_DCHECK(buffers_[kValidityBuffer] != NULLPTR &&
+                 buffers_[kVariableLengthBuffer] != NULLPTR);
+
+    RETURN_NOT_OK(buffers_[kValidityBuffer]->Resize(
+        bit_util::BytesForBits(num_rows_allocated_new) + kNumPaddingBytes));
+
+    if (column_metadata.is_fixed_length) {
+      if (column_metadata.fixed_length == 0) {
+        RETURN_NOT_OK(buffers_[kFixedLengthBuffer]->Resize(
+            bit_util::BytesForBits(num_rows_allocated_new) + kNumPaddingBytes));
+      } else {
+        RETURN_NOT_OK(buffers_[kFixedLengthBuffer]->Resize(
+            num_rows_allocated_new * column_metadata.fixed_length + kNumPaddingBytes));
+      }
+    } else {
+      RETURN_NOT_OK(buffers_[kFixedLengthBuffer]->Resize(
+          (num_rows_allocated_new + 1) * sizeof(uint32_t) + kNumPaddingBytes));
+    }
+  }
+
+  num_rows_allocated_ = num_rows_allocated_new;
+  num_rows_ = num_rows_new;
+
+  return Status::OK();
+}
+
+Status ResizableArrayData::ResizeVaryingLengthBuffer() {
+  KeyColumnMetadata column_metadata;
+  column_metadata = ColumnMetadataFromDataType(data_type_).ValueOrDie();
+
+  if (!column_metadata.is_fixed_length) {
+    int min_new_size = static_cast<int>(reinterpret_cast<const uint32_t*>(
+        buffers_[kFixedLengthBuffer]->data())[num_rows_]);
+    ARROW_DCHECK(var_len_buf_size_ > 0);
+    if (var_len_buf_size_ < min_new_size) {
+      int new_size = var_len_buf_size_;
+      while (new_size < min_new_size) {
+        new_size *= 2;
+      }
+      RETURN_NOT_OK(buffers_[kVariableLengthBuffer]->Resize(new_size + kNumPaddingBytes));
+      var_len_buf_size_ = new_size;
+    }
+  }
+
+  return Status::OK();
+}
+
+KeyColumnArray ResizableArrayData::column_array() const {
+  KeyColumnMetadata column_metadata;
+  column_metadata = ColumnMetadataFromDataType(data_type_).ValueOrDie();
+  return KeyColumnArray(column_metadata, num_rows_,
+                        buffers_[kValidityBuffer]->mutable_data(),
+                        buffers_[kFixedLengthBuffer]->mutable_data(),
+                        buffers_[kVariableLengthBuffer]->mutable_data());
+}
+
+std::shared_ptr<ArrayData> ResizableArrayData::array_data() const {
+  KeyColumnMetadata column_metadata;
+  column_metadata = ColumnMetadataFromDataType(data_type_).ValueOrDie();
+
+  auto valid_count = arrow::internal::CountSetBits(
+      buffers_[kValidityBuffer]->data(), /*offset=*/0, static_cast<int64_t>(num_rows_));
+  int null_count = static_cast<int>(num_rows_) - static_cast<int>(valid_count);
+
+  if (column_metadata.is_fixed_length) {
+    return ArrayData::Make(data_type_, num_rows_,
+                           {buffers_[kValidityBuffer], buffers_[kFixedLengthBuffer]},
+                           null_count);
+  } else {
+    return ArrayData::Make(data_type_, num_rows_,
+                           {buffers_[kValidityBuffer], buffers_[kFixedLengthBuffer],
+                            buffers_[kVariableLengthBuffer]},
+                           null_count);
+  }
+}
+
+int ExecBatchBuilder::NumRowsToSkip(const std::shared_ptr<ArrayData>& column,
+                                    int num_rows, const uint16_t* row_ids,
+                                    int num_tail_bytes_to_skip) {
+#ifndef NDEBUG
+  // Ids must be in non-decreasing order
+  //
+  for (int i = 1; i < num_rows; ++i) {
+    ARROW_DCHECK(row_ids[i] >= row_ids[i - 1]);
+  }
+#endif
+
+  KeyColumnMetadata column_metadata =
+      ColumnMetadataFromDataType(column->type).ValueOrDie();
+
+  int num_rows_left = num_rows;
+  int num_bytes_skipped = 0;
+  while (num_rows_left > 0 && num_bytes_skipped < num_tail_bytes_to_skip) {
+    if (column_metadata.is_fixed_length) {
+      if (column_metadata.fixed_length == 0) {
+        num_rows_left = std::max(num_rows_left, 8) - 8;
+        ++num_bytes_skipped;
+      } else {
+        --num_rows_left;
+        num_bytes_skipped += column_metadata.fixed_length;
+      }
+    } else {
+      --num_rows_left;
+      int row_id_removed = row_ids[num_rows_left];
+      const uint32_t* offsets =
+          reinterpret_cast<const uint32_t*>(column->buffers[1]->data());
+      num_bytes_skipped += offsets[row_id_removed + 1] - offsets[row_id_removed];
+    }
+  }
+
+  return num_rows - num_rows_left;
+}
+
+template <bool OUTPUT_BYTE_ALIGNED>
+void ExecBatchBuilder::CollectBitsImp(const uint8_t* input_bits,
+                                      int64_t input_bits_offset, uint8_t* output_bits,
+                                      int64_t output_bits_offset, int num_rows,
+                                      const uint16_t* row_ids) {
+  if (!OUTPUT_BYTE_ALIGNED) {
+    ARROW_DCHECK(output_bits_offset % 8 > 0);
+    output_bits[output_bits_offset / 8] &=
+        static_cast<uint8_t>((1 << (output_bits_offset % 8)) - 1);
+  } else {
+    ARROW_DCHECK(output_bits_offset % 8 == 0);
+  }
+  constexpr int unroll = 8;
+  for (int i = 0; i < num_rows / unroll; ++i) {
+    const uint16_t* row_ids_base = row_ids + unroll * i;
+    uint8_t result;
+    result = bit_util::GetBit(input_bits, input_bits_offset + row_ids_base[0]) ? 1 : 0;
+    result |= bit_util::GetBit(input_bits, input_bits_offset + row_ids_base[1]) ? 2 : 0;
+    result |= bit_util::GetBit(input_bits, input_bits_offset + row_ids_base[2]) ? 4 : 0;
+    result |= bit_util::GetBit(input_bits, input_bits_offset + row_ids_base[3]) ? 8 : 0;
+    result |= bit_util::GetBit(input_bits, input_bits_offset + row_ids_base[4]) ? 16 : 0;
+    result |= bit_util::GetBit(input_bits, input_bits_offset + row_ids_base[5]) ? 32 : 0;
+    result |= bit_util::GetBit(input_bits, input_bits_offset + row_ids_base[6]) ? 64 : 0;
+    result |= bit_util::GetBit(input_bits, input_bits_offset + row_ids_base[7]) ? 128 : 0;
+    if (OUTPUT_BYTE_ALIGNED) {
+      output_bits[output_bits_offset / 8 + i] = result;
+    } else {
+      output_bits[output_bits_offset / 8 + i] |=
+          static_cast<uint8_t>(result << (output_bits_offset % 8));
+      output_bits[output_bits_offset / 8 + i + 1] =
+          static_cast<uint8_t>(result >> (8 - (output_bits_offset % 8)));
+    }
+  }
+  if (num_rows % unroll > 0) {
+    for (int i = num_rows - (num_rows % unroll); i < num_rows; ++i) {
+      bit_util::SetBitTo(output_bits, output_bits_offset + i,
+                         bit_util::GetBit(input_bits, input_bits_offset + row_ids[i]));
+    }
+  }
+}
+
+void ExecBatchBuilder::CollectBits(const uint8_t* input_bits, int64_t input_bits_offset,
+                                   uint8_t* output_bits, int64_t output_bits_offset,
+                                   int num_rows, const uint16_t* row_ids) {
+  if (output_bits_offset % 8 > 0) {
+    CollectBitsImp<false>(input_bits, input_bits_offset, output_bits, output_bits_offset,
+                          num_rows, row_ids);
+  } else {
+    CollectBitsImp<true>(input_bits, input_bits_offset, output_bits, output_bits_offset,
+                         num_rows, row_ids);
+  }
+}
+
+template <class PROCESS_VALUE_FN>
+void ExecBatchBuilder::Visit(const std::shared_ptr<ArrayData>& column, int num_rows,
+                             const uint16_t* row_ids, PROCESS_VALUE_FN process_value_fn) {
+  KeyColumnMetadata metadata = ColumnMetadataFromDataType(column->type).ValueOrDie();
+
+  if (!metadata.is_fixed_length) {
+    const uint8_t* ptr_base = column->buffers[2]->data();
+    const uint32_t* offsets =
+        reinterpret_cast<const uint32_t*>(column->buffers[1]->data()) + column->offset;
+    for (int i = 0; i < num_rows; ++i) {
+      uint16_t row_id = row_ids[i];
+      const uint8_t* field_ptr = ptr_base + offsets[row_id];
+      uint32_t field_length = offsets[row_id + 1] - offsets[row_id];
+      process_value_fn(i, field_ptr, field_length);
+    }
+  } else {
+    ARROW_DCHECK(metadata.fixed_length > 0);
+    for (int i = 0; i < num_rows; ++i) {
+      uint16_t row_id = row_ids[i];
+      const uint8_t* field_ptr =
+          column->buffers[1]->data() +
+          (column->offset + row_id) * static_cast<int64_t>(metadata.fixed_length);
+      process_value_fn(i, field_ptr, metadata.fixed_length);
+    }
+  }
+}
+
+Status ExecBatchBuilder::AppendSelected(const std::shared_ptr<ArrayData>& source,
+                                        ResizableArrayData* target,
+                                        int num_rows_to_append, const uint16_t* row_ids,
+                                        MemoryPool* pool) {
+  int num_rows_before = target->num_rows();
+  ARROW_DCHECK(num_rows_before >= 0);
+  int num_rows_after = num_rows_before + num_rows_to_append;
+  if (target->num_rows() == 0) {
+    target->Init(source->type, pool, kLogNumRows);
+  }
+  RETURN_NOT_OK(target->ResizeFixedLengthBuffers(num_rows_after));
+
+  KeyColumnMetadata column_metadata =
+      ColumnMetadataFromDataType(source->type).ValueOrDie();
+
+  if (column_metadata.is_fixed_length) {
+    // Fixed length column
+    //
+    uint32_t fixed_length = column_metadata.fixed_length;
+    switch (fixed_length) {
+      case 0:
+        CollectBits(source->buffers[1]->data(), source->offset, target->mutable_data(1),
+                    num_rows_before, num_rows_to_append, row_ids);
+        break;
+      case 1:
+        Visit(source, num_rows_to_append, row_ids,
+              [&](int i, const uint8_t* ptr, uint32_t num_bytes) {
+                target->mutable_data(1)[num_rows_before + i] = *ptr;
+              });
+        break;
+      case 2:
+        Visit(
+            source, num_rows_to_append, row_ids,
+            [&](int i, const uint8_t* ptr, uint32_t num_bytes) {
+              reinterpret_cast<uint16_t*>(target->mutable_data(1))[num_rows_before + i] =
+                  *reinterpret_cast<const uint16_t*>(ptr);
+            });
+        break;
+      case 4:
+        Visit(
+            source, num_rows_to_append, row_ids,
+            [&](int i, const uint8_t* ptr, uint32_t num_bytes) {
+              reinterpret_cast<uint32_t*>(target->mutable_data(1))[num_rows_before + i] =
+                  *reinterpret_cast<const uint32_t*>(ptr);
+            });
+        break;
+      case 8:
+        Visit(
+            source, num_rows_to_append, row_ids,
+            [&](int i, const uint8_t* ptr, uint32_t num_bytes) {
+              reinterpret_cast<uint64_t*>(target->mutable_data(1))[num_rows_before + i] =
+                  *reinterpret_cast<const uint64_t*>(ptr);
+            });
+        break;
+      default: {
+        int num_rows_to_process =
+            num_rows_to_append -
+            NumRowsToSkip(source, num_rows_to_append, row_ids, sizeof(uint64_t));
+        Visit(source, num_rows_to_process, row_ids,
+              [&](int i, const uint8_t* ptr, uint32_t num_bytes) {
+                uint64_t* dst = reinterpret_cast<uint64_t*>(
+                    target->mutable_data(1) +
+                    static_cast<int64_t>(num_bytes) * (num_rows_before + i));
+                const uint64_t* src = reinterpret_cast<const uint64_t*>(ptr);
+                for (uint32_t word_id = 0;
+                     word_id < bit_util::CeilDiv(num_bytes, sizeof(uint64_t));
+                     ++word_id) {
+                  util::SafeStore<uint64_t>(dst + word_id, util::SafeLoad(src + word_id));
+                }
+              });
+        if (num_rows_to_append > num_rows_to_process) {
+          Visit(source, num_rows_to_append - num_rows_to_process,
+                row_ids + num_rows_to_process,
+                [&](int i, const uint8_t* ptr, uint32_t num_bytes) {
+                  uint64_t* dst = reinterpret_cast<uint64_t*>(
+                      target->mutable_data(1) +
+                      static_cast<int64_t>(num_bytes) *
+                          (num_rows_before + num_rows_to_process + i));
+                  const uint64_t* src = reinterpret_cast<const uint64_t*>(ptr);
+                  memcpy(dst, src, num_bytes);
+                });
+        }
+      }
+    }
+  } else {
+    // Varying length column
+    //
+
+    // Step 1: calculate target offsets
+    //
+    uint32_t* offsets = reinterpret_cast<uint32_t*>(target->mutable_data(1));
+    uint32_t sum = num_rows_before == 0 ? 0 : offsets[num_rows_before];
+    Visit(source, num_rows_to_append, row_ids,
+          [&](int i, const uint8_t* ptr, uint32_t num_bytes) {
+            offsets[num_rows_before + i] = num_bytes;
+          });
+    for (int i = 0; i < num_rows_to_append; ++i) {
+      uint32_t length = offsets[num_rows_before + i];
+      offsets[num_rows_before + i] = sum;
+      sum += length;
+    }
+    offsets[num_rows_before + num_rows_to_append] = sum;
+
+    // Step 2: resize output buffers
+    //
+    RETURN_NOT_OK(target->ResizeVaryingLengthBuffer());
+
+    // Step 3: copy varying-length data
+    //
+    int num_rows_to_process =
+        num_rows_to_append -
+        NumRowsToSkip(source, num_rows_to_append, row_ids, sizeof(uint64_t));
+    Visit(source, num_rows_to_process, row_ids,
+          [&](int i, const uint8_t* ptr, uint32_t num_bytes) {
+            uint64_t* dst = reinterpret_cast<uint64_t*>(target->mutable_data(2) +
+                                                        offsets[num_rows_before + i]);
+            const uint64_t* src = reinterpret_cast<const uint64_t*>(ptr);
+            for (uint32_t word_id = 0;
+                 word_id < bit_util::CeilDiv(num_bytes, sizeof(uint64_t)); ++word_id) {
+              util::SafeStore<uint64_t>(dst + word_id, util::SafeLoad(src + word_id));
+            }
+          });
+    Visit(source, num_rows_to_append - num_rows_to_process, row_ids + num_rows_to_process,
+          [&](int i, const uint8_t* ptr, uint32_t num_bytes) {
+            uint64_t* dst = reinterpret_cast<uint64_t*>(
+                target->mutable_data(2) +
+                offsets[num_rows_before + num_rows_to_process + i]);
+            const uint64_t* src = reinterpret_cast<const uint64_t*>(ptr);
+            memcpy(dst, src, num_bytes);
+          });
+  }
+
+  // Process nulls
+  //
+  if (source->buffers[0] == NULLPTR) {
+    uint8_t* dst = target->mutable_data(0);
+    dst[num_rows_before / 8] |= static_cast<uint8_t>(~0ULL << (num_rows_before & 7));
+    for (int i = num_rows_before / 8 + 1;
+         i < bit_util::BytesForBits(num_rows_before + num_rows_to_append); ++i) {
+      dst[i] = 0xff;
+    }
+  } else {
+    CollectBits(source->buffers[0]->data(), source->offset, target->mutable_data(0),
+                num_rows_before, num_rows_to_append, row_ids);
+  }
+
+  return Status::OK();
+}
+
+Status ExecBatchBuilder::AppendNulls(const std::shared_ptr<DataType>& type,
+                                     ResizableArrayData& target, int num_rows_to_append,
+                                     MemoryPool* pool) {
+  int num_rows_before = target.num_rows();
+  int num_rows_after = num_rows_before + num_rows_to_append;
+  if (target.num_rows() == 0) {
+    target.Init(type, pool, kLogNumRows);
+  }
+  RETURN_NOT_OK(target.ResizeFixedLengthBuffers(num_rows_after));
+
+  KeyColumnMetadata column_metadata = ColumnMetadataFromDataType(type).ValueOrDie();
+
+  // Process fixed length buffer
+  //
+  if (column_metadata.is_fixed_length) {
+    uint8_t* dst = target.mutable_data(1);
+    if (column_metadata.fixed_length == 0) {
+      dst[num_rows_before / 8] &= static_cast<uint8_t>((1 << (num_rows_before % 8)) - 1);
+      int64_t offset_begin = num_rows_before / 8 + 1;
+      int64_t offset_end = bit_util::BytesForBits(num_rows_after);
+      if (offset_end > offset_begin) {
+        memset(dst + offset_begin, 0, offset_end - offset_begin);
+      }
+    } else {
+      memset(dst + num_rows_before * static_cast<int64_t>(column_metadata.fixed_length),
+             0, static_cast<int64_t>(column_metadata.fixed_length) * num_rows_to_append);
+    }
+  } else {
+    uint32_t* dst = reinterpret_cast<uint32_t*>(target.mutable_data(1));
+    uint32_t sum = num_rows_before == 0 ? 0 : dst[num_rows_before];
+    for (int64_t i = num_rows_before; i <= num_rows_after; ++i) {
+      dst[i] = sum;
+    }
+  }
+
+  // Process nulls
+  //
+  uint8_t* dst = target.mutable_data(0);
+  dst[num_rows_before / 8] &= static_cast<uint8_t>((1 << (num_rows_before % 8)) - 1);
+  int64_t offset_begin = num_rows_before / 8 + 1;
+  int64_t offset_end = bit_util::BytesForBits(num_rows_after);
+  if (offset_end > offset_begin) {
+    memset(dst + offset_begin, 0, offset_end - offset_begin);
+  }
+
+  return Status::OK();
+}
+
+Status ExecBatchBuilder::AppendSelected(MemoryPool* pool, const ExecBatch& batch,
+                                        int num_rows_to_append, const uint16_t* row_ids,
+                                        int num_cols, const int* col_ids) {
+  if (num_rows_to_append == 0) {
+    return Status::OK();
+  }
+
+  if (num_rows() + num_rows_to_append > num_rows_max()) {
+    return Status::CapacityError("ExecBatch builder exceeded limit of accumulated rows");
+  }
+
+  // If this is the first time we append rows, then initialize output buffers.
+  //
+  if (values_.empty()) {
+    values_.resize(num_cols);
+    for (int i = 0; i < num_cols; ++i) {
+      const Datum& data = batch.values[col_ids ? col_ids[i] : i];
+      ARROW_DCHECK(data.is_array());
+      const std::shared_ptr<ArrayData>& array_data = data.array();
+      values_[i].Init(array_data->type, pool, kLogNumRows);
+    }
+  }
+
+  for (size_t i = 0; i < values_.size(); ++i) {
+    const Datum& data = batch.values[col_ids ? col_ids[i] : i];
+    ARROW_DCHECK(data.is_array());
+    const std::shared_ptr<ArrayData>& array_data = data.array();
+    RETURN_NOT_OK(
+        AppendSelected(array_data, &values_[i], num_rows_to_append, row_ids, pool));
+  }
+
+  return Status::OK();
+}
+
+Status ExecBatchBuilder::AppendNulls(MemoryPool* pool,
+                                     const std::vector<std::shared_ptr<DataType>>& types,
+                                     int num_rows_to_append) {
+  if (num_rows_to_append == 0) {
+    return Status::OK();
+  }
+
+  if (num_rows() + num_rows_to_append > num_rows_max()) {
+    return Status::CapacityError("ExecBatch builder exceeded limit of accumulated rows.");
+  }
+
+  // If this is the first time we append rows, then initialize output buffers.
+  //
+  if (values_.empty()) {
+    values_.resize(types.size());
+    for (size_t i = 0; i < types.size(); ++i) {
+      values_[i].Init(types[i], pool, kLogNumRows);
+    }
+  }
+
+  for (size_t i = 0; i < values_.size(); ++i) {
+    RETURN_NOT_OK(AppendNulls(types[i], values_[i], num_rows_to_append, pool));
+  }
+
+  return Status::OK();
+}
+
+ExecBatch ExecBatchBuilder::Flush() {
+  ARROW_DCHECK(num_rows() > 0);
+  ExecBatch out({}, num_rows());
+  out.values.resize(values_.size());
+  for (size_t i = 0; i < values_.size(); ++i) {
+    out.values[i] = values_[i].array_data();
+    values_[i].Clear(true);
+  }
+  return out;
+}
+
+}  // namespace compute
+}  // namespace arrow
diff --git a/cpp/src/arrow/compute/light_array.h b/cpp/src/arrow/compute/light_array.h
new file mode 100644
index 0000000000..0856e3e8aa
--- /dev/null
+++ b/cpp/src/arrow/compute/light_array.h
@@ -0,0 +1,382 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+
+#include "arrow/array.h"
+#include "arrow/compute/exec.h"
+#include "arrow/type.h"
+#include "arrow/util/logging.h"
+
+/// This file contains lightweight containers for Arrow buffers.  These containers
+/// make compromises in terms of strong ownership and the range of data types supported
+/// in order to gain performance and reduced overhead.
+
+namespace arrow {
+namespace compute {
+
+/// \brief Description of the layout of a "key" column
+///
+/// A "key" column is a non-nested, non-union column.
+/// Every key column has either 0 (null), 2 (e.g. int32) or 3 (e.g. string) buffers
+/// and no children.
+///
+/// This metadata object is a zero-allocation analogue of arrow::DataType
+struct ARROW_EXPORT KeyColumnMetadata {
+  /// \brief Create metadata without setting any field
+  ///
+  /// None of the members has a default initializer, so a default-initialized
+  /// instance holds indeterminate values until it is assigned.
+  KeyColumnMetadata() = default;
+  /// \brief Create metadata describing a column
+  /// \param is_fixed_length_in value stored in `is_fixed_length`
+  /// \param fixed_length_in value stored in `fixed_length`
+  /// \param is_null_type_in value stored in `is_null_type` (defaults to false)
+  KeyColumnMetadata(bool is_fixed_length_in, uint32_t fixed_length_in,
+                    bool is_null_type_in = false)
+      : is_fixed_length(is_fixed_length_in),
+        is_null_type(is_null_type_in),
+        fixed_length(fixed_length_in) {}
+  /// \brief True if the column is not a varying-length binary type
+  ///
+  /// If this is true the column will have a validity buffer and
+  /// a data buffer and the third buffer will be unused.
+  bool is_fixed_length;
+  /// \brief True if this column is the null type
+  bool is_null_type;
+  /// \brief The number of bytes for each item
+  ///
+  /// Zero has a special meaning, indicating a bit vector with one bit per value if it
+  /// isn't a null type column.
+  ///
+  /// For a varying-length binary column this represents the number of bytes per offset.
+  uint32_t fixed_length;
+};
+
+/// \brief A lightweight view into a "key" array
+///
+/// A "key" column is a non-nested, non-union column \see KeyColumnMetadata
+///
+/// This metadata object is a zero-allocation analogue of arrow::ArrayData
+class ARROW_EXPORT KeyColumnArray {
+ public:
+  /// \brief Create an uninitialized KeyColumnArray
+  KeyColumnArray() = default;
+  /// \brief Create a read-only view from buffers
+  ///
+  /// This is a view only and does not take ownership of the buffers.  The lifetime
+  /// of the buffers must exceed the lifetime of this view
+  KeyColumnArray(const KeyColumnMetadata& metadata, int64_t length,
+                 const uint8_t* validity_buffer, const uint8_t* fixed_length_buffer,
+                 const uint8_t* var_length_buffer, int bit_offset_validity = 0,
+                 int bit_offset_fixed = 0);
+  /// \brief Create a mutable view from buffers
+  ///
+  /// This is a view only and does not take ownership of the buffers.  The lifetime
+  /// of the buffers must exceed the lifetime of this view
+  KeyColumnArray(const KeyColumnMetadata& metadata, int64_t length,
+                 uint8_t* validity_buffer, uint8_t* fixed_length_buffer,
+                 uint8_t* var_length_buffer, int bit_offset_validity = 0,
+                 int bit_offset_fixed = 0);
+  /// \brief Create a sliced view of `this`
+  ///
+  /// The number of rows used in offset must be divisible by 8
+  /// in order to not split bit vectors within a single byte.
+  KeyColumnArray Slice(int64_t offset, int64_t length) const;
+  /// \brief Create a copy of `this` with a buffer from `other`
+  ///
+  /// The copy will be identical to `this` except the buffer at buffer_id_to_replace
+  /// will be replaced by the corresponding buffer in `other`.
+  KeyColumnArray WithBufferFrom(const KeyColumnArray& other,
+                                int buffer_id_to_replace) const;
+
+  /// \brief Create a copy of `this` with new metadata
+  KeyColumnArray WithMetadata(const KeyColumnMetadata& metadata) const;
+
+  // Constants used for accessing buffers using data() and mutable_data().
+  static constexpr int kValidityBuffer = 0;
+  static constexpr int kFixedLengthBuffer = 1;
+  static constexpr int kVariableLengthBuffer = 2;
+
+  /// \brief Return one of the underlying mutable buffers
+  ///
+  /// \param i buffer index: kValidityBuffer, kFixedLengthBuffer or
+  ///          kVariableLengthBuffer
+  uint8_t* mutable_data(int i) {
+    // Valid indices are [0, kMaxBuffers); kMaxBuffers itself is one past the end.
+    ARROW_DCHECK(i >= 0 && i < kMaxBuffers);
+    return mutable_buffers_[i];
+  }
+  /// \brief Return one of the underlying read-only buffers
+  ///
+  /// \param i buffer index: kValidityBuffer, kFixedLengthBuffer or
+  ///          kVariableLengthBuffer
+  const uint8_t* data(int i) const {
+    // Valid indices are [0, kMaxBuffers); kMaxBuffers itself is one past the end.
+    ARROW_DCHECK(i >= 0 && i < kMaxBuffers);
+    return buffers_[i];
+  }
+  /// \brief Return a mutable version of the offsets buffer
+  ///
+  /// Only valid if this is a view into a varbinary type
+  uint32_t* mutable_offsets() {
+    ARROW_DCHECK(!metadata_.is_fixed_length);
+    return reinterpret_cast<uint32_t*>(mutable_data(kFixedLengthBuffer));
+  }
+  /// \brief Return a read-only version of the offsets buffer
+  ///
+  /// Only valid if this is a view into a varbinary type
+  const uint32_t* offsets() const {
+    ARROW_DCHECK(!metadata_.is_fixed_length);
+    return reinterpret_cast<const uint32_t*>(data(kFixedLengthBuffer));
+  }
+  /// \brief Return the type metadata
+  const KeyColumnMetadata& metadata() const { return metadata_; }
+  /// \brief Return the length (in rows) of the array
+  int64_t length() const { return length_; }
+  /// \brief Return the bit offset into the corresponding vector
+  ///
+  /// Only the validity buffer (0) and the fixed-length buffer (1) carry a bit
+  /// offset; if i == 1 then this must be a bool array
+  int bit_offset(int i) const {
+    // bit_offset_ has one entry fewer than there are buffers: the variable
+    // length buffer never stores a bit vector.
+    ARROW_DCHECK(i >= 0 && i < kMaxBuffers - 1);
+    return bit_offset_[i];
+  }
+
+ private:
+  static constexpr int kMaxBuffers = 3;
+  const uint8_t* buffers_[kMaxBuffers];
+  uint8_t* mutable_buffers_[kMaxBuffers];
+  KeyColumnMetadata metadata_;
+  int64_t length_;
+  // Starting bit offset within the first byte (between 0 and 7)
+  // to be used when accessing buffers that store bit vectors.
+  int bit_offset_[kMaxBuffers - 1];
+};
+
+/// \brief Create KeyColumnMetadata from a DataType
+///
+/// If `type` is a dictionary type then this will return the KeyColumnMetadata for
+/// the indices type
+///
+/// This should only be called on "key" columns.  Calling this with
+/// a non-key column will return Status::TypeError.
+///
+/// \param type the data type to describe
+/// \return the layout description for `type`, or an error for a non-key type
+ARROW_EXPORT Result<KeyColumnMetadata> ColumnMetadataFromDataType(
+    const std::shared_ptr<DataType>& type);
+
+/// \brief Create KeyColumnArray from ArrayData
+///
+/// If the type of `array_data` is a dictionary type then this will return the
+/// KeyColumnArray for the indices array
+///
+/// The caller should ensure this is only called on "key" columns.
+/// \see ColumnMetadataFromDataType for details
+///
+/// \param array_data the array to create a view of
+/// \param start_row index of the first row to include in the view
+/// \param num_rows number of rows to include in the view
+ARROW_EXPORT Result<KeyColumnArray> ColumnArrayFromArrayData(
+    const std::shared_ptr<ArrayData>& array_data, int start_row, int num_rows);
+
+/// \brief Create KeyColumnMetadata instances from an ExecBatch
+///
+/// column_metadatas will be resized to fit
+///
+/// All columns in `batch` must be eligible "key" columns and have an array shape
+/// \see ColumnMetadataFromDataType for more details
+///
+/// \param batch the batch whose columns are described
+/// \param[out] column_metadatas receives the metadata of the columns of `batch`
+ARROW_EXPORT Status ColumnMetadatasFromExecBatch(
+    const ExecBatch& batch, std::vector<KeyColumnMetadata>* column_metadatas);
+
+/// \brief Create KeyColumnArray instances from a slice of an ExecBatch
+///
+/// column_arrays will be resized to fit
+///
+/// All columns in `batch` must be eligible "key" columns and have an array shape
+/// \see ColumnArrayFromArrayData for more details
+///
+/// \param batch the batch to create views of
+/// \param start_row index of the first row of the slice
+/// \param num_rows number of rows in the slice
+/// \param[out] column_arrays receives the views of the columns of `batch`
+ARROW_EXPORT Status ColumnArraysFromExecBatch(const ExecBatch& batch, int start_row,
+                                              int num_rows,
+                                              std::vector<KeyColumnArray>* column_arrays);
+
+/// \brief Create KeyColumnArray instances from an ExecBatch
+///
+/// column_arrays will be resized to fit
+///
+/// All columns in `batch` must be eligible "key" columns and have an array shape
+/// \see ColumnArrayFromArrayData for more details
+///
+/// \param batch the batch to create views of
+/// \param[out] column_arrays receives the views of the columns of `batch`
+ARROW_EXPORT Status ColumnArraysFromExecBatch(const ExecBatch& batch,
+                                              std::vector<KeyColumnArray>* column_arrays);
+
+/// \brief A lightweight resizable array for "key" columns
+///
+/// Unlike KeyColumnArray this instance owns its buffers
+///
+/// Resizing is handled by arrow::ResizableBuffer and a doubling approach is
+/// used so that resizes will always grow up to the next power of 2
+class ARROW_EXPORT ResizableArrayData {
+ public:
+  /// \brief Create an uninitialized instance
+  ///
+  /// Init must be called before calling any other operations
+  ResizableArrayData()
+      : log_num_rows_min_(0),
+        pool_(NULLPTR),
+        num_rows_(0),
+        num_rows_allocated_(0),
+        var_len_buf_size_(0) {}
+
+  /// \brief Clear the array, asking for its buffers to be released
+  ~ResizableArrayData() { Clear(true); }
+
+  /// \brief Initialize the array
+  /// \param data_type The data type this array is holding data for.
+  /// \param pool The pool to make allocations on
+  /// \param log_num_rows_min All resize operations will allocate at least enough
+  ///                         space for (1 << log_num_rows_min) rows
+  void Init(const std::shared_ptr<DataType>& data_type, MemoryPool* pool,
+            int log_num_rows_min);
+
+  /// \brief Resets the array back to an empty state
+  /// \param release_buffers If true then allocated memory is released and the
+  ///                        next resize operation will have to reallocate memory
+  void Clear(bool release_buffers);
+
+  /// \brief Resize the fixed length buffers
+  ///
+  /// The buffers will be resized to hold at least `num_rows_new` rows of data
+  Status ResizeFixedLengthBuffers(int num_rows_new);
+
+  /// \brief Resize the varying length buffer if this array is a variable binary type
+  ///
+  /// This must be called after offsets have been populated and the buffer will be
+  /// resized to hold at least as much data as the offsets require
+  ///
+  /// Does nothing if the array is not a variable binary type
+  Status ResizeVaryingLengthBuffer();
+
+  /// \brief The current length (in rows) of the array
+  int num_rows() const { return num_rows_; }
+
+  /// \brief A non-owning view into this array
+  KeyColumnArray column_array() const;
+
+  /// \brief A lightweight descriptor of the data held by this array
+  ///
+  /// Computed from the stored data type on every call.
+  Result<KeyColumnMetadata> column_metadata() const {
+    return ColumnMetadataFromDataType(data_type_);
+  }
+
+  /// \brief Convert the data to an arrow::ArrayData
+  ///
+  /// This is a zero copy operation and the created ArrayData will reference the
+  /// buffers held by this instance.
+  std::shared_ptr<ArrayData> array_data() const;
+
+  // Constants used for accessing buffers using mutable_data().
+  static constexpr int kValidityBuffer = 0;
+  static constexpr int kFixedLengthBuffer = 1;
+  static constexpr int kVariableLengthBuffer = 2;
+
+  /// \brief A raw pointer to the requested buffer
+  ///
+  /// If i is 0 (kValidityBuffer) then this returns the validity buffer
+  /// If i is 1 (kFixedLengthBuffer) then this returns the buffer used for values
+  /// (if this is a fixed length data type) or offsets (if this is a variable
+  /// binary type)
+  /// If i is 2 (kVariableLengthBuffer) then this returns the buffer used for
+  /// variable length binary data
+  ///
+  /// The index is not range checked and the selected buffer is dereferenced
+  /// unconditionally.
+  uint8_t* mutable_data(int i) { return buffers_[i]->mutable_data(); }
+
+ private:
+  static constexpr int64_t kNumPaddingBytes = 64;
+  int log_num_rows_min_;
+  std::shared_ptr<DataType> data_type_;
+  MemoryPool* pool_;
+  int num_rows_;
+  int num_rows_allocated_;
+  int var_len_buf_size_;
+  static constexpr int kMaxBuffers = 3;
+  // Indexed by kValidityBuffer, kFixedLengthBuffer and kVariableLengthBuffer
+  std::shared_ptr<ResizableBuffer> buffers_[kMaxBuffers];
+};
+
+/// \brief A builder to concatenate batches of data into a larger batch
+///
+/// Will only store num_rows_max() rows
+class ARROW_EXPORT ExecBatchBuilder {
+ public:
+  /// \brief Add rows from `source` into `target` column
+  ///
+  /// If `target` is uninitialized or cleared it will be initialized to use
+  /// the given pool.
+  ///
+  /// \param source the array to copy rows from
+  /// \param target the column to append to
+  /// \param num_rows_to_append number of entries in `row_ids`
+  /// \param row_ids indices of the rows of `source` to append
+  /// \param pool the pool used if `target` has to be initialized
+  static Status AppendSelected(const std::shared_ptr<ArrayData>& source,
+                               ResizableArrayData* target, int num_rows_to_append,
+                               const uint16_t* row_ids, MemoryPool* pool);
+
+  /// \brief Add nulls into `target` column
+  ///
+  /// If `target` is uninitialized or cleared it will be initialized to use
+  /// the given pool.
+  static Status AppendNulls(const std::shared_ptr<DataType>& type,
+                            ResizableArrayData& target, int num_rows_to_append,
+                            MemoryPool* pool);
+
+  /// \brief Add selected rows from `batch`
+  ///
+  /// If `col_ids` is null then `num_cols` should be less than or equal to
+  /// batch.num_values() and the first `num_cols` columns of batch will be appended.
+  ///
+  /// All columns in `batch` must have array shape
+  Status AppendSelected(MemoryPool* pool, const ExecBatch& batch, int num_rows_to_append,
+                        const uint16_t* row_ids, int num_cols,
+                        const int* col_ids = NULLPTR);
+
+  /// \brief Add all-null rows
+  ///
+  /// \param pool the pool used if the columns have to be initialized
+  /// \param types the data type of each column
+  /// \param num_rows_to_append number of null rows to add to every column
+  Status AppendNulls(MemoryPool* pool,
+                     const std::vector<std::shared_ptr<DataType>>& types,
+                     int num_rows_to_append);
+
+  /// \brief Create an ExecBatch with the data that has been appended so far
+  ///        and clear this builder to be used again
+  ///
+  /// Should only be called if num_rows() returns non-zero.
+  ExecBatch Flush();
+
+  /// \brief The number of rows accumulated so far (0 if there are no columns yet)
+  int num_rows() const { return values_.empty() ? 0 : values_[0].num_rows(); }
+
+  /// \brief The maximum number of rows the builder will accumulate
+  static int num_rows_max() { return 1 << kLogNumRows; }
+
+ private:
+  // Base 2 logarithm of num_rows_max()
+  static constexpr int kLogNumRows = 15;
+
+  // Calculate how many rows to skip from the tail of the
+  // sequence of selected rows, such that the total size of skipped rows is at
+  // least equal to the size specified by the caller.
+  //
+  // Skipping of the tail rows
+  // is used to allow for faster processing by the caller of remaining rows
+  // without checking buffer bounds (useful with SIMD or fixed size memory loads
+  // and stores).
+  //
+  // The sequence of row_ids provided must be non-decreasing.
+  //
+  static int NumRowsToSkip(const std::shared_ptr<ArrayData>& column, int num_rows,
+                           const uint16_t* row_ids, int num_tail_bytes_to_skip);
+
+  // The supplied lambda will be called for each row in the given list of rows.
+  // The arguments given to it will be:
+  // - index of a row (within the set of selected rows),
+  // - pointer to the value,
+  // - byte length of the value.
+  //
+  // The information about nulls (validity bitmap) is not used in this call and
+  // has to be processed separately.
+  //
+  template <class PROCESS_VALUE_FN>
+  static void Visit(const std::shared_ptr<ArrayData>& column, int num_rows,
+                    const uint16_t* row_ids, PROCESS_VALUE_FN process_value_fn);
+
+  // Helpers that take the bits of the selected rows (row_ids) from input_bits
+  // and write them to output_bits.
+  template <bool OUTPUT_BYTE_ALIGNED>
+  static void CollectBitsImp(const uint8_t* input_bits, int64_t input_bits_offset,
+                             uint8_t* output_bits, int64_t output_bits_offset,
+                             int num_rows, const uint16_t* row_ids);
+  static void CollectBits(const uint8_t* input_bits, int64_t input_bits_offset,
+                          uint8_t* output_bits, int64_t output_bits_offset, int num_rows,
+                          const uint16_t* row_ids);
+
+  // One resizable column per output column
+  std::vector<ResizableArrayData> values_;
+};
+
+}  // namespace compute
+}  // namespace arrow
diff --git a/cpp/src/arrow/compute/light_array_test.cc b/cpp/src/arrow/compute/light_array_test.cc
new file mode 100644
index 0000000000..3f6d478035
--- /dev/null
+++ b/cpp/src/arrow/compute/light_array_test.cc
@@ -0,0 +1,481 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/compute/light_array.h"
+
+#include <gtest/gtest.h>
+
+#include "arrow/compute/exec/test_util.h"
+#include "arrow/testing/generator.h"
+#include "arrow/testing/gtest_util.h"
+#include "arrow/type.h"
+#include "arrow/util/checked_cast.h"
+
+namespace arrow {
+namespace compute {
+
+// Fixed width types (integers and decimals) that the tests below iterate over
+const std::vector<std::shared_ptr<DataType>> kSampleFixedDataTypes = {
+    int8(),   int16(),  int32(),  int64(),           uint8(),
+    uint16(), uint32(), uint64(), decimal128(38, 6), decimal256(76, 6)};
+// Varying length binary types that the tests below iterate over
+const std::vector<std::shared_ptr<DataType>> kSampleBinaryTypes = {utf8(), binary()};
+
+TEST(KeyColumnMetadata, FromDataType) {
+  // Booleans are bit vectors, which is signalled by a fixed length of zero
+  KeyColumnMetadata md = ColumnMetadataFromDataType(boolean()).ValueOrDie();
+  ASSERT_EQ(0, md.fixed_length);
+  ASSERT_TRUE(md.is_fixed_length);
+  ASSERT_FALSE(md.is_null_type);
+
+  md = ColumnMetadataFromDataType(null()).ValueOrDie();
+  ASSERT_TRUE(md.is_null_type);
+
+  // Fixed width types report the width of a value in bytes
+  for (const auto& fixed_type : kSampleFixedDataTypes) {
+    md = ColumnMetadataFromDataType(fixed_type).ValueOrDie();
+    int expected_width =
+        arrow::internal::checked_pointer_cast<FixedWidthType>(fixed_type)->bit_width() /
+        8;
+    ASSERT_EQ(expected_width, md.fixed_length);
+    ASSERT_TRUE(md.is_fixed_length);
+    ASSERT_FALSE(md.is_null_type);
+  }
+
+  // Varying length types report the width of an offset: 4 bytes...
+  for (const auto& binary_type : {binary(), utf8()}) {
+    md = ColumnMetadataFromDataType(binary_type).ValueOrDie();
+    ASSERT_EQ(4, md.fixed_length);
+    ASSERT_FALSE(md.is_fixed_length);
+    ASSERT_FALSE(md.is_null_type);
+  }
+
+  // ...or 8 bytes for the large variants
+  for (const auto& large_type : {large_binary(), large_utf8()}) {
+    md = ColumnMetadataFromDataType(large_type).ValueOrDie();
+    ASSERT_EQ(8, md.fixed_length);
+    ASSERT_FALSE(md.is_fixed_length);
+    ASSERT_FALSE(md.is_null_type);
+  }
+}
+
+TEST(KeyColumnArray, FromArrayData) {
+  for (const auto& type : kSampleFixedDataTypes) {
+    ARROW_SCOPED_TRACE("Type: ", type->ToString());
+    // Both of these only depend on the type so they are computed once per type
+    const int value_width =
+        arrow::internal::checked_pointer_cast<FixedWidthType>(type)->bit_width() / 8;
+    const bool decimal_type = is_decimal(type->id());
+    // `array_offset` is the offset of the source array (e.g. if we are given a sliced
+    // source array) while `offset` is the offset we pass when constructing the
+    // KeyColumnArray
+    for (auto array_offset : {0, 1}) {
+      ARROW_SCOPED_TRACE("Array offset: ", array_offset);
+      for (auto offset : {0, 1}) {
+        ARROW_SCOPED_TRACE("Constructor offset: ", offset);
+        std::shared_ptr<Array> source =
+            decimal_type ? ArrayFromJSON(type, R"(["1.123123", "2.123123", null])")
+                         : ArrayFromJSON(type, "[1, 2, null]");
+        source = source->Slice(array_offset);
+        const int total_offset = offset + array_offset;
+        const int num_rows = static_cast<int32_t>(source->length()) - total_offset;
+        KeyColumnArray view =
+            ColumnArrayFromArrayData(source->data(), offset, num_rows).ValueOrDie();
+        const auto& source_buffers = source->data()->buffers;
+        // Maximum tested offset is < 8 so validity is just bit offset
+        ASSERT_EQ(total_offset, view.bit_offset(0));
+        ASSERT_EQ(0, view.bit_offset(1));
+        ASSERT_EQ(source_buffers[0]->data(), view.data(0));
+        ASSERT_EQ(source_buffers[1]->data() + total_offset * value_width, view.data(1));
+        ASSERT_EQ(nullptr, view.data(2));
+        ASSERT_EQ(num_rows, view.length());
+        // When creating from ArrayData always create read-only
+        ASSERT_EQ(nullptr, view.mutable_data(0));
+        ASSERT_EQ(nullptr, view.mutable_data(1));
+        ASSERT_EQ(nullptr, view.mutable_data(2));
+      }
+    }
+  }
+}
+
+TEST(KeyColumnArray, FromArrayDataBinary) {
+  for (const auto& type : kSampleBinaryTypes) {
+    ARROW_SCOPED_TRACE("Type: ", type->ToString());
+    // The width of an offset only depends on the type so it is computed once per type
+    const auto binary_type =
+        arrow::internal::checked_pointer_cast<BaseBinaryType>(type);
+    const int offset_width = static_cast<int>(binary_type->layout().buffers[1].byte_width);
+    for (auto array_offset : {0, 1}) {
+      ARROW_SCOPED_TRACE("Array offset: ", array_offset);
+      for (auto offset : {0, 1}) {
+        ARROW_SCOPED_TRACE("Constructor offset: ", offset);
+        std::shared_ptr<Array> source = ArrayFromJSON(type, R"(["xyz", "abcabc", null])");
+        source = source->Slice(array_offset);
+        const int total_offset = offset + array_offset;
+        const int num_rows = static_cast<int32_t>(source->length()) - total_offset;
+        KeyColumnArray view =
+            ColumnArrayFromArrayData(source->data(), offset, num_rows).ValueOrDie();
+        const auto& source_buffers = source->data()->buffers;
+        ASSERT_EQ(total_offset, view.bit_offset(0));
+        ASSERT_EQ(0, view.bit_offset(1));
+        ASSERT_EQ(source_buffers[0]->data(), view.data(0));
+        // The offsets are advanced while the character data is left untouched
+        ASSERT_EQ(source_buffers[1]->data() + total_offset * offset_width, view.data(1));
+        ASSERT_EQ(source_buffers[2]->data(), view.data(2));
+        ASSERT_EQ(num_rows, view.length());
+        // When creating from ArrayData always create read-only
+        ASSERT_EQ(nullptr, view.mutable_data(0));
+        ASSERT_EQ(nullptr, view.mutable_data(1));
+        ASSERT_EQ(nullptr, view.mutable_data(2));
+      }
+    }
+  }
+}
+
+TEST(KeyColumnArray, FromArrayDataBool) {
+  for (auto array_offset : {0, 1}) {
+    ARROW_SCOPED_TRACE("Array offset: ", array_offset);
+    for (auto offset : {0, 1}) {
+      ARROW_SCOPED_TRACE("Constructor offset: ", offset);
+      std::shared_ptr<Array> source = ArrayFromJSON(boolean(), "[true, false, null]");
+      source = source->Slice(array_offset);
+      const int total_offset = offset + array_offset;
+      const int num_rows = static_cast<int32_t>(source->length()) - total_offset;
+      KeyColumnArray view =
+          ColumnArrayFromArrayData(source->data(), offset, num_rows).ValueOrDie();
+      const auto& source_buffers = source->data()->buffers;
+      // Values are bit packed so both buffers are addressed with a bit offset
+      // and neither buffer pointer is advanced
+      ASSERT_EQ(total_offset, view.bit_offset(0));
+      ASSERT_EQ(total_offset, view.bit_offset(1));
+      ASSERT_EQ(source_buffers[0]->data(), view.data(0));
+      ASSERT_EQ(source_buffers[1]->data(), view.data(1));
+      ASSERT_EQ(num_rows, view.length());
+      ASSERT_EQ(nullptr, view.mutable_data(0));
+      ASSERT_EQ(nullptr, view.mutable_data(1));
+    }
+  }
+}
+
+TEST(KeyColumnArray, Slice) {
+  constexpr int kValuesByteLength = 128;
+  // Size needed for validity depends on byte_width but 16 will always be big enough
+  constexpr int kValidityByteLength = 16;
+  uint8_t validity_buffer[kValidityByteLength];
+  uint8_t values_buffer[kValuesByteLength];
+  for (auto byte_width : {2, 4}) {
+    ARROW_SCOPED_TRACE("Byte Width: ", byte_width);
+    int64_t num_rows = kValuesByteLength / byte_width;
+    KeyColumnMetadata metadata(true, byte_width);
+    KeyColumnArray array(metadata, num_rows, validity_buffer, values_buffer, nullptr);
+
+    for (int offset : {0, 4, 12}) {
+      ARROW_SCOPED_TRACE("Offset: ", offset);
+      // Where the slice starts does not depend on how long it is.  The validity
+      // bitmap is addressed by whole bytes plus a bit offset within the byte.
+      const int validity_bit_offset = offset % 8;
+      const int validity_byte_offset = offset / 8;
+      const int values_byte_offset = byte_width * offset;
+      for (int slice_length : {0, 4}) {
+        ARROW_SCOPED_TRACE("Length: ", slice_length);
+        KeyColumnArray sliced = array.Slice(offset, slice_length);
+        ASSERT_EQ(validity_bit_offset, sliced.bit_offset(0));
+        ASSERT_EQ(0, sliced.bit_offset(1));
+        ASSERT_EQ(validity_buffer + validity_byte_offset, sliced.mutable_data(0));
+        ASSERT_EQ(values_buffer + values_byte_offset, sliced.mutable_data(1));
+      }
+    }
+  }
+}
+
+TEST(KeyColumnArray, SliceBool) {
+  constexpr int kValuesByteLength = 2;
+  constexpr int kValidityByteLength = 2;
+  uint8_t validity_buffer[kValidityByteLength];
+  uint8_t values_buffer[kValuesByteLength];
+  int num_rows = 16;
+  KeyColumnMetadata metadata(true, /*byte_width=*/0);
+  KeyColumnArray array(metadata, num_rows, validity_buffer, values_buffer, nullptr);
+
+  for (int offset : {0, 4, 12}) {
+    ARROW_SCOPED_TRACE("Offset: ", offset);
+    // Values and validity are both bit vectors so they are sliced the same way:
+    // by whole bytes plus a bit offset within the byte
+    const int bit_offset = offset % 8;
+    const int byte_offset = offset / 8;
+    for (int slice_length : {0, 4}) {
+      ARROW_SCOPED_TRACE("Length: ", slice_length);
+      KeyColumnArray sliced = array.Slice(offset, slice_length);
+      ASSERT_EQ(bit_offset, sliced.bit_offset(0));
+      ASSERT_EQ(bit_offset, sliced.bit_offset(1));
+      ASSERT_EQ(validity_buffer + byte_offset, sliced.mutable_data(0));
+      ASSERT_EQ(values_buffer + byte_offset, sliced.mutable_data(1));
+    }
+  }
+}
+
+TEST(KeyColumnArray, FromExecBatch) {
+  ExecBatch batch =
+      ExecBatchFromJSON({int64(), boolean()}, "[[1, true], [2, false], [null, null]]");
+  std::vector<KeyColumnArray> columns;
+
+  // Views of the whole batch: one per column, each spanning all three rows
+  ASSERT_OK(ColumnArraysFromExecBatch(batch, &columns));
+  ASSERT_EQ(2, columns.size());
+  ASSERT_EQ(8, columns[0].metadata().fixed_length);
+  ASSERT_EQ(0, columns[1].metadata().fixed_length);
+  ASSERT_EQ(3, columns[0].length());
+  ASSERT_EQ(3, columns[1].length());
+
+  // Views of a single row: the output vector is reused and only the length changes
+  ASSERT_OK(ColumnArraysFromExecBatch(batch, 1, 1, &columns));
+  ASSERT_EQ(2, columns.size());
+  ASSERT_EQ(8, columns[0].metadata().fixed_length);
+  ASSERT_EQ(0, columns[1].metadata().fixed_length);
+  ASSERT_EQ(1, columns[0].length());
+  ASSERT_EQ(1, columns[1].length());
+}
+
+TEST(ResizableArrayData, Basic) {
+  constexpr int kLogNumRowsMin = 16;
+  constexpr int kNumRowsMin = 1 << kLogNumRowsMin;
+  std::unique_ptr<MemoryPool> pool = MemoryPool::CreateDefault();
+  for (const auto& type : kSampleFixedDataTypes) {
+    ARROW_SCOPED_TRACE("Type: ", type->ToString());
+    int byte_width =
+        arrow::internal::checked_pointer_cast<FixedWidthType>(type)->bit_width() / 8;
+    // Lower bound on the allocation needed for kNumRowsMin rows: the values
+    // plus one validity bit per row
+    int min_bytes_needed = byte_width * kNumRowsMin + kNumRowsMin / 8;
+    {
+      ResizableArrayData array;
+      array.Init(type, pool.get(), /*log_num_rows_min=*/kLogNumRowsMin);
+      ASSERT_EQ(0, array.num_rows());
+      ASSERT_OK(array.ResizeFixedLengthBuffers(2));
+      ASSERT_EQ(2, array.num_rows());
+      // Even though we are only asking for 2 rows we specified a rather high
+      // log_num_rows_min so it should allocate at least that many rows.  Padding
+      // and rounding up to a power of 2 will make the allocations larger.
+      ASSERT_LT(min_bytes_needed, pool->bytes_allocated());
+      ASSERT_GT(min_bytes_needed * 2, pool->bytes_allocated());
+
+      // Asking for twice the minimum number of rows doubles the allocation
+      ASSERT_OK(array.ResizeFixedLengthBuffers(1 << 17));
+      ASSERT_LT(min_bytes_needed * 2, pool->bytes_allocated());
+      ASSERT_GT(min_bytes_needed * 4, pool->bytes_allocated());
+      ASSERT_EQ(1 << 17, array.num_rows());
+
+      // Shrinking array won't shrink allocated RAM
+      ASSERT_OK(array.ResizeFixedLengthBuffers(2));
+      ASSERT_LT(min_bytes_needed * 2, pool->bytes_allocated());
+      ASSERT_GT(min_bytes_needed * 4, pool->bytes_allocated());
+      ASSERT_EQ(2, array.num_rows());
+    }
+    // After array is destroyed buffers should be freed
+    ASSERT_EQ(0, pool->bytes_allocated());
+  }
+}
+
+TEST(ResizableArrayData, Binary) {
+  std::unique_ptr<MemoryPool> pool = MemoryPool::CreateDefault();
+  for (const auto& type : kSampleBinaryTypes) {
+    ARROW_SCOPED_TRACE("Type: ", type->ToString());
+    {
+      ResizableArrayData array;
+      array.Init(type, pool.get(), /*log_num_rows_min=*/4);
+      ASSERT_EQ(0, array.num_rows());
+      ASSERT_OK(array.ResizeFixedLengthBuffers(2));
+      ASSERT_EQ(2, array.num_rows());
+      // At this point the offsets memory has been allocated and needs to be filled
+      // in before we allocate the variable length memory
+      const auto binary_type =
+          arrow::internal::checked_pointer_cast<BaseBinaryType>(type);
+      int offsets_width = static_cast<int>(binary_type->layout().buffers[1].byte_width);
+      switch (offsets_width) {
+        case 4: {
+          auto offsets32 = reinterpret_cast<int32_t*>(array.mutable_data(1));
+          offsets32[0] = 0;
+          offsets32[1] = 1000;
+          offsets32[2] = 2000;
+          break;
+        }
+        case 8: {
+          auto offsets64 = reinterpret_cast<int64_t*>(array.mutable_data(1));
+          offsets64[0] = 0;
+          offsets64[1] = 1000;
+          offsets64[2] = 2000;
+          break;
+        }
+        default:
+          FAIL() << "Unexpected offsets_width: " << offsets_width;
+      }
+      ASSERT_OK(array.ResizeVaryingLengthBuffer());
+      // Each string is 1000 bytes.  The offsets, padding, etc. should be less than 1000
+      // bytes
+      ASSERT_LT(2000, pool->bytes_allocated());
+      ASSERT_GT(3000, pool->bytes_allocated());
+    }
+    // After array is destroyed buffers should be freed
+    ASSERT_EQ(0, pool->bytes_allocated());
+  }
+}
+
+TEST(ExecBatchBuilder, AppendBatches) {
+  std::unique_ptr<MemoryPool> owned_pool = MemoryPool::CreateDefault();
+  MemoryPool* pool = owned_pool.get();
+  ExecBatch first =
+      ExecBatchFromJSON({int64(), boolean()}, "[[1, true], [2, false], [null, null]]");
+  ExecBatch second =
+      ExecBatchFromJSON({int64(), boolean()}, "[[null, true], [5, true], [6, false]]");
+  // All rows of `first` followed by all rows of `second`
+  ExecBatch expected = ExecBatchFromJSON(
+      {int64(), boolean()},
+      "[[1, true], [2, false], [null, null], [null, true], [5, true], [6, false]]");
+  {
+    ExecBatchBuilder builder;
+    uint16_t all_rows[3] = {0, 1, 2};
+    ASSERT_OK(builder.AppendSelected(pool, first, 3, all_rows, /*num_cols=*/2));
+    ASSERT_OK(builder.AppendSelected(pool, second, 3, all_rows, /*num_cols=*/2));
+    ExecBatch actual = builder.Flush();
+    ASSERT_EQ(expected, actual);
+    // The flushed batch keeps the memory alive until it goes out of scope
+    ASSERT_NE(0, pool->bytes_allocated());
+  }
+  ASSERT_EQ(0, pool->bytes_allocated());
+}
+
+TEST(ExecBatchBuilder, AppendBatchesSomeRows) {
+  std::unique_ptr<MemoryPool> owned_pool = MemoryPool::CreateDefault();
+  MemoryPool* pool = owned_pool.get();
+  ExecBatch first =
+      ExecBatchFromJSON({int64(), boolean()}, "[[1, true], [2, false], [null, null]]");
+  ExecBatch second =
+      ExecBatchFromJSON({int64(), boolean()}, "[[null, true], [5, true], [6, false]]");
+  // The first two rows of `first` followed by the first two rows of `second`
+  ExecBatch expected = ExecBatchFromJSON(
+      {int64(), boolean()}, "[[1, true], [2, false], [null, true], [5, true]]");
+  {
+    ExecBatchBuilder builder;
+    uint16_t leading_rows[2] = {0, 1};
+    ASSERT_OK(builder.AppendSelected(pool, first, 2, leading_rows, /*num_cols=*/2));
+    ASSERT_OK(builder.AppendSelected(pool, second, 2, leading_rows, /*num_cols=*/2));
+    ExecBatch actual = builder.Flush();
+    ASSERT_EQ(expected, actual);
+    // The flushed batch keeps the memory alive until it goes out of scope
+    ASSERT_NE(0, pool->bytes_allocated());
+  }
+  ASSERT_EQ(0, pool->bytes_allocated());
+}
+
+// Exercises column projection while appending: an explicit first column, the
+// implicit first column (no col_ids given) and an explicit last column. Each
+// scenario uses its own builder; the pool is expected to report zero bytes after
+// all of them have gone out of scope.
+TEST(ExecBatchBuilder, AppendBatchesSomeCols) {
+  std::unique_ptr<MemoryPool> pool_holder = MemoryPool::CreateDefault();
+  MemoryPool* pool = pool_holder.get();
+  ExecBatch first_input =
+      ExecBatchFromJSON({int64(), boolean()}, "[[1, true], [2, false], [null, null]]");
+  ExecBatch second_input =
+      ExecBatchFromJSON({int64(), boolean()}, "[[null, true], [5, true], [6, false]]");
+  ExecBatch expected_first_col =
+      ExecBatchFromJSON({int64()}, "[[1], [2], [null], [null], [5], [6]]");
+  ExecBatch expected_last_col = ExecBatchFromJSON(
+      {boolean()}, "[[true], [false], [null], [true], [true], [false]]");
+  // Every scenario selects all three rows of both inputs.
+  const int num_selected = 3;
+  uint16_t all_rows[3] = {0, 1, 2};
+  int first_col_ids[1] = {0};
+  int last_col_ids[1] = {1};
+  {
+    // Explicitly request column 0.
+    ExecBatchBuilder builder;
+    ASSERT_OK(builder.AppendSelected(pool, first_input, num_selected, all_rows,
+                                     /*num_cols=*/1, first_col_ids));
+    ASSERT_OK(builder.AppendSelected(pool, second_input, num_selected, all_rows,
+                                     /*num_cols=*/1, first_col_ids));
+    ExecBatch actual = builder.Flush();
+    ASSERT_EQ(expected_first_col, actual);
+    ASSERT_NE(0, pool->bytes_allocated());
+  }
+  {
+    // With num_cols == 1 and no col_ids, the first column is taken implicitly.
+    ExecBatchBuilder builder;
+    ASSERT_OK(builder.AppendSelected(pool, first_input, num_selected, all_rows,
+                                     /*num_cols=*/1));
+    ASSERT_OK(builder.AppendSelected(pool, second_input, num_selected, all_rows,
+                                     /*num_cols=*/1));
+    ExecBatch actual = builder.Flush();
+    ASSERT_EQ(expected_first_col, actual);
+    ASSERT_NE(0, pool->bytes_allocated());
+  }
+  {
+    // Explicitly request column 1, the last column of the inputs.
+    ExecBatchBuilder builder;
+    ASSERT_OK(builder.AppendSelected(pool, first_input, num_selected, all_rows,
+                                     /*num_cols=*/1, last_col_ids));
+    ASSERT_OK(builder.AppendSelected(pool, second_input, num_selected, all_rows,
+                                     /*num_cols=*/1, last_col_ids));
+    ExecBatch actual = builder.Flush();
+    ASSERT_EQ(expected_last_col, actual);
+    ASSERT_NE(0, pool->bytes_allocated());
+  }
+  ASSERT_EQ(0, pool->bytes_allocated());
+}
+
+// AppendNulls must add all-null rows, both after previously appended values and
+// on a builder that holds nothing else. The pool is expected to report zero bytes
+// once both builder scopes have ended.
+TEST(ExecBatchBuilder, AppendNulls) {
+  std::unique_ptr<MemoryPool> pool_holder = MemoryPool::CreateDefault();
+  MemoryPool* pool = pool_holder.get();
+  ExecBatch input =
+      ExecBatchFromJSON({int64(), boolean()}, "[[1, true], [2, false], [null, null]]");
+  ExecBatch expected_values_then_nulls = ExecBatchFromJSON(
+      {int64(), boolean()},
+      "[[1, true], [2, false], [null, null], [null, null], [null, null]]");
+  ExecBatch expected_only_nulls =
+      ExecBatchFromJSON({int64(), boolean()}, "[[null, null], [null, null]]");
+  const int num_nulls = 2;
+  {
+    // Three selected rows followed by two null rows.
+    ExecBatchBuilder builder;
+    uint16_t all_rows[3] = {0, 1, 2};
+    ASSERT_OK(builder.AppendSelected(pool, input, 3, all_rows, /*num_cols=*/2));
+    ASSERT_OK(builder.AppendNulls(pool, {int64(), boolean()}, num_nulls));
+    ExecBatch actual = builder.Flush();
+    ASSERT_EQ(expected_values_then_nulls, actual);
+    ASSERT_NE(0, pool->bytes_allocated());
+  }
+  {
+    // Null rows appended to an otherwise empty builder.
+    ExecBatchBuilder builder;
+    ASSERT_OK(builder.AppendNulls(pool, {int64(), boolean()}, num_nulls));
+    ExecBatch actual = builder.Flush();
+    ASSERT_EQ(expected_only_nulls, actual);
+    ASSERT_NE(0, pool->bytes_allocated());
+  }
+  ASSERT_EQ(0, pool->bytes_allocated());
+}
+
+// An AppendNulls call that would push the total row count one past
+// ExecBatchBuilder::num_rows_max() must raise CapacityError, and the flushed
+// batch must still have the length of the earlier, successful append.
+TEST(ExecBatchBuilder, AppendNullsBeyondLimit) {
+  std::unique_ptr<MemoryPool> pool_holder = MemoryPool::CreateDefault();
+  MemoryPool* pool = pool_holder.get();
+  const int row_limit = ExecBatchBuilder::num_rows_max();
+  const int initial_rows = 10;
+  // Together with the initial rows this adds up to exactly row_limit + 1.
+  const int excess_rows = row_limit + 1 - initial_rows;
+  {
+    ExecBatchBuilder builder;
+    ASSERT_OK(builder.AppendNulls(pool, {int64(), boolean()}, initial_rows));
+    ASSERT_RAISES(CapacityError,
+                  builder.AppendNulls(pool, {int64(), boolean()}, excess_rows));
+    ExecBatch actual = builder.Flush();
+    ASSERT_EQ(initial_rows, actual.length);
+    ASSERT_NE(0, pool->bytes_allocated());
+  }
+  ASSERT_EQ(0, pool->bytes_allocated());
+}
+
+// An AppendSelected call that would push the total row count one past
+// ExecBatchBuilder::num_rows_max() must raise CapacityError, and the flushed
+// batch must equal a batch holding only the rows of the earlier, successful
+// append.
+TEST(ExecBatchBuilder, AppendValuesBeyondLimit) {
+  std::unique_ptr<MemoryPool> pool_holder = MemoryPool::CreateDefault();
+  MemoryPool* pool = pool_holder.get();
+  const int row_limit = ExecBatchBuilder::num_rows_max();
+  const int initial_rows = 10;
+  const int oversized_length = row_limit + 1;
+  const int remaining_rows = oversized_length - initial_rows;
+  // Source batch with one row more than the builder accepts, plus the batch
+  // expected after only the first selection has been applied.
+  std::shared_ptr<Array> oversized_values =
+      ConstantArrayGenerator::Int32(oversized_length);
+  std::shared_ptr<Array> expected_values = ConstantArrayGenerator::Int32(initial_rows);
+  ExecBatch oversized_batch({oversized_values}, oversized_length);
+  ExecBatch expected({expected_values}, initial_rows);
+  // Row ids 0..9 for the first append; the ids from 10 upwards for the second.
+  std::vector<uint16_t> head_row_ids(initial_rows);
+  std::iota(head_row_ids.begin(), head_row_ids.end(), 0);
+  std::vector<uint16_t> tail_row_ids(remaining_rows);
+  std::iota(tail_row_ids.begin(), tail_row_ids.end(), initial_rows);
+  {
+    ExecBatchBuilder builder;
+    ASSERT_OK(builder.AppendSelected(pool, oversized_batch, initial_rows,
+                                     head_row_ids.data(),
+                                     /*num_cols=*/1));
+    ASSERT_RAISES(CapacityError,
+                  builder.AppendSelected(pool, oversized_batch, remaining_rows,
+                                         tail_row_ids.data(),
+                                         /*num_cols=*/1));
+    ExecBatch actual = builder.Flush();
+    ASSERT_EQ(expected, actual);
+    ASSERT_NE(0, pool->bytes_allocated());
+  }
+  ASSERT_EQ(0, pool->bytes_allocated());
+}
+
+}  // namespace compute
+}  // namespace arrow