You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2023/04/04 01:10:03 UTC

[arrow] branch main updated: GH-34382: [C++] Support more types in run_end_encode and run_end_decode functions (#34761)

This is an automated email from the ASF dual-hosted git repository.

westonpace pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new 6d109fbf8d GH-34382: [C++] Support more types in run_end_encode and run_end_decode functions (#34761)
6d109fbf8d is described below

commit 6d109fbf8d8d7adae54a558a3b72dd9b9ab88656
Author: Felipe Oliveira Carvalho <fe...@gmail.com>
AuthorDate: Mon Apr 3 22:09:55 2023 -0300

    GH-34382: [C++] Support more types in run_end_encode and run_end_decode functions (#34761)
    
    
    * Closes: #34382
    
    Authored-by: Felipe Oliveira Carvalho <fe...@gmail.com>
    Signed-off-by: Weston Pace <we...@gmail.com>
---
 .../arrow/compute/kernels/vector_run_end_encode.cc | 733 +++++++++++++++------
 .../compute/kernels/vector_run_end_encode_test.cc  | 160 +++--
 2 files changed, 624 insertions(+), 269 deletions(-)

diff --git a/cpp/src/arrow/compute/kernels/vector_run_end_encode.cc b/cpp/src/arrow/compute/kernels/vector_run_end_encode.cc
index 4ac9f93b47..718086b21e 100644
--- a/cpp/src/arrow/compute/kernels/vector_run_end_encode.cc
+++ b/cpp/src/arrow/compute/kernels/vector_run_end_encode.cc
@@ -18,7 +18,9 @@
 #include <utility>
 
 #include "arrow/compute/api_vector.h"
+#include "arrow/compute/kernel.h"
 #include "arrow/compute/kernels/common_internal.h"
+#include "arrow/type_traits.h"
 #include "arrow/util/checked_cast.h"
 #include "arrow/util/ree_util.h"
 
@@ -27,78 +29,278 @@ namespace compute {
 namespace internal {
 
 template <typename ArrowType, bool has_validity_buffer, typename Enable = void>
-struct ReadValueImpl {};
+struct ReadWriteValueImpl {};
 
 // Numeric and primitive C-compatible types
 template <typename ArrowType, bool has_validity_buffer>
-struct ReadValueImpl<ArrowType, has_validity_buffer, enable_if_has_c_type<ArrowType>> {
-  using CType = typename ArrowType::c_type;
+class ReadWriteValueImpl<ArrowType, has_validity_buffer,
+                         enable_if_has_c_type<ArrowType>> {
+ public:
+  using ValueRepr = typename ArrowType::c_type;
+
+ private:
+  const uint8_t* input_validity_;
+  const uint8_t* input_values_;
+
+  // Needed only by the writing functions
+  uint8_t* output_validity_;
+  uint8_t* output_values_;
 
-  [[nodiscard]] bool ReadValue(const uint8_t* input_validity, const void* input_values,
-                               CType* out, int64_t read_offset) const {
+ public:
+  explicit ReadWriteValueImpl(const ArraySpan& input_values_array,
+                              ArrayData* output_values_array_data)
+      : input_validity_(has_validity_buffer ? input_values_array.buffers[0].data
+                                            : NULLPTR),
+        input_values_(input_values_array.buffers[1].data),
+        output_validity_((has_validity_buffer && output_values_array_data)
+                             ? output_values_array_data->buffers[0]->mutable_data()
+                             : NULLPTR),
+        output_values_(output_values_array_data
+                           ? output_values_array_data->buffers[1]->mutable_data()
+                           : NULLPTR) {}
+
+  [[nodiscard]] bool ReadValue(ValueRepr* out, int64_t read_offset) const {
     bool valid = true;
     if constexpr (has_validity_buffer) {
-      valid = bit_util::GetBit(input_validity, read_offset);
+      valid = bit_util::GetBit(input_validity_, read_offset);
     }
     if constexpr (std::is_same_v<ArrowType, BooleanType>) {
-      *out =
-          bit_util::GetBit(reinterpret_cast<const uint8_t*>(input_values), read_offset);
+      *out = bit_util::GetBit(input_values_, read_offset);
     } else {
-      *out = (reinterpret_cast<const CType*>(input_values))[read_offset];
+      *out = (reinterpret_cast<const ValueRepr*>(input_values_))[read_offset];
     }
     return valid;
   }
-};
-
-template <typename ArrowType, bool has_validity_buffer, typename Enable = void>
-struct WriteValueImpl {};
 
-// Numeric and primitive C-compatible types
-template <typename ArrowType, bool has_validity_buffer>
-struct WriteValueImpl<ArrowType, has_validity_buffer, enable_if_has_c_type<ArrowType>> {
-  using CType = typename ArrowType::c_type;
+  /// \brief Ensure padding is zeroed in validity bitmap.
+  void ZeroValidityPadding(int64_t length) const {
+    DCHECK(output_values_);
+    if constexpr (has_validity_buffer) {
+      DCHECK(output_validity_);
+      const int64_t validity_buffer_size = bit_util::BytesForBits(length);
+      output_validity_[validity_buffer_size - 1] = 0;
+    }
+  }
 
-  void WriteValue(uint8_t* output_validity, void* output_values, int64_t write_offset,
-                  bool valid, CType value) const {
+  void WriteValue(int64_t write_offset, bool valid, ValueRepr value) const {
     if constexpr (has_validity_buffer) {
-      bit_util::SetBitTo(output_validity, write_offset, valid);
+      bit_util::SetBitTo(output_validity_, write_offset, valid);
     }
     if (valid) {
       if constexpr (std::is_same_v<ArrowType, BooleanType>) {
-        bit_util::SetBitTo(reinterpret_cast<uint8_t*>(output_values), write_offset,
-                           value);
+        bit_util::SetBitTo(output_values_, write_offset, value);
       } else {
-        (reinterpret_cast<CType*>(output_values))[write_offset] = value;
+        (reinterpret_cast<ValueRepr*>(output_values_))[write_offset] = value;
       }
     }
   }
 
-  void WriteRun(uint8_t* output_validity, void* output_values, int64_t write_offset,
-                int64_t run_length, bool valid, CType value) const {
+  void WriteRun(int64_t write_offset, int64_t run_length, bool valid,
+                ValueRepr value) const {
     if constexpr (has_validity_buffer) {
-      bit_util::SetBitsTo(output_validity, write_offset, run_length, valid);
+      bit_util::SetBitsTo(output_validity_, write_offset, run_length, valid);
     }
     if (valid) {
       if constexpr (std::is_same_v<ArrowType, BooleanType>) {
-        bit_util::SetBitsTo(reinterpret_cast<uint8_t*>(output_values), write_offset,
+        bit_util::SetBitsTo(reinterpret_cast<uint8_t*>(output_values_), write_offset,
                             run_length, value);
       } else {
-        auto* output_values_c = reinterpret_cast<CType*>(output_values);
+        auto* output_values_c = reinterpret_cast<ValueRepr*>(output_values_);
         std::fill(output_values_c + write_offset,
                   output_values_c + write_offset + run_length, value);
       }
     }
   }
+
+  bool Compare(ValueRepr lhs, ValueRepr rhs) const { return lhs == rhs; }
+};
+
+// FixedSizeBinary, Decimal128
+template <typename ArrowType, bool has_validity_buffer>
+class ReadWriteValueImpl<ArrowType, has_validity_buffer,
+                         enable_if_fixed_size_binary<ArrowType>> {
+ public:
+  // Every value is represented as a pointer to byte_width_ bytes
+  using ValueRepr = uint8_t const*;
+
+ private:
+  const uint8_t* input_validity_;
+  const uint8_t* input_values_;
+
+  // Needed only by the writing functions
+  uint8_t* output_validity_;
+  uint8_t* output_values_;
+
+  const size_t byte_width_;
+
+ public:
+  ReadWriteValueImpl(const ArraySpan& input_values_array,
+                     ArrayData* output_values_array_data)
+      : input_validity_(has_validity_buffer ? input_values_array.buffers[0].data
+                                            : NULLPTR),
+        input_values_(input_values_array.buffers[1].data),
+        output_validity_((has_validity_buffer && output_values_array_data)
+                             ? output_values_array_data->buffers[0]->mutable_data()
+                             : NULLPTR),
+        output_values_(output_values_array_data
+                           ? output_values_array_data->buffers[1]->mutable_data()
+                           : NULLPTR),
+        byte_width_(input_values_array.type->byte_width()) {}
+
+  [[nodiscard]] bool ReadValue(ValueRepr* out, int64_t read_offset) const {
+    bool valid = true;
+    if constexpr (has_validity_buffer) {
+      valid = bit_util::GetBit(input_validity_, read_offset);
+    }
+    *out = input_values_ + (read_offset * byte_width_);
+    return valid;
+  }
+
+  /// \brief Ensure padding is zeroed in validity bitmap.
+  void ZeroValidityPadding(int64_t length) const {
+    DCHECK(output_values_);
+    if constexpr (has_validity_buffer) {
+      DCHECK(output_validity_);
+      const int64_t validity_buffer_size = bit_util::BytesForBits(length);
+      output_validity_[validity_buffer_size - 1] = 0;
+    }
+  }
+
+  void WriteValue(int64_t write_offset, bool valid, ValueRepr value) const {
+    if constexpr (has_validity_buffer) {
+      bit_util::SetBitTo(output_validity_, write_offset, valid);
+    }
+    if (valid) {
+      memcpy(output_values_ + (write_offset * byte_width_), value, byte_width_);
+    }
+  }
+
+  void WriteRun(int64_t write_offset, int64_t run_length, bool valid,
+                ValueRepr value) const {
+    if constexpr (has_validity_buffer) {
+      bit_util::SetBitsTo(output_validity_, write_offset, run_length, valid);
+    }
+    if (valid) {
+      uint8_t* ptr = output_values_ + (write_offset * byte_width_);
+      for (int64_t i = 0; i < run_length; ++i) {
+        memcpy(ptr, value, byte_width_);
+        ptr += byte_width_;
+      }
+    }
+  }
+
+  bool Compare(ValueRepr lhs, ValueRepr rhs) const {
+    return memcmp(lhs, rhs, byte_width_) == 0;
+  }
+};
+
+// Binary, String...
+template <typename ArrowType, bool has_validity_buffer>
+class ReadWriteValueImpl<ArrowType, has_validity_buffer,
+                         enable_if_base_binary<ArrowType>> {
+ public:
+  using ValueRepr = std::string_view;
+  using offset_type = typename ArrowType::offset_type;
+
+ private:
+  const uint8_t* input_validity_;
+  const offset_type* input_offsets_;
+  const uint8_t* input_values_;
+
+  // Needed only by the writing functions
+  uint8_t* output_validity_;
+  offset_type* output_offsets_;
+  uint8_t* output_values_;
+
+ public:
+  ReadWriteValueImpl(const ArraySpan& input_values_array,
+                     ArrayData* output_values_array_data)
+      : input_validity_(has_validity_buffer ? input_values_array.buffers[0].data
+                                            : NULLPTR),
+        input_offsets_(input_values_array.template GetValues<offset_type>(1, 0)),
+        input_values_(input_values_array.buffers[2].data),
+        output_validity_((has_validity_buffer && output_values_array_data)
+                             ? output_values_array_data->buffers[0]->mutable_data()
+                             : NULLPTR),
+        output_offsets_(
+            output_values_array_data
+                ? output_values_array_data->template GetMutableValues<offset_type>(1, 0)
+                : NULLPTR),
+        output_values_(output_values_array_data
+                           ? output_values_array_data->buffers[2]->mutable_data()
+                           : NULLPTR) {}
+
+  [[nodiscard]] bool ReadValue(ValueRepr* out, int64_t read_offset) const {
+    bool valid = true;
+    if constexpr (has_validity_buffer) {
+      valid = bit_util::GetBit(input_validity_, read_offset);
+    }
+    if (valid) {
+      const offset_type offset0 = input_offsets_[read_offset];
+      const offset_type offset1 = input_offsets_[read_offset + 1];
+      *out = std::string_view(reinterpret_cast<const char*>(input_values_ + offset0),
+                              offset1 - offset0);
+    }
+    return valid;
+  }
+
+  /// \brief Ensure padding is zeroed in validity bitmap.
+  void ZeroValidityPadding(int64_t length) const {
+    DCHECK(output_values_);
+    if constexpr (has_validity_buffer) {
+      DCHECK(output_validity_);
+      const int64_t validity_buffer_size = bit_util::BytesForBits(length);
+      output_validity_[validity_buffer_size - 1] = 0;
+    }
+  }
+
+  void WriteValue(int64_t write_offset, bool valid, ValueRepr value) const {
+    if constexpr (has_validity_buffer) {
+      bit_util::SetBitTo(output_validity_, write_offset, valid);
+    }
+    const offset_type offset0 = output_offsets_[write_offset];
+    const offset_type offset1 =
+        offset0 + (valid ? static_cast<offset_type>(value.size()) : 0);
+    output_offsets_[write_offset + 1] = offset1;
+    if (valid) {
+      memcpy(output_values_ + offset0, value.data(), value.size());
+    }
+  }
+
+  void WriteRun(int64_t write_offset, int64_t run_length, bool valid,
+                ValueRepr value) const {
+    if constexpr (has_validity_buffer) {
+      bit_util::SetBitsTo(output_validity_, write_offset, run_length, valid);
+    }
+    if (valid) {
+      int64_t i = write_offset;
+      offset_type offset = output_offsets_[i];
+      while (i < write_offset + run_length) {
+        memcpy(output_values_ + offset, value.data(), value.size());
+        offset += static_cast<offset_type>(value.size());
+        i += 1;
+        output_offsets_[i] = offset;
+      }
+    } else {
+      offset_type offset = output_offsets_[write_offset];
+      offset_type* begin = output_offsets_ + write_offset + 1;
+      std::fill(begin, begin + run_length, offset);
+    }
+  }
+
+  bool Compare(ValueRepr lhs, ValueRepr rhs) const { return lhs == rhs; }
 };
 
-Result<std::shared_ptr<Buffer>> AllocatePrimitiveBuffer(int64_t length,
-                                                        const DataType& type,
-                                                        MemoryPool* pool) {
-  DCHECK(is_primitive(type.id()));
+Result<std::shared_ptr<Buffer>> AllocateValuesBuffer(int64_t length, const DataType& type,
+                                                     MemoryPool* pool,
+                                                     int64_t data_buffer_size) {
   if (type.bit_width() == 1) {
     return AllocateBitmap(length, pool);
-  } else {
+  } else if (is_fixed_width(type.id())) {
     return AllocateBuffer(length * type.byte_width(), pool);
+  } else {
+    DCHECK(is_base_binary_like(type.id()));
+    return AllocateBuffer(data_buffer_size, pool);
   }
 }
 
@@ -115,62 +317,50 @@ template <typename RunEndType, typename ValueType, bool has_validity_buffer>
 class RunEndEncodingLoop {
  public:
   using RunEndCType = typename RunEndType::c_type;
-  using CType = typename ValueType::c_type;
+
+ private:
+  using ReadWriteValue = ReadWriteValueImpl<ValueType, has_validity_buffer>;
+  using ValueRepr = typename ReadWriteValue::ValueRepr;
 
  private:
   const int64_t input_length_;
   const int64_t input_offset_;
-
-  const uint8_t* input_validity_;
-  const void* input_values_;
-
+  ReadWriteValue read_write_value_;
   // Needed only by WriteEncodedRuns()
-  uint8_t* output_validity_;
-  void* output_values_;
   RunEndCType* output_run_ends_;
 
  public:
-  RunEndEncodingLoop(int64_t input_length, int64_t input_offset,
-                     const uint8_t* input_validity, const void* input_values,
-                     uint8_t* output_validity = NULLPTR, void* output_values = NULLPTR,
-                     RunEndCType* output_run_ends = NULLPTR)
-      : input_length_(input_length),
-        input_offset_(input_offset),
-        input_validity_(input_validity),
-        input_values_(input_values),
-        output_validity_(output_validity),
-        output_values_(output_values),
+  explicit RunEndEncodingLoop(const ArraySpan& input_array,
+                              ArrayData* output_values_array_data,
+                              RunEndCType* output_run_ends)
+      : input_length_(input_array.length),
+        input_offset_(input_array.offset),
+        read_write_value_(input_array, output_values_array_data),
         output_run_ends_(output_run_ends) {
-    DCHECK_GT(input_length, 0);
-  }
-
- private:
-  [[nodiscard]] inline bool ReadValue(CType* out, int64_t read_offset) const {
-    return ReadValueImpl<ValueType, has_validity_buffer>{}.ReadValue(
-        input_validity_, input_values_, out, read_offset);
-  }
-
-  inline void WriteValue(int64_t write_offset, bool valid, CType value) {
-    WriteValueImpl<ValueType, has_validity_buffer>{}.WriteValue(
-        output_validity_, output_values_, write_offset, valid, value);
+    DCHECK_GT(input_array.length, 0);
   }
 
- public:
   /// \brief Give a pass over the input data and count the number of runs
   ///
-  /// \return a pair with the number of non-null run values and total number of runs
-  ARROW_NOINLINE std::pair<int64_t, int64_t> CountNumberOfRuns() const {
+  /// \return a tuple with the number of non-null run values, the total number of runs,
+  /// and the data buffer size for string and binary types
+  ARROW_NOINLINE std::tuple<int64_t, int64_t, int64_t> CountNumberOfRuns() const {
     int64_t read_offset = input_offset_;
-    CType current_run;
-    bool current_run_valid = ReadValue(&current_run, read_offset);
+    ValueRepr current_run;
+    bool current_run_valid = read_write_value_.ReadValue(&current_run, read_offset);
     read_offset += 1;
     int64_t num_valid_runs = current_run_valid ? 1 : 0;
     int64_t num_output_runs = 1;
+    int64_t data_buffer_size = 0;
+    if constexpr (is_base_binary_like(ValueType::type_id)) {
+      data_buffer_size = current_run_valid ? current_run.size() : 0;
+    }
     for (; read_offset < input_offset_ + input_length_; read_offset += 1) {
-      CType value;
-      const bool valid = ReadValue(&value, read_offset);
+      ValueRepr value;
+      const bool valid = read_write_value_.ReadValue(&value, read_offset);
 
-      const bool open_new_run = valid != current_run_valid || value != current_run;
+      const bool open_new_run =
+          valid != current_run_valid || !read_write_value_.Compare(value, current_run);
       if (open_new_run) {
         // Open the new run
         current_run = value;
@@ -178,27 +368,30 @@ class RunEndEncodingLoop {
         // Count the new run
         num_output_runs += 1;
         num_valid_runs += valid ? 1 : 0;
+        if constexpr (is_base_binary_like(ValueType::type_id)) {
+          data_buffer_size += valid ? current_run.size() : 0;
+        }
       }
     }
-    return std::make_pair(num_valid_runs, num_output_runs);
+    return std::make_tuple(num_valid_runs, num_output_runs, data_buffer_size);
   }
 
   ARROW_NOINLINE int64_t WriteEncodedRuns() {
-    DCHECK(output_values_);
     DCHECK(output_run_ends_);
     int64_t read_offset = input_offset_;
     int64_t write_offset = 0;
-    CType current_run;
-    bool current_run_valid = ReadValue(&current_run, read_offset);
+    ValueRepr current_run;
+    bool current_run_valid = read_write_value_.ReadValue(&current_run, read_offset);
     read_offset += 1;
     for (; read_offset < input_offset_ + input_length_; read_offset += 1) {
-      CType value;
-      const bool valid = ReadValue(&value, read_offset);
+      ValueRepr value;
+      const bool valid = read_write_value_.ReadValue(&value, read_offset);
 
-      const bool open_new_run = valid != current_run_valid || value != current_run;
+      const bool open_new_run =
+          valid != current_run_valid || !read_write_value_.Compare(value, current_run);
       if (open_new_run) {
         // Close the current run first by writing it out
-        WriteValue(write_offset, current_run_valid, current_run);
+        read_write_value_.WriteValue(write_offset, current_run_valid, current_run);
         const int64_t run_end = read_offset - input_offset_;
         output_run_ends_[write_offset] = static_cast<RunEndCType>(run_end);
         write_offset += 1;
@@ -207,7 +400,7 @@ class RunEndEncodingLoop {
         current_run = value;
       }
     }
-    WriteValue(write_offset, current_run_valid, current_run);
+    read_write_value_.WriteValue(write_offset, current_run_valid, current_run);
     DCHECK_EQ(input_length_, read_offset - input_offset_);
     output_run_ends_[write_offset] = static_cast<RunEndCType>(input_length_);
     return write_offset + 1;
@@ -227,34 +420,58 @@ Status ValidateRunEndType(int64_t input_length) {
   return Status::OK();
 }
 
-/// \brief Preallocate the ArrayData for the run-end encoded version
-/// of the input array
-template <typename RunEndType, bool has_validity_buffer>
-Result<std::shared_ptr<ArrayData>> PreallocateREEData(const ArraySpan& input_array,
-                                                      int64_t physical_length,
-                                                      int64_t physical_null_count,
-                                                      MemoryPool* pool) {
+Result<std::shared_ptr<ArrayData>> PreallocateRunEndsArray(
+    const std::shared_ptr<DataType>& run_end_type, int64_t physical_length,
+    MemoryPool* pool) {
   ARROW_ASSIGN_OR_RAISE(
       auto run_ends_buffer,
-      AllocateBuffer(physical_length * RunEndType().byte_width(), pool));
+      AllocateBuffer(physical_length * run_end_type->byte_width(), pool));
+  return ArrayData::Make(run_end_type, physical_length,
+                         {NULLPTR, std::move(run_ends_buffer)}, /*null_count=*/0);
+}
+
+ARROW_NOINLINE Result<std::shared_ptr<ArrayData>> PreallocateValuesArray(
+    const std::shared_ptr<DataType>& value_type, bool has_validity_buffer, int64_t length,
+    int64_t null_count, MemoryPool* pool, int64_t data_buffer_size) {
+  std::vector<std::shared_ptr<Buffer>> values_data_buffers;
   std::shared_ptr<Buffer> validity_buffer = NULLPTR;
-  if constexpr (has_validity_buffer) {
-    ARROW_ASSIGN_OR_RAISE(validity_buffer, AllocateBitmap(physical_length, pool));
+  if (has_validity_buffer) {
+    ARROW_ASSIGN_OR_RAISE(validity_buffer, AllocateBitmap(length, pool));
+  }
+  ARROW_ASSIGN_OR_RAISE(auto values_buffer, AllocateValuesBuffer(length, *value_type,
+                                                                 pool, data_buffer_size));
+  if (is_base_binary_like(value_type->id())) {
+    const int offset_byte_width = offset_bit_width(value_type->id()) / 8;
+    ARROW_ASSIGN_OR_RAISE(auto offsets_buffer,
+                          AllocateBuffer((length + 1) * offset_byte_width, pool));
+    // Ensure the first offset is zero
+    memset(offsets_buffer->mutable_data(), 0, offset_byte_width);
+    offsets_buffer->ZeroPadding();
+    values_data_buffers = {std::move(validity_buffer), std::move(offsets_buffer),
+                           std::move(values_buffer)};
+  } else {
+    values_data_buffers = {std::move(validity_buffer), std::move(values_buffer)};
   }
+  return ArrayData::Make(value_type, length, std::move(values_data_buffers), null_count);
+}
+
+/// \brief Preallocate the ArrayData for the run-end encoded version
+/// of the flat input array
+///
+/// \param data_buffer_size the size of the data buffer for string and binary types
+ARROW_NOINLINE Result<std::shared_ptr<ArrayData>> PreallocateREEArray(
+    std::shared_ptr<RunEndEncodedType> ree_type, bool has_validity_buffer,
+    int64_t logical_length, int64_t physical_length, int64_t physical_null_count,
+    MemoryPool* pool, int64_t data_buffer_size) {
+  ARROW_ASSIGN_OR_RAISE(
+      auto run_ends_data,
+      PreallocateRunEndsArray(ree_type->run_end_type(), physical_length, pool));
   ARROW_ASSIGN_OR_RAISE(
-      auto values_buffer,
-      AllocatePrimitiveBuffer(physical_length, *input_array.type, pool));
-
-  auto ree_type = std::make_shared<RunEndEncodedType>(std::make_shared<RunEndType>(),
-                                                      input_array.type->GetSharedPtr());
-  auto run_ends_data =
-      ArrayData::Make(ree_type->run_end_type(), physical_length,
-                      {NULLPTR, std::move(run_ends_buffer)}, /*null_count=*/0);
-  auto values_data = ArrayData::Make(
-      ree_type->value_type(), physical_length,
-      {std::move(validity_buffer), std::move(values_buffer)}, physical_null_count);
-
-  return ArrayData::Make(std::move(ree_type), input_array.length, {NULLPTR},
+      auto values_data,
+      PreallocateValuesArray(ree_type->value_type(), has_validity_buffer, physical_length,
+                             physical_null_count, pool, data_buffer_size));
+
+  return ArrayData::Make(std::move(ree_type), logical_length, {NULLPTR},
                          {std::move(run_ends_data), std::move(values_data)},
                          /*null_count=*/0);
 }
@@ -268,21 +485,20 @@ class RunEndEncodeImpl {
 
  public:
   using RunEndCType = typename RunEndType::c_type;
-  using CType = typename ValueType::c_type;
 
   RunEndEncodeImpl(KernelContext* ctx, const ArraySpan& input_array, ExecResult* out)
       : ctx_{ctx}, input_array_{input_array}, output_{out} {}
 
   Status Exec() {
     const int64_t input_length = input_array_.length;
-    const int64_t input_offset = input_array_.offset;
-    const auto* input_validity = input_array_.buffers[0].data;
-    const auto* input_values = input_array_.buffers[1].data;
 
+    auto ree_type = std::make_shared<RunEndEncodedType>(
+        TypeTraits<RunEndType>::type_singleton(), input_array_.type->GetSharedPtr());
     if (input_length == 0) {
-      ARROW_ASSIGN_OR_RAISE(auto output_array_data,
-                            (PreallocateREEData<RunEndType, has_validity_buffer>(
-                                input_array_, 0, 0, ctx_->memory_pool())));
+      ARROW_ASSIGN_OR_RAISE(
+          auto output_array_data,
+          PreallocateREEArray(std::move(ree_type), has_validity_buffer, input_length, 0,
+                              0, ctx_->memory_pool(), 0));
       output_->value = std::move(output_array_data);
       return Status::OK();
     }
@@ -290,29 +506,30 @@ class RunEndEncodeImpl {
     // First pass: count the number of runs
     int64_t num_valid_runs = 0;
     int64_t num_output_runs = 0;
+    int64_t data_buffer_size = 0;  // for string and binary types
     RETURN_NOT_OK(ValidateRunEndType<RunEndType>(input_length));
 
     RunEndEncodingLoop<RunEndType, ValueType, has_validity_buffer> counting_loop(
-        input_array_.length, input_array_.offset, input_validity, input_values);
-    std::tie(num_valid_runs, num_output_runs) = counting_loop.CountNumberOfRuns();
+        input_array_,
+        /*output_values_array_data=*/NULLPTR,
+        /*output_run_ends=*/NULLPTR);
+    std::tie(num_valid_runs, num_output_runs, data_buffer_size) =
+        counting_loop.CountNumberOfRuns();
 
-    ARROW_ASSIGN_OR_RAISE(auto output_array_data,
-                          (PreallocateREEData<RunEndType, has_validity_buffer>(
-                              input_array_, num_output_runs,
-                              num_output_runs - num_valid_runs, ctx_->memory_pool())));
+    ARROW_ASSIGN_OR_RAISE(
+        auto output_array_data,
+        PreallocateREEArray(std::move(ree_type), has_validity_buffer, input_length,
+                            num_output_runs, num_output_runs - num_valid_runs,
+                            ctx_->memory_pool(), data_buffer_size));
 
     // Initialize the output pointers
     auto* output_run_ends =
-        output_array_data->child_data[0]->template GetMutableValues<RunEndCType>(1);
-    auto* output_validity =
-        output_array_data->child_data[1]->template GetMutableValues<uint8_t>(0);
-    auto* output_values =
-        output_array_data->child_data[1]->template GetMutableValues<uint8_t>(1);
+        output_array_data->child_data[0]->template GetMutableValues<RunEndCType>(1, 0);
+    auto* output_values_array_data = output_array_data->child_data[1].get();
 
     // Second pass: write the runs
     RunEndEncodingLoop<RunEndType, ValueType, has_validity_buffer> writing_loop(
-        input_length, input_offset, input_validity, input_values, output_validity,
-        output_values, output_run_ends);
+        input_array_, output_values_array_data, output_run_ends);
     [[maybe_unused]] int64_t num_written_runs = writing_loop.WriteEncodedRuns();
     DCHECK_EQ(num_written_runs, num_output_runs);
 
@@ -322,12 +539,12 @@ class RunEndEncodeImpl {
 };
 
 template <typename RunEndType>
-Result<std::shared_ptr<ArrayData>> PreallocateNullREEData(int64_t logical_length,
-                                                          int64_t physical_length,
-                                                          MemoryPool* pool) {
+Result<std::shared_ptr<ArrayData>> PreallocateNullREEArray(int64_t logical_length,
+                                                           int64_t physical_length,
+                                                           MemoryPool* pool) {
   ARROW_ASSIGN_OR_RAISE(
       auto run_ends_buffer,
-      AllocateBuffer(physical_length * RunEndType().byte_width(), pool));
+      AllocateBuffer(TypeTraits<RunEndType>::bytes_required(physical_length), pool));
 
   auto ree_type =
       std::make_shared<RunEndEncodedType>(std::make_shared<RunEndType>(), null());
@@ -351,7 +568,7 @@ Status RunEndEncodeNullArray(KernelContext* ctx, const ArraySpan& input_array,
 
   if (input_length == 0) {
     ARROW_ASSIGN_OR_RAISE(auto output_array_data,
-                          PreallocateNullREEData<RunEndType>(0, 0, ctx->memory_pool()));
+                          PreallocateNullREEArray<RunEndType>(0, 0, ctx->memory_pool()));
     output->value = std::move(output_array_data);
     return Status::OK();
   }
@@ -359,21 +576,20 @@ Status RunEndEncodeNullArray(KernelContext* ctx, const ArraySpan& input_array,
   // Abort if run-end type cannot hold the input length
   RETURN_NOT_OK(ValidateRunEndType<RunEndType>(input_array.length));
 
-  ARROW_ASSIGN_OR_RAISE(auto output_array_data, PreallocateNullREEData<RunEndType>(
+  ARROW_ASSIGN_OR_RAISE(auto output_array_data, PreallocateNullREEArray<RunEndType>(
                                                     input_length, 1, ctx->memory_pool()));
 
   // Write the single run-end this REE has
   auto* output_run_ends =
-      output_array_data->child_data[0]->template GetMutableValues<RunEndCType>(1);
+      output_array_data->child_data[0]->template GetMutableValues<RunEndCType>(1, 0);
   output_run_ends[0] = static_cast<RunEndCType>(input_length);
 
   output->value = std::move(output_array_data);
   return Status::OK();
 }
 
-template <typename ValueType>
 struct RunEndEncodeExec {
-  template <typename RunEndType>
+  template <typename RunEndType, typename ValueType>
   static Status DoExec(KernelContext* ctx, const ExecSpan& span, ExecResult* result) {
     DCHECK(span.values[0].is_array());
     const auto& input_array = span.values[0].array;
@@ -390,20 +606,29 @@ struct RunEndEncodeExec {
     }
   }
 
+  template <typename ValueType>
   static Status Exec(KernelContext* ctx, const ExecSpan& span, ExecResult* result) {
     auto state = checked_cast<const RunEndEncondingState*>(ctx->state());
     switch (state->run_end_type->id()) {
       case Type::INT16:
-        return DoExec<Int16Type>(ctx, span, result);
+        return DoExec<Int16Type, ValueType>(ctx, span, result);
       case Type::INT32:
-        return DoExec<Int32Type>(ctx, span, result);
+        return DoExec<Int32Type, ValueType>(ctx, span, result);
       case Type::INT64:
-        return DoExec<Int64Type>(ctx, span, result);
+        return DoExec<Int64Type, ValueType>(ctx, span, result);
       default:
         break;
     }
     return Status::Invalid("Invalid run end type: ", *state->run_end_type);
   }
+
+  /// \brief The OutputType::Resolver of the "run_end_decode" function.
+  static Result<TypeHolder> ResolveOutputType(
+      KernelContext* ctx, const std::vector<TypeHolder>& input_types) {
+    auto state = checked_cast<const RunEndEncondingState*>(ctx->state());
+    return TypeHolder(std::make_shared<RunEndEncodedType>(state->run_end_type,
+                                                          input_types[0].GetSharedPtr()));
+  }
 };
 
 Result<std::unique_ptr<KernelState>> RunEndEncodeInit(KernelContext*,
@@ -418,52 +643,57 @@ template <typename RunEndType, typename ValueType, bool has_validity_buffer>
 class RunEndDecodingLoop {
  public:
   using RunEndCType = typename RunEndType::c_type;
-  using CType = typename ValueType::c_type;
 
  private:
-  const ArraySpan& input_array_;
+  using ReadWriteValue = ReadWriteValueImpl<ValueType, has_validity_buffer>;
+  using ValueRepr = typename ReadWriteValue::ValueRepr;
 
-  const uint8_t* input_validity_;
-  const void* input_values_;
+  const ArraySpan& input_array_;
+  ReadWriteValue read_write_value_;
   int64_t values_offset_;
 
-  uint8_t* output_validity_;
-  void* output_values_;
+  RunEndDecodingLoop(const ArraySpan& input_array, const ArraySpan& input_array_values,
+                     ArrayData* output_array_data)
+      : input_array_(input_array),
+        read_write_value_(input_array_values, output_array_data),
+        values_offset_(input_array_values.offset) {}
 
  public:
-  explicit RunEndDecodingLoop(const ArraySpan& input_array, ArrayData* output_array_data)
-      : input_array_(input_array) {
-    const ArraySpan& values = ree_util::ValuesArray(input_array);
-    input_validity_ = values.buffers[0].data;
-    input_values_ = values.buffers[1].data;
-    values_offset_ = values.offset;
-
-    output_validity_ = output_array_data->template GetMutableValues<uint8_t>(0);
-    output_values_ = output_array_data->template GetMutableValues<CType>(1);
-  }
-
- private:
-  [[nodiscard]] inline bool ReadValue(CType* out, int64_t read_offset) const {
-    return ReadValueImpl<ValueType, has_validity_buffer>{}.ReadValue(
-        input_validity_, input_values_, out, read_offset);
-  }
-
-  inline void WriteRun(int64_t write_offset, int64_t run_length, bool valid,
-                       CType value) {
-    WriteValueImpl<ValueType, has_validity_buffer>{}.WriteRun(
-        output_validity_, output_values_, write_offset, run_length, valid, value);
+  RunEndDecodingLoop(const ArraySpan& input_array, ArrayData* output_array_data)
+      : RunEndDecodingLoop(input_array, ree_util::ValuesArray(input_array),
+                           output_array_data) {}
+
+  /// \brief For variable-length types, calculate the total length of the data
+  /// buffer needed to store the expanded values.
+  int64_t CalculateOutputDataBufferSize() const {
+    auto& input_array_values = ree_util::ValuesArray(input_array_);
+    DCHECK_EQ(input_array_values.type->id(), ValueType::type_id);
+    if constexpr (is_base_binary_like(ValueType::type_id)) {
+      using offset_type = typename ValueType::offset_type;
+      int64_t data_buffer_size = 0;
+
+      const ree_util::RunEndEncodedArraySpan<RunEndCType> ree_array_span(input_array_);
+      const auto* offsets_buffer =
+          input_array_values.template GetValues<offset_type>(1, 0);
+      auto it = ree_array_span.begin();
+      auto offset = offsets_buffer[input_array_values.offset + it.index_into_array()];
+      while (it != ree_array_span.end()) {
+        const int64_t i = input_array_values.offset + it.index_into_array();
+        const int64_t value_length = offsets_buffer[i + 1] - offset;
+        data_buffer_size += it.run_length() * value_length;
+        offset = offsets_buffer[i + 1];
+        ++it;
+      }
+      return data_buffer_size;
+    }
+    return 0;
   }
 
- public:
   /// \brief Expand all runs into the output array
   ///
   /// \return the number of non-null values written.
   ARROW_NOINLINE int64_t ExpandAllRuns() {
-    // Ensure padding is zeroed in validity bitmap
-    if constexpr (has_validity_buffer) {
-      const int64_t validity_buffer_size = bit_util::BytesForBits(input_array_.length);
-      output_validity_[validity_buffer_size - 1] = 0;
-    }
+    read_write_value_.ZeroValidityPadding(input_array_.length);
 
     const ree_util::RunEndEncodedArraySpan<RunEndCType> ree_array_span(input_array_);
     int64_t write_offset = 0;
@@ -471,9 +701,9 @@ class RunEndDecodingLoop {
     for (auto it = ree_array_span.begin(); it != ree_array_span.end(); ++it) {
       const int64_t read_offset = values_offset_ + it.index_into_array();
       const int64_t run_length = it.run_length();
-      CType value;
-      const bool valid = ReadValue(&value, read_offset);
-      WriteRun(write_offset, run_length, valid, value);
+      ValueRepr value;
+      const bool valid = read_write_value_.ReadValue(&value, read_offset);
+      read_write_value_.WriteRun(write_offset, run_length, valid, value);
       write_offset += run_length;
       output_valid_count += valid ? run_length : 0;
     }
@@ -491,7 +721,6 @@ class RunEndDecodeImpl {
 
  public:
   using RunEndCType = typename RunEndType::c_type;
-  using CType = typename ValueType::c_type;
 
   RunEndDecodeImpl(KernelContext* ctx, const ArraySpan& input_array, ExecResult* out)
       : ctx_{ctx}, input_array_{input_array}, output_{out} {}
@@ -500,18 +729,19 @@ class RunEndDecodeImpl {
   Status Exec() {
     const auto* ree_type = checked_cast<const RunEndEncodedType*>(input_array_.type);
     const int64_t length = input_array_.length;
-    std::shared_ptr<Buffer> validity_buffer = NULLPTR;
-    if constexpr (has_validity_buffer) {
-      ARROW_ASSIGN_OR_RAISE(validity_buffer, AllocateBitmap(length, ctx_->memory_pool()));
+    int64_t data_buffer_size = 0;
+    if constexpr (is_base_binary_like(ValueType::type_id)) {
+      if (length > 0) {
+        RunEndDecodingLoop<RunEndType, ValueType, has_validity_buffer> loop(input_array_,
+                                                                            NULLPTR);
+        data_buffer_size = loop.CalculateOutputDataBufferSize();
+      }
     }
-    ARROW_ASSIGN_OR_RAISE(
-        auto values_buffer,
-        AllocatePrimitiveBuffer(length, *ree_type->value_type(), ctx_->memory_pool()));
 
-    auto output_array_data =
-        ArrayData::Make(ree_type->value_type(), length,
-                        {std::move(validity_buffer), std::move(values_buffer)},
-                        /*child_data=*/std::vector<std::shared_ptr<ArrayData>>{});
+    ARROW_ASSIGN_OR_RAISE(
+        auto output_array_data,
+        PreallocateValuesArray(ree_type->value_type(), has_validity_buffer, length,
+                               kUnknownNullCount, ctx_->memory_pool(), data_buffer_size));
 
     int64_t output_null_count = 0;
     if (length > 0) {
@@ -536,9 +766,8 @@ Status RunEndDecodeNullREEArray(KernelContext* ctx, const ArraySpan& input_array
   return Status::OK();
 }
 
-template <typename ValueType>
 struct RunEndDecodeExec {
-  template <typename RunEndType>
+  template <typename RunEndType, typename ValueType>
   static Status DoExec(KernelContext* ctx, const ExecSpan& span, ExecResult* result) {
     DCHECK(span.values[0].is_array());
     auto& input_array = span.values[0].array;
@@ -555,22 +784,81 @@ struct RunEndDecodeExec {
     }
   }
 
+  template <typename ValueType>
   static Status Exec(KernelContext* ctx, const ExecSpan& span, ExecResult* result) {
     const auto& ree_type = checked_cast<const RunEndEncodedType*>(span.values[0].type());
     switch (ree_type->run_end_type()->id()) {
       case Type::INT16:
-        return DoExec<Int16Type>(ctx, span, result);
+        return DoExec<Int16Type, ValueType>(ctx, span, result);
       case Type::INT32:
-        return DoExec<Int32Type>(ctx, span, result);
+        return DoExec<Int32Type, ValueType>(ctx, span, result);
       case Type::INT64:
-        return DoExec<Int64Type>(ctx, span, result);
+        return DoExec<Int64Type, ValueType>(ctx, span, result);
       default:
         break;
     }
     return Status::Invalid("Invalid run end type: ", *ree_type->run_end_type());
   }
+
+  /// \brief The OutputType::Resolver of the "run_end_decode" function.
+  static Result<TypeHolder> ResolveOutputType(KernelContext*,
+                                              const std::vector<TypeHolder>& in_types) {
+    const auto* ree_type = checked_cast<const RunEndEncodedType*>(in_types[0].type);
+    return TypeHolder(ree_type->value_type());
+  }
 };
 
+template <typename Functor>
+static ArrayKernelExec GenerateREEKernelExec(Type::type type_id) {
+  switch (type_id) {
+    case Type::NA:
+      return Functor::template Exec<NullType>;
+    case Type::BOOL:
+      return Functor::template Exec<BooleanType>;
+    case Type::UINT8:
+    case Type::INT8:
+      return Functor::template Exec<UInt8Type>;
+    case Type::UINT16:
+    case Type::INT16:
+      return Functor::template Exec<UInt16Type>;
+    case Type::UINT32:
+    case Type::INT32:
+    case Type::FLOAT:
+    case Type::DATE32:
+    case Type::TIME32:
+    case Type::INTERVAL_MONTHS:
+      return Functor::template Exec<UInt32Type>;
+    case Type::UINT64:
+    case Type::INT64:
+    case Type::DOUBLE:
+    case Type::DATE64:
+    case Type::TIMESTAMP:
+    case Type::TIME64:
+    case Type::DURATION:
+    case Type::INTERVAL_DAY_TIME:
+      return Functor::template Exec<UInt64Type>;
+    case Type::INTERVAL_MONTH_DAY_NANO:
+      return Functor::template Exec<MonthDayNanoIntervalType>;
+    case Type::DECIMAL128:
+      return Functor::template Exec<Decimal128Type>;
+    case Type::DECIMAL256:
+      return Functor::template Exec<Decimal256Type>;
+    case Type::FIXED_SIZE_BINARY:
+      return Functor::template Exec<FixedSizeBinaryType>;
+    case Type::STRING:
+      return Functor::template Exec<StringType>;
+    case Type::BINARY:
+      return Functor::template Exec<BinaryType>;
+    case Type::LARGE_STRING:
+      return Functor::template Exec<LargeStringType>;
+    case Type::LARGE_BINARY:
+      return Functor::template Exec<LargeBinaryType>;
+    default:
+      DCHECK(false);
+      return FailFunctor<ArrayKernelExec>::Exec;
+  }
+}
+
 static const FunctionDoc run_end_encode_doc(
     "Run-end encode array", ("Return a run-end encoded version of the input array."),
     {"array"}, "RunEndEncodeOptions");
@@ -578,13 +866,6 @@ static const FunctionDoc run_end_decode_doc(
     "Decode run-end encoded array",
     ("Return a decoded version of a run-end encoded input array."), {"array"});
 
-static Result<TypeHolder> VectorRunEndEncodedResolver(
-    KernelContext* ctx, const std::vector<TypeHolder>& input_types) {
-  auto state = checked_cast<const RunEndEncondingState*>(ctx->state());
-  return TypeHolder(std::make_shared<RunEndEncodedType>(state->run_end_type,
-                                                        input_types[0].GetSharedPtr()));
-}
-
 void RegisterVectorRunEndEncode(FunctionRegistry* registry) {
   auto function = std::make_shared<VectorFunction>("run_end_encode", Arity::Unary(),
                                                    run_end_encode_doc);
@@ -596,22 +877,37 @@ void RegisterVectorRunEndEncode(FunctionRegistry* registry) {
   // cannot be encoded as a single run in the output. This is a conscious trade-off as
   // trying to solve this corner-case would complicate the implementation,
   // require reallocations, and could create surprising behavior for users of this API.
-  auto add_kernel = [&function](const std::shared_ptr<DataType>& ty) {
-    auto sig =
-        KernelSignature::Make({InputType(ty)}, OutputType(VectorRunEndEncodedResolver));
-    auto exec = GenerateTypeAgnosticPrimitive<RunEndEncodeExec>(ty);
+  auto add_kernel = [&function](Type::type type_id) {
+    auto sig = KernelSignature::Make({InputType(match::SameTypeId(type_id))},
+                                     OutputType(RunEndEncodeExec::ResolveOutputType));
+    auto exec = GenerateREEKernelExec<RunEndEncodeExec>(type_id);
     VectorKernel kernel(sig, exec, RunEndEncodeInit);
     // A REE has null_count=0, so no need to allocate a validity bitmap for them.
     kernel.null_handling = NullHandling::OUTPUT_NOT_NULL;
     DCHECK_OK(function->AddKernel(std::move(kernel)));
   };
 
+  add_kernel(Type::NA);
+  add_kernel(Type::BOOL);
   for (const auto& ty : NumericTypes()) {
-    add_kernel(ty);
+    add_kernel(ty->id());
   }
-  add_kernel(boolean());
-  add_kernel(null());
-  // TODO(GH-34195): Add support for more types
+  add_kernel(Type::DATE32);
+  add_kernel(Type::DATE64);
+  add_kernel(Type::TIME32);
+  add_kernel(Type::TIME64);
+  add_kernel(Type::TIMESTAMP);
+  add_kernel(Type::DURATION);
+  for (const auto& ty : IntervalTypes()) {
+    add_kernel(ty->id());
+  }
+  add_kernel(Type::DECIMAL128);
+  add_kernel(Type::DECIMAL256);
+  add_kernel(Type::FIXED_SIZE_BINARY);
+  add_kernel(Type::STRING);
+  add_kernel(Type::BINARY);
+  add_kernel(Type::LARGE_STRING);
+  add_kernel(Type::LARGE_BINARY);
 
   DCHECK_OK(registry->AddFunction(std::move(function)));
 }
@@ -620,22 +916,39 @@ void RegisterVectorRunEndDecode(FunctionRegistry* registry) {
   auto function = std::make_shared<VectorFunction>("run_end_decode", Arity::Unary(),
                                                    run_end_decode_doc);
 
-  auto add_kernel = [&function](const std::shared_ptr<DataType>& ty) {
-    for (const auto& run_end_type : {int16(), int32(), int64()}) {
-      auto exec = GenerateTypeAgnosticPrimitive<RunEndDecodeExec>(ty);
-      auto input_type = std::make_shared<RunEndEncodedType>(run_end_type, ty);
-      auto sig = KernelSignature::Make({InputType(input_type)}, OutputType({ty}));
+  auto add_kernel = [&function](Type::type type_id) {
+    for (const auto& run_end_type_id : {Type::INT16, Type::INT32, Type::INT64}) {
+      auto exec = GenerateREEKernelExec<RunEndDecodeExec>(type_id);
+      auto input_type_matcher = match::RunEndEncoded(match::SameTypeId(run_end_type_id),
+                                                     match::SameTypeId(type_id));
+      auto sig = KernelSignature::Make({InputType(std::move(input_type_matcher))},
+                                       OutputType(RunEndDecodeExec::ResolveOutputType));
       VectorKernel kernel(sig, exec);
       DCHECK_OK(function->AddKernel(std::move(kernel)));
     }
   };
 
+  add_kernel(Type::NA);
+  add_kernel(Type::BOOL);
   for (const auto& ty : NumericTypes()) {
-    add_kernel(ty);
+    add_kernel(ty->id());
+  }
+  add_kernel(Type::DATE32);
+  add_kernel(Type::DATE64);
+  add_kernel(Type::TIME32);
+  add_kernel(Type::TIME64);
+  add_kernel(Type::TIMESTAMP);
+  add_kernel(Type::DURATION);
+  for (const auto& ty : IntervalTypes()) {
+    add_kernel(ty->id());
   }
-  add_kernel(boolean());
-  add_kernel(null());
-  // TODO(GH-34195): Add support for more types
+  add_kernel(Type::DECIMAL128);
+  add_kernel(Type::DECIMAL256);
+  add_kernel(Type::FIXED_SIZE_BINARY);
+  add_kernel(Type::STRING);
+  add_kernel(Type::BINARY);
+  add_kernel(Type::LARGE_STRING);
+  add_kernel(Type::LARGE_BINARY);
 
   DCHECK_OK(registry->AddFunction(std::move(function)));
 }
diff --git a/cpp/src/arrow/compute/kernels/vector_run_end_encode_test.cc b/cpp/src/arrow/compute/kernels/vector_run_end_encode_test.cc
index aec3211dd9..d415f9d14c 100644
--- a/cpp/src/arrow/compute/kernels/vector_run_end_encode_test.cc
+++ b/cpp/src/arrow/compute/kernels/vector_run_end_encode_test.cc
@@ -18,9 +18,11 @@
 #include <gtest/gtest.h>
 
 #include "arrow/array.h"
+#include "arrow/array/validate.h"
 #include "arrow/builder.h"
 #include "arrow/compute/api_vector.h"
 #include "arrow/testing/gtest_util.h"
+#include "arrow/type_fwd.h"
 #include "arrow/util/logging.h"
 #include "arrow/util/ree_util.h"
 
@@ -140,6 +142,7 @@ class TestRunEndEncodeDecode : public ::testing::TestWithParam<
     ASSERT_OK(builder->AppendNulls(offset));
     ASSERT_OK(builder->AppendArraySlice(ArraySpan(*child), 0, child->length));
     array->child_data[1] = builder->Finish().ValueOrDie()->Slice(offset)->data();
+    ASSERT_OK(arrow::internal::ValidateArrayFull(*array));
   }
 
   std::shared_ptr<ChunkedArray> AsChunkedArray(const Datum& datum) {
@@ -260,65 +263,104 @@ TEST_P(TestRunEndEncodeDecode, DecodeWithOffsetInChildArray) {
   ASSERT_ARRAYS_EQUAL(*array_without_first, *data.input->chunk(0));
 }
 
-INSTANTIATE_TEST_SUITE_P(
-    EncodeArrayTests, TestRunEndEncodeDecode,
-    ::testing::Combine(
-        ::testing::Values(
-            REETestData::JSON(int32(), "[1, 1, 0, -5, -5, -5, 255, 255]",
-                              "[1, 0, -5, 255]", "[2, 3, 6, 8]"),
-            REETestData::JSON(uint32(), "[null, 1, 1, null, null, 5]",
-                              "[null, 1, null, 5]", "[1, 3, 5, 6]"),
-            REETestData::JSON(boolean(), "[true, true, true, false, false]",
-                              "[true, false]", "[3, 5]"),
-            REETestData::JSON(
-                boolean(), "[true, false, true, false, true, false, true, false, true]",
-                "[true, false, true, false, true, false, true, false, true]",
-                "[1, 2, 3, 4, 5, 6, 7, 8, 9]"),
-            REETestData::JSON(uint32(), "[1]", "[1]", "[1]"),
-
-            REETestData::JSON(boolean(),
-                              "[true, true, true, false, null, null, false, null, null]",
-                              "[true, false, null, false, null]", "[3, 4, 6, 7, 9]"),
-            REETestData::JSONChunked(boolean(),
-                                     {"[true, true]", "[true, false, null, null, false]",
-                                      "[null, null]"},
-                                     {"[true]", "[true, false, null, false]", "[null]"},
-                                     {"[2]", "[1, 2, 4, 5]", "[2]"}),
-            REETestData::JSON(int32(), "[1, 1, 0, -5, -5, -5, 255, 255]", "[-5, 255]",
-                              "[3, 5]", 3),
-            REETestData::JSONChunked(int32(), {"[1, 1, 0, -5, -5]", "[-5, 255, 255]"},
-                                     {"[-5]", "[-5, 255]"}, {"[2]", "[1, 3]"}, 3),
-            REETestData::JSON(uint32(), "[4, 5, 5, null, null, 5]", "[5, null, 5]",
-                              "[1, 3, 4]", 2),
-            REETestData::JSONChunked(uint32(), {"[4, 5, 5, null, null, 5]"},
-                                     {"[5, null, 5]"}, {"[1, 3, 4]"}, 2),
-            REETestData::JSON(boolean(), "[true, true, false, false, true]",
-                              "[false, true]", "[2, 3]", 2),
-            REETestData::JSONChunked(boolean(), {"[true]", "[true, false, false, true]"},
-                                     {"[false, true]"}, {"[2, 3]"}, 2),
-            REETestData::JSON(boolean(), "[true, true, true, false, null, null, false]",
-                              "[null, false]", "[1, 2]", 5),
-
-            REETestData::JSON(float64(), "[]", "[]", "[]"),
-            REETestData::JSONChunked(float64(), {"[]"}, {"[]"}, {"[]"}),
-            REETestData::JSON(boolean(), "[]", "[]", "[]"),
-            REETestData::JSONChunked(boolean(), {"[]"}, {"[]"}, {"[]"}),
-
-            REETestData::NullArray(4),
-            REETestData::NullArray(std::numeric_limits<int16_t>::max()),
-            REETestData::NullArray(std::numeric_limits<int16_t>::max(), 1000),
-
-            REETestData::TypeMinMaxNull<Int8Type>(),
-            REETestData::TypeMinMaxNull<UInt8Type>(),
-            REETestData::TypeMinMaxNull<Int16Type>(),
-            REETestData::TypeMinMaxNull<UInt16Type>(),
-            REETestData::TypeMinMaxNull<Int32Type>(),
-            REETestData::TypeMinMaxNull<UInt32Type>(),
-            REETestData::TypeMinMaxNull<Int64Type>(),
-            REETestData::TypeMinMaxNull<UInt64Type>(),
-            REETestData::TypeMinMaxNull<FloatType>(),
-            REETestData::TypeMinMaxNull<DoubleType>()),
-        ::testing::Values(int16(), int32(), int64())));
+std::vector<REETestData> GenerateTestData() {
+  std::vector<REETestData> test_data = {
+      REETestData::JSON(int32(), "[1, 1, 0, -5, -5, -5, 255, 255]", "[1, 0, -5, 255]",
+                        "[2, 3, 6, 8]"),
+      REETestData::JSON(uint32(), "[null, 1, 1, null, null, 5]", "[null, 1, null, 5]",
+                        "[1, 3, 5, 6]"),
+      REETestData::JSON(boolean(), "[true, true, true, false, false]", "[true, false]",
+                        "[3, 5]"),
+      REETestData::JSON(boolean(),
+                        "[true, false, true, false, true, false, true, false, true]",
+                        "[true, false, true, false, true, false, true, false, true]",
+                        "[1, 2, 3, 4, 5, 6, 7, 8, 9]"),
+      REETestData::JSON(uint32(), "[1]", "[1]", "[1]"),
+
+      REETestData::JSON(boolean(),
+                        "[true, true, true, false, null, null, false, null, null]",
+                        "[true, false, null, false, null]", "[3, 4, 6, 7, 9]"),
+      REETestData::JSONChunked(
+          boolean(), {"[true, true]", "[true, false, null, null, false]", "[null, null]"},
+          {"[true]", "[true, false, null, false]", "[null]"},
+          {"[2]", "[1, 2, 4, 5]", "[2]"}),
+      REETestData::JSON(int32(), "[1, 1, 0, -5, -5, -5, 255, 255]", "[-5, 255]", "[3, 5]",
+                        3),
+      REETestData::JSONChunked(int32(), {"[1, 1, 0, -5, -5]", "[-5, 255, 255]"},
+                               {"[-5]", "[-5, 255]"}, {"[2]", "[1, 3]"}, 3),
+      REETestData::JSON(uint32(), "[4, 5, 5, null, null, 5]", "[5, null, 5]", "[1, 3, 4]",
+                        2),
+      REETestData::JSONChunked(uint32(), {"[4, 5, 5, null, null, 5]"}, {"[5, null, 5]"},
+                               {"[1, 3, 4]"}, 2),
+      REETestData::JSON(boolean(), "[true, true, false, false, true]", "[false, true]",
+                        "[2, 3]", 2),
+      REETestData::JSONChunked(boolean(), {"[true]", "[true, false, false, true]"},
+                               {"[false, true]"}, {"[2, 3]"}, 2),
+      REETestData::JSON(boolean(), "[true, true, true, false, null, null, false]",
+                        "[null, false]", "[1, 2]", 5),
+
+      REETestData::JSON(float64(), "[]", "[]", "[]"),
+      REETestData::JSONChunked(float64(), {"[]"}, {"[]"}, {"[]"}),
+      REETestData::JSON(boolean(), "[]", "[]", "[]"),
+      REETestData::JSONChunked(boolean(), {"[]"}, {"[]"}, {"[]"}),
+
+      REETestData::NullArray(4),
+      REETestData::NullArray(std::numeric_limits<int16_t>::max()),
+      REETestData::NullArray(std::numeric_limits<int16_t>::max(), 1000),
+
+      REETestData::TypeMinMaxNull<Int8Type>(),
+      REETestData::TypeMinMaxNull<UInt8Type>(),
+      REETestData::TypeMinMaxNull<Int16Type>(),
+      REETestData::TypeMinMaxNull<UInt16Type>(),
+      REETestData::TypeMinMaxNull<Int32Type>(),
+      REETestData::TypeMinMaxNull<UInt32Type>(),
+      REETestData::TypeMinMaxNull<Int64Type>(),
+      REETestData::TypeMinMaxNull<UInt64Type>(),
+      REETestData::TypeMinMaxNull<FloatType>(),
+      REETestData::TypeMinMaxNull<DoubleType>(),
+      // A few temporal types
+      REETestData::JSON(date32(),
+                        "[86400, 86400, 0, 432000, 432000, 432000, 22075200, 22075200]",
+                        "[86400, 0, 432000, 22075200]", "[2, 3, 6, 8]"),
+      REETestData::JSON(date64(),
+                        "[86400000, 86400000, 0, 432000000, 432000000, 432000000, "
+                        "22032000000, 22032000000]",
+                        "[86400000, 0, 432000000, 22032000000]", "[2, 3, 6, 8]"),
+      REETestData::JSON(time32(TimeUnit::SECOND), "[1, 1, 0, 5, 5, 5, 255, 255]",
+                        "[1, 0, 5, 255]", "[2, 3, 6, 8]"),
+      REETestData::JSON(time64(TimeUnit::MICRO), "[1, 1, 0, 5, 5, 5, 255, 255]",
+                        "[1, 0, 5, 255]", "[2, 3, 6, 8]"),
+      // Decimal and fixed size binary types
+      REETestData::JSON(decimal128(4, 1),
+                        R"(["1.0", "1.0", "0.0", "5.2", "5.2", "5.2", "255.0", "255.0"])",
+                        R"(["1.0", "0.0", "5.2", "255.0"])", "[2, 3, 6, 8]"),
+      REETestData::JSON(decimal256(4, 1),
+                        R"(["1.0", "1.0", "0.0", "5.2", "5.2", "5.2", "255.0", "255.0"])",
+                        R"(["1.0", "0.0", "5.2", "255.0"])", "[2, 3, 6, 8]"),
+      REETestData::JSON(fixed_size_binary(3),
+                        R"(["abc", "abc", "abc", "def", "def", "def", "ghi", "ghi"])",
+                        R"(["abc", "def", "ghi"])", "[3, 6, 8]"),
+      REETestData::JSON(
+          fixed_size_binary(3),
+          R"([null, "abc", "abc", "abc", "def", "def", "def", "ghi", "ghi", null, null])",
+          R"([null, "abc", "def", "ghi", null])", "[1, 4, 7, 9, 11]"),
+  };
+  for (auto& binary_type : {binary(), large_binary(), utf8(), large_utf8()}) {
+    test_data.push_back(REETestData::JSON(
+        binary_type, R"(["abc", "abc", "", "", "de", "de", "de", "ghijkl", "ghijkl"])",
+        R"(["abc", "", "de", "ghijkl"])", "[2, 4, 7, 9]"));
+    test_data.push_back(REETestData::JSON(
+        binary_type,
+        R"(["abc", "abc", "", "", "de", "de", "de", null, null, "ghijkl", "ghijkl"])",
+        R"(["abc", "", "de", null, "ghijkl"])", "[2, 4, 7, 9, 11]"));
+  }
+  return test_data;
+}
+
+INSTANTIATE_TEST_SUITE_P(EncodeArrayTests, TestRunEndEncodeDecode,
+                         ::testing::Combine(::testing::ValuesIn(GenerateTestData()),
+                                            ::testing::Values(int16(), int32(),
+                                                              int64())));
 
 }  // namespace compute
 }  // namespace arrow